source: trunk/module/tests/test_dbapi20.py @ 681

Last change on this file since 681 was 681, checked in by cito, 4 years ago

Let cursor.description return named tuples

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 14.8 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 681 2015-12-30 22:39:53Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
# We need a database to test against.
# The defaults below are used unless a LOCAL_PyGreSQL module can be
# imported, in which case the names it defines override them.
dbname = 'dbapi20_test'
dbhost = ''
dbport = 5432
try:
    from .LOCAL_PyGreSQL import *
# A relative import outside of a package fails with ValueError,
# SystemError or ImportError depending on the Python version, so all
# three are tolerated here, exactly as for the dbapi20 import above.
except (ImportError, ValueError, SystemError):
    try:
        from LOCAL_PyGreSQL import *
    except ImportError:
        pass  # no local configuration available, keep the defaults

# Provide the name "long" on Python versions that only have int.
try:
    long
except NameError:  # Python >= 3.0
    long = int
35
36
class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
    """Tests for the pgdb module, built on dbapi20.DatabaseAPI20Test.

    The class attributes below tell the base class which driver to test
    and how to connect; the methods add pgdb-specific test cases.
    """

    driver = pgdb
    connect_args = ()
    connect_kw_args = {'database': dbname,
        'host': '%s:%d' % (dbhost or '', dbport or -1)}

    lower_func = 'lower'  # For stored procedure test

    def setUp(self):
        """Make sure that the test database exists before each test."""
        # Call superclass setUp in case this does something in the future
        dbapi20.DatabaseAPI20Test.setUp(self)
        try:
            con = self._connect()
            con.close()
        except pgdb.Error:  # try to create a missing database
            import pg
            try:  # first try to log in as superuser
                db = pg.DB('postgres', dbhost or None, dbport or -1,
                    user='postgres')
            except Exception:  # then try to log in as current user
                db = pg.DB('postgres', dbhost or None, dbport or -1)
            try:
                db.query('create database ' + dbname)
            finally:
                # do not leave the administrative connection open
                db.close()

    def tearDown(self):
        """Delegate the cleanup to the base class."""
        dbapi20.DatabaseAPI20Test.tearDown(self)

    def test_row_factory(self):
        """Check that a cursor subclass can return rows as dicts."""

        class DictCursor(pgdb.Cursor):

            def row_factory(self, row):
                # map each column name to the value in that column
                return {desc[0]: value
                    for desc, value in zip(self.description, row)}

        con = self._connect()
        try:
            cur = DictCursor(con)
            ret = cur.execute("select 1 as a, 2 as b")
            self.assertIs(ret, cur, 'execute() should return cursor')
            self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
        finally:
            con.close()

    def test_description_named(self):
        """Check that description entries can be accessed by name."""
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute("select 123456789::int8 as col")
            desc = cur.description
        finally:
            con.close()
        self.assertIsInstance(desc, list)
        self.assertEqual(len(desc), 1)
        desc = desc[0]
        self.assertIsInstance(desc, tuple)
        self.assertEqual(desc.name, 'col')
        self.assertEqual(desc.type_code, 'int8')
        self.assertIsNone(desc.display_size)
        self.assertIsInstance(desc.internal_size, int)
        self.assertEqual(desc.internal_size, 8)
        self.assertIsNone(desc.precision)
        self.assertIsNone(desc.scale)
        self.assertIsNone(desc.null_ok)

    def test_cursor_iteration(self):
        """Check that iterating over a cursor yields the result rows."""
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute("select 1 union select 2 union select 3")
            self.assertEqual([r[0] for r in cur], [1, 2, 3])
        finally:
            con.close()

    def test_fetch_2_rows(self):
        """Insert the same row of mixed types twice and fetch both back."""
        Decimal = pgdb.decimal_type()
        values = ['test', pgdb.Binary(b'\xff\x52\xb2'),
            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
            7897234]
        table = self.table_prefix + 'booze'
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute("create table %s ("
                "stringtest varchar,"
                "binarytest bytea,"
                "booltest bool,"
                "integertest int4,"
                "longtest int8,"
                "floattest float8,"
                "numerictest numeric,"
                "moneytest money,"
                "datetest date,"
                "timetest time,"
                "datetimetest timestamp,"
                "intervaltest interval,"
                "rowidtest oid)" % table)
            for s in ('numeric', 'monetary', 'time'):
                cur.execute("set lc_%s to 'C'" % s)
            for _i in range(2):
                cur.execute("insert into %s values ("
                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
            cur.execute("select * from %s" % table)
            rows = cur.fetchall()
            self.assertEqual(len(rows), 2)
            row0 = rows[0]
            self.assertEqual(row0, values)
            self.assertEqual(row0, rows[1])
            # expected Python type for each of the first twelve columns
            expected_types = (str, bytes, bool, int, long, float,
                Decimal, Decimal, str, str, str, str)
            for value, expected_type in zip(row0, expected_types):
                self.assertIsInstance(value, expected_type)
        finally:
            con.close()

    def test_sqlstate(self):
        """Check the error class and SQLSTATE of a division by zero."""
        con = self._connect()
        try:
            cur = con.cursor()
            # the test must fail if the query does not raise at all
            with self.assertRaises(pgdb.DatabaseError) as caught:
                cur.execute("select 1/0")
            error = caught.exception
            self.assertIsInstance(error, pgdb.ProgrammingError)
            # the SQLSTATE error code for division by zero is 22012
            self.assertEqual(error.sqlstate, '22012')
        finally:
            con.close()

    def test_float(self):
        """Round-trip ordinary floats, NaN and both infinities."""
        nan, inf = float('nan'), float('inf')
        from math import isnan, isinf
        self.assertTrue(isnan(nan) and not isinf(nan))
        self.assertTrue(isinf(inf) and not isnan(inf))
        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
        table = self.table_prefix + 'booze'
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(
                "create table %s (n smallint, floattest float)" % table)
            # the first column stores the position to keep the order
            params = enumerate(values)
            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
            cur.execute("select * from %s order by 1" % table)
            rows = cur.fetchall()
        finally:
            con.close()
        self.assertEqual(len(rows), len(values))
        rows = [row[1] for row in rows]
        for inval, outval in zip(values, rows):
            if isinf(inval):
                self.assertTrue(isinf(outval))
                if inval < 0:
                    self.assertTrue(outval < 0)
                else:
                    self.assertTrue(outval > 0)
            elif isnan(inval):
                # NaN never compares equal, so it needs its own check
                self.assertTrue(isnan(outval))
            else:
                self.assertEqual(inval, outval)

    def test_set_decimal_type(self):
        """Check that the decimal type can be changed and restored."""
        decimal_type = pgdb.decimal_type()
        self.assertTrue(decimal_type is not None and callable(decimal_type))
        con = self._connect()
        try:
            cur = con.cursor()
            self.assertIs(pgdb.decimal_type(int), int)
            cur.execute('select 42')
            value = cur.fetchone()[0]
            self.assertIsInstance(value, int)
            self.assertEqual(value, 42)
            self.assertIs(pgdb.decimal_type(float), float)
            cur.execute('select 4.25')
            value = cur.fetchone()[0]
            self.assertIsInstance(value, float)
            self.assertEqual(value, 4.25)
        finally:
            con.close()
            # always restore the original setting for the other tests
            pgdb.decimal_type(decimal_type)
        self.assertIs(pgdb.decimal_type(), decimal_type)

    def _check_unicode_roundtrip(self, encoding, text):
        """Send the unicode string text using the given client encoding.

        The encoding name is used both for the client_encoding setting
        of the database session and as the Python codec name.  The text
        is passed inline in the query as well as a parameter, and must
        come back unchanged in both cases.  The test is skipped if
        setting the encoding or the first query fails.
        """
        table = self.table_prefix + 'booze'
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute("create table %s (t text)" % table)
            try:
                cur.execute("set client_encoding=%s" % encoding)
                cur.execute(u"select '%s'" % text)
            except Exception:
                self.skipTest("database does not support %s" % encoding)
            output1 = cur.fetchone()[0]
            cur.execute("insert into %s values (%%s)" % table, (text,))
            cur.execute("select * from %s" % table)
            output2 = cur.fetchone()[0]
            cur.execute("select t = '%s' from %s" % (text, table))
            output3 = cur.fetchone()[0]
            cur.execute("select t = %%s from %s" % table, (text,))
            output4 = cur.fetchone()[0]
        finally:
            con.close()
        if str is bytes:  # Python < 3.0
            text = text.encode(encoding)
        self.assertIsInstance(output1, str)
        self.assertEqual(output1, text)
        self.assertIsInstance(output2, str)
        self.assertEqual(output2, text)
        self.assertIsInstance(output3, bool)
        self.assertTrue(output3)
        self.assertIsInstance(output4, bool)
        self.assertTrue(output4)

    def test_unicode_with_utf8(self):
        """Round-trip a unicode string with the utf8 client encoding."""
        self._check_unicode_roundtrip('utf8',
            u"He wes Leovenaðes sone — liðe him be Drihten")

    def test_unicode_with_latin1(self):
        """Round-trip a unicode string with the latin1 client encoding."""
        self._check_unicode_roundtrip('latin1',
            u"Ehrt den König seine Würde, ehret uns der Hände Fleiß.")

    def test_bool(self):
        """Check that the accepted inputs for bool come back as bool."""
        values = [False, True, None, 't', 'f', 'true', 'false']
        table = self.table_prefix + 'booze'
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute(
                "create table %s (n smallint, booltest bool)" % table)
            # the first column stores the position to keep the order
            params = enumerate(values)
            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
            cur.execute("select * from %s order by 1" % table)
            rows = cur.fetchall()
        finally:
            con.close()
        rows = [row[1] for row in rows]
        # the string inputs are expected to come back as real booleans
        values[3] = values[5] = True
        values[4] = values[6] = False
        self.assertEqual(rows, values)

    def test_nextset(self):
        """Check that nextset() raises NotSupportedError."""
        con = self._connect()
        try:
            cur = con.cursor()
            self.assertRaises(con.NotSupportedError, cur.nextset)
        finally:
            con.close()

    def test_setoutputsize(self):
        pass  # not supported

    def test_connection_errors(self):
        """Check that the connection exposes the pgdb exception classes."""
        con = self._connect()
        try:
            self.assertEqual(con.Error, pgdb.Error)
            self.assertEqual(con.Warning, pgdb.Warning)
            self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
            self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
            self.assertEqual(con.InternalError, pgdb.InternalError)
            self.assertEqual(con.OperationalError, pgdb.OperationalError)
            self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
            self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
            self.assertEqual(con.DataError, pgdb.DataError)
            self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
        finally:
            con.close()

    def test_connection_as_contextmanager(self):
        """Check that only rows from error-free with-blocks are kept."""
        table = self.table_prefix + 'booze'
        con = self._connect()
        try:
            cur = con.cursor()
            cur.execute("create table %s (n smallint check(n!=4))" % table)
            with con:
                cur.execute("insert into %s values (1)" % table)
                cur.execute("insert into %s values (2)" % table)
            # the value 4 violates the check constraint of the table
            with self.assertRaises(con.ProgrammingError) as caught:
                with con:
                    cur.execute("insert into %s values (3)" % table)
                    cur.execute("insert into %s values (4)" % table)
            self.assertIn('check', str(caught.exception).lower())
            with con:
                cur.execute("insert into %s values (5)" % table)
                cur.execute("insert into %s values (6)" % table)
            # an exception that does not come from the database
            with self.assertRaises(ValueError) as caught:
                with con:
                    cur.execute("insert into %s values (7)" % table)
                    cur.execute("insert into %s values (8)" % table)
                    raise ValueError('transaction should rollback')
            self.assertEqual(
                str(caught.exception), 'transaction should rollback')
            with con:
                cur.execute("insert into %s values (9)" % table)
            cur.execute("select * from %s order by 1" % table)
            rows = cur.fetchall()
            rows = [row[0] for row in rows]
        finally:
            con.close()
        self.assertEqual(rows, [1, 2, 5, 6, 9])

    def test_cursor_connection(self):
        """Check that a cursor refers back to its connection."""
        con = self._connect()
        try:
            cur = con.cursor()
            self.assertEqual(cur.connection, con)
            cur.close()
        finally:
            con.close()

    def test_cursor_as_contextmanager(self):
        """Check that a cursor can be used in a with statement."""
        con = self._connect()
        try:
            with con.cursor() as cur:
                self.assertEqual(cur.connection, con)
        finally:
            con.close()

    def test_pgdb_type(self):
        """Check comparisons between pgdb type objects and type names."""
        self.assertEqual(pgdb.STRING, pgdb.STRING)
        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
        self.assertEqual('char', pgdb.STRING)
        self.assertEqual('varchar', pgdb.STRING)
        self.assertEqual('text', pgdb.STRING)
        self.assertNotEqual('numeric', pgdb.STRING)
        self.assertEqual('numeric', pgdb.NUMERIC)
        self.assertEqual('numeric', pgdb.NUMBER)
        self.assertEqual('int4', pgdb.NUMBER)
        self.assertNotEqual('int4', pgdb.NUMERIC)
        self.assertEqual('int2', pgdb.SMALLINT)
        self.assertNotEqual('int4', pgdb.SMALLINT)
        self.assertEqual('int2', pgdb.INTEGER)
        self.assertEqual('int4', pgdb.INTEGER)
        self.assertEqual('int8', pgdb.INTEGER)
        self.assertNotEqual('int4', pgdb.LONG)
        self.assertEqual('int8', pgdb.LONG)
        self.assertTrue('char' in pgdb.STRING)
        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
394
395
if __name__ == '__main__':
    # Run all tests of this module when it is executed as a script.
    unittest.main()
Note: See TracBrowser for help on using the repository browser.