source: branches/4.x/module/tests/test_dbapi20.py @ 666

Last change on this file since 666 was 666, checked in by darcy, 4 years ago

When creating a missing database use the same parameters as the test run.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 11.3 KB
Line 
1#! /usr/bin/python
2# $Id: test_dbapi20.py 666 2015-12-23 15:14:10Z darcy $
3
4try:
5    import unittest2 as unittest  # for Python < 2.7
6except ImportError:
7    import unittest
8
9import sys
10
11import pgdb
12
13import dbapi20
14
# Context managers (the "with" statement) need at least Python 2.5.
no_with = sys.version_info < (2, 5)

# Connection settings for the database used by the tests.
# The values below are only defaults: when a LOCAL_PyGreSQL.py module
# can be imported, everything it defines replaces them.
dbname, dbhost, dbport = 'dbapi20_test', '', 5432
try:
    from LOCAL_PyGreSQL import *
except ImportError:
    pass  # no local settings available, keep the defaults
28
29
30class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
31
32    driver = pgdb
33    connect_args = ()
34    connect_kw_args = {'database': dbname,
35        'host': '%s:%d' % (dbhost or '', dbport or -1)}
36
37    lower_func = 'lower'  # For stored procedure test
38
39    def setUp(self):
40        # Call superclass setUp in case this does something in the future
41        dbapi20.DatabaseAPI20Test.setUp(self)
42        try:
43            con = self._connect()
44            con.close()
45        except pgdb.Error:
46            import pg
47            pg.DB('postgres',dbhost,dbport).query('create database ' + dbname)
48
49    def tearDown(self):
50        dbapi20.DatabaseAPI20Test.tearDown(self)
51
52    def test_row_factory(self):
53        class myCursor(pgdb.pgdbCursor):
54            def row_factory(self, row):
55                d = {}
56                for idx, col in enumerate(self.description):
57                    d[col[0]] = row[idx]
58                return d
59
60        con = self._connect()
61        cur = myCursor(con)
62        ret = cur.execute("select 1 as a, 2 as b")
63        self.assert_(ret is cur, 'execute() should return cursor')
64        self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
65
66    def test_cursor_iteration(self):
67        con = self._connect()
68        cur = con.cursor()
69        cur.execute("select 1 union select 2 union select 3")
70        self.assertEqual([r[0] for r in cur], [1, 2, 3])
71
72    def test_fetch_2_rows(self):
73        Decimal = pgdb.decimal_type()
74        values = ['test', pgdb.Binary('\xff\x52\xb2'),
75            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
76            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
77            7897234]
78        table = self.table_prefix + 'booze'
79        con = self._connect()
80        try:
81            cur = con.cursor()
82            cur.execute("create table %s ("
83                "stringtest varchar,"
84                "binarytest bytea,"
85                "booltest bool,"
86                "integertest int4,"
87                "longtest int8,"
88                "floattest float8,"
89                "numerictest numeric,"
90                "moneytest money,"
91                "datetest date,"
92                "timetest time,"
93                "datetimetest timestamp,"
94                "intervaltest interval,"
95                "rowidtest oid)" % table)
96            for s in ('numeric', 'monetary', 'time'):
97                cur.execute("set lc_%s to 'C'" % s)
98            for _i in range(2):
99                cur.execute("insert into %s values ("
100                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
101                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
102            cur.execute("select * from %s" % table)
103            rows = cur.fetchall()
104            self.assertEqual(len(rows), 2)
105            self.assertEqual(rows[0], values)
106            self.assertEqual(rows[0], rows[1])
107        finally:
108            con.close()
109
110    def test_sqlstate(self):
111        con = self._connect()
112        cur = con.cursor()
113        try:
114            cur.execute("select 1/0")
115        except pgdb.DatabaseError, error:
116            self.assert_(isinstance(error, pgdb.ProgrammingError))
117            # the SQLSTATE error code for division by zero is 22012
118            self.assertEqual(error.sqlstate, '22012')
119
120    def test_float(self):
121        try:
122            nan = float('nan')
123        except ValueError:  # Python < 2.6
124            nan = 3.0e999 - 1.5e999999
125        try:
126            inf = float('inf')
127        except ValueError:  # Python < 2.6
128            inf = 3.0e999 * 1.5e999999
129        try:
130            from math import isnan, isinf
131        except ImportError:  # Python < 2.6
132            isnan = lambda x: x != x
133            isinf = lambda x: not isnan(x) and isnan(x * 0)
134        try:
135            from math import isnan, isinf
136        except ImportError:  # Python < 2.6
137            isnan = lambda x: x != x
138            isinf = lambda x: not isnan(x) and isnan(x * 0)
139        self.assert_(isnan(nan) and not isinf(nan))
140        self.assert_(isinf(inf) and not isnan(inf))
141        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
142        table = self.table_prefix + 'booze'
143        con = self._connect()
144        try:
145            cur = con.cursor()
146            cur.execute(
147                "create table %s (n smallint, floattest float)" % table)
148            params = enumerate(values)
149            cur.executemany("insert into %s values(%%s,%%s)" % table, params)
150            cur.execute("select * from %s order by 1" % table)
151            rows = cur.fetchall()
152        finally:
153            con.close()
154        self.assertEqual(len(rows), len(values))
155        rows = [row[1] for row in rows]
156        for inval, outval in zip(values, rows):
157            if isinf(inval):
158                self.assert_(isinf(outval))
159                if inval < 0:
160                    self.assert_(outval < 0)
161                else:
162                    self.assert_(outval > 0)
163            elif isnan(inval):
164                self.assert_(isnan(outval))
165            else:
166                self.assertEqual(inval, outval)
167
168    def test_bool(self):
169        values = [False, True, None, 't', 'f', 'true', 'false']
170        table = self.table_prefix + 'booze'
171        con = self._connect()
172        try:
173            cur = con.cursor()
174            cur.execute(
175                "create table %s (n smallint, booltest bool)" % table)
176            params = enumerate(values)
177            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
178            cur.execute("select * from %s order by 1" % table)
179            rows = cur.fetchall()
180        finally:
181            con.close()
182        rows = [row[1] for row in rows]
183        values[3] = values[5] = True
184        values[4] = values[6] = False
185        self.assertEqual(rows, values)
186
187    def test_set_decimal_type(self):
188        decimal_type = pgdb.decimal_type()
189        self.assert_(decimal_type is not None and callable(decimal_type))
190        con = self._connect()
191        try:
192            cur = con.cursor()
193            self.assert_(pgdb.decimal_type(int) is int)
194            cur.execute('select 42')
195            value = cur.fetchone()[0]
196            self.assert_(isinstance(value, int))
197            self.assertEqual(value, 42)
198            self.assert_(pgdb.decimal_type(float) is float)
199            cur.execute('select 4.25')
200            value = cur.fetchone()[0]
201            self.assert_(isinstance(value, float))
202            self.assertEqual(value, 4.25)
203        finally:
204            con.close()
205            pgdb.decimal_type(decimal_type)
206        self.assert_(pgdb.decimal_type() is decimal_type)
207
208    def test_nextset(self):
209        con = self._connect()
210        cur = con.cursor()
211        self.assertRaises(con.NotSupportedError, cur.nextset)
212
213    def test_setoutputsize(self):
214        pass  # not supported
215
216    def test_connection_errors(self):
217        con = self._connect()
218        self.assertEqual(con.Error, pgdb.Error)
219        self.assertEqual(con.Warning, pgdb.Warning)
220        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
221        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
222        self.assertEqual(con.InternalError, pgdb.InternalError)
223        self.assertEqual(con.OperationalError, pgdb.OperationalError)
224        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
225        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
226        self.assertEqual(con.DataError, pgdb.DataError)
227        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
228
229    @unittest.skipIf(no_with, 'context managers not supported')
230    def test_connection_as_contextmanager(self):
231        table = self.table_prefix + 'booze'
232        con = self._connect()
233        # wrap "with" statements to avoid SyntaxError in Python < 2.5
234        exec """from __future__ import with_statement\nif True:
235        try:
236            cur = con.cursor()
237            cur.execute("create table %s (n smallint check(n!=4))" % table)
238            with con:
239                cur.execute("insert into %s values (1)" % table)
240                cur.execute("insert into %s values (2)" % table)
241            try:
242                with con:
243                    cur.execute("insert into %s values (3)" % table)
244                    cur.execute("insert into %s values (4)" % table)
245            except con.ProgrammingError, error:
246                self.assertTrue('check' in str(error).lower())
247            with con:
248                cur.execute("insert into %s values (5)" % table)
249                cur.execute("insert into %s values (6)" % table)
250            try:
251                with con:
252                    cur.execute("insert into %s values (7)" % table)
253                    cur.execute("insert into %s values (8)" % table)
254                    raise ValueError('transaction should rollback')
255            except ValueError, error:
256                self.assertEqual(str(error), 'transaction should rollback')
257            with con:
258                cur.execute("insert into %s values (9)" % table)
259            cur.execute("select * from %s order by 1" % table)
260            rows = cur.fetchall()
261            rows = [row[0] for row in rows]
262        finally:
263            con.close()\n"""
264        self.assertEqual(rows, [1, 2, 5, 6, 9])
265
266    def test_cursor_connection(self):
267        con = self._connect()
268        cur = con.cursor()
269        self.assertEqual(cur.connection, con)
270        cur.close()
271
272    @unittest.skipIf(no_with, 'context managers not supported')
273    def test_cursor_as_contextmanager(self):
274        con = self._connect()
275        # wrap "with" statements to avoid SyntaxError in Python < 2.5
276        exec """from __future__ import with_statement\nif True:
277        with con.cursor() as cur:
278            self.assertEqual(cur.connection, con)\n"""
279
280    def test_pgdb_type(self):
281        self.assertEqual(pgdb.STRING, pgdb.STRING)
282        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
283        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
284        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
285        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
286        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
287        self.assertEqual('char', pgdb.STRING)
288        self.assertEqual('varchar', pgdb.STRING)
289        self.assertEqual('text', pgdb.STRING)
290        self.assertNotEqual('numeric', pgdb.STRING)
291        self.assertEqual('numeric', pgdb.NUMERIC)
292        self.assertEqual('numeric', pgdb.NUMBER)
293        self.assertEqual('int4', pgdb.NUMBER)
294        self.assertNotEqual('int4', pgdb.NUMERIC)
295        self.assertEqual('int2', pgdb.SMALLINT)
296        self.assertNotEqual('int4', pgdb.SMALLINT)
297        self.assertEqual('int2', pgdb.INTEGER)
298        self.assertEqual('int4', pgdb.INTEGER)
299        self.assertEqual('int8', pgdb.INTEGER)
300        self.assertNotEqual('int4', pgdb.LONG)
301        self.assertEqual('int8', pgdb.LONG)
302        self.assert_('char' in pgdb.STRING)
303        self.assert_(pgdb.NUMERIC <= pgdb.NUMBER)
304        self.assert_(pgdb.NUMBER >= pgdb.INTEGER)
305        self.assert_(pgdb.TIME <= pgdb.DATETIME)
306        self.assert_(pgdb.DATETIME >= pgdb.DATE)
307
308
# Run all tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.