source: trunk/module/tests/test_dbapi20.py @ 683

Last change on this file since 683 was 683, checked in by cito, 4 years ago

Return rows as named tuples in pgdb

By default, we now return result rows as named tuples in pgdb.

Note that named tuples can be accessed like normal lists and tuples,
and easily converted to these. They can also be easily converted to
(ordered) dictionaries by calling row._asdict(). Therefore the need
for alternative Cursor types with different row types has been greatly
reduced, so I have simplified the implementation in the last revision
by removing the added Cursor classes and cursor() methods again,
leaving only the old row_factory method for customizing the returned
row types. I complemented this with a new build_row_factory method:
different result sets need different named tuple classes, so a single
static row_factory method is not well suited to the task.

Tests and documentation for these changes are included.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 20.1 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 683 2016-01-01 23:11:01Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
# The tests need a database to run against.
# The defaults below are used unless a LOCAL_PyGreSQL module exists,
# in which case everything it defines overrides them.
dbname, dbhost, dbport = 'dbapi20_test', '', 5432
try:
    from .LOCAL_PyGreSQL import *
except (ImportError, ValueError):
    try:
        from LOCAL_PyGreSQL import *
    except ImportError:
        pass

# Python >= 3.0 has no separate long type, so alias it to int there.
try:
    long
except NameError:
    long = int

# Python 2.6 and 3.0 do not provide OrderedDict; None marks its absence.
try:
    from collections import OrderedDict
except ImportError:
    OrderedDict = None
40
41
class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
    # Runs the test cases inherited from dbapi20.DatabaseAPI20Test
    # against the pgdb module and adds tests of its own.

    driver = pgdb
    connect_args = ()
    # host and port are passed together as one "host:port" string
    connect_kw_args = dict(
        database=dbname,
        host='%s:%d' % (dbhost or '', dbport or -1))

    # function name used by the stored procedure test
    lower_func = 'lower'
50
51    def setUp(self):
52        # Call superclass setUp in case this does something in the future
53        dbapi20.DatabaseAPI20Test.setUp(self)
54        try:
55            con = self._connect()
56            con.close()
57        except pgdb.Error:  # try to create a missing database
58            import pg
59            try:  # first try to log in as superuser
60                db = pg.DB('postgres', dbhost or None, dbport or -1,
61                    user='postgres')
62            except Exception:  # then try to log in as current user
63                db = pg.DB('postgres', dbhost or None, dbport or -1)
64            db.query('create database ' + dbname)
65
66    def tearDown(self):
67        dbapi20.DatabaseAPI20Test.tearDown(self)
68
69    def test_cursor_type(self):
70
71        class TestCursor(pgdb.Cursor):
72            pass
73
74        con = self._connect()
75        self.assertIs(con.cursor_type, pgdb.Cursor)
76        cur = con.cursor()
77        self.assertIsInstance(cur, pgdb.Cursor)
78        self.assertNotIsInstance(cur, TestCursor)
79        con.cursor_type = TestCursor
80        cur = con.cursor()
81        self.assertIsInstance(cur, TestCursor)
82        cur = con.cursor()
83        self.assertIsInstance(cur, TestCursor)
84        con = self._connect()
85        self.assertIs(con.cursor_type, pgdb.Cursor)
86        cur = con.cursor()
87        self.assertIsInstance(cur, pgdb.Cursor)
88        self.assertNotIsInstance(cur, TestCursor)
89
90    def test_row_factory(self):
91
92        class TestCursor(pgdb.Cursor):
93
94            def row_factory(self, row):
95                return dict(('column %s' % desc[0], value)
96                    for desc, value in zip(self.description, row))
97
98        con = self._connect()
99        con.cursor_type = TestCursor
100        cur = con.cursor()
101        self.assertIsInstance(cur, TestCursor)
102        res = cur.execute("select 1 as a, 2 as b")
103        self.assertIs(res, cur, 'execute() should return cursor')
104        res = cur.fetchone()
105        self.assertIsInstance(res, dict)
106        self.assertEqual(res, {'column a': 1, 'column b': 2})
107        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
108        res = cur.fetchall()
109        self.assertIsInstance(res, list)
110        self.assertEqual(len(res), 2)
111        self.assertIsInstance(res[0], dict)
112        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
113        self.assertIsInstance(res[1], dict)
114        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
115
116    def test_build_row_factory(self):
117
118        class TestCursor(pgdb.Cursor):
119
120            def build_row_factory(self):
121                keys = [desc[0] for desc in self.description]
122                return lambda row: dict((key, value)
123                    for key, value in zip(keys, row))
124
125        con = self._connect()
126        con.cursor_type = TestCursor
127        cur = con.cursor()
128        self.assertIsInstance(cur, TestCursor)
129        cur.execute("select 1 as a, 2 as b")
130        res = cur.fetchone()
131        self.assertIsInstance(res, dict)
132        self.assertEqual(res, {'a': 1, 'b': 2})
133        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
134        res = cur.fetchall()
135        self.assertIsInstance(res, list)
136        self.assertEqual(len(res), 2)
137        self.assertIsInstance(res[0], dict)
138        self.assertEqual(res[0], {'a': 1, 'b': 2})
139        self.assertIsInstance(res[1], dict)
140        self.assertEqual(res[1], {'a': 3, 'b': 4})
141
142    def test_cursor_with_named_columns(self):
143        con = self._connect()
144        cur = con.cursor()
145        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
146        self.assertIs(res, cur, 'execute() should return cursor')
147        res = cur.fetchone()
148        self.assertIsInstance(res, tuple)
149        self.assertEqual(res, (1, 2, 3))
150        self.assertEqual(res._fields, ('abc', 'de', 'f'))
151        self.assertEqual(res.abc, 1)
152        self.assertEqual(res.de, 2)
153        self.assertEqual(res.f, 3)
154        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
155        res = cur.fetchall()
156        self.assertIsInstance(res, list)
157        self.assertEqual(len(res), 2)
158        self.assertIsInstance(res[0], tuple)
159        self.assertEqual(res[0], (1, 2))
160        self.assertEqual(res[0]._fields, ('one', 'two'))
161        self.assertIsInstance(res[1], tuple)
162        self.assertEqual(res[1], (3, 4))
163        self.assertEqual(res[1]._fields, ('one', 'two'))
164
165    def test_cursor_with_unnamed_columns(self):
166        con = self._connect()
167        cur = con.cursor()
168        cur.execute("select 1, 2, 3")
169        res = cur.fetchone()
170        self.assertIsInstance(res, tuple)
171        self.assertEqual(res, (1, 2, 3))
172        old_py = OrderedDict is None  # Python 2.6 or 3.0
173        # old Python versions cannot rename tuple fields with underscore
174        if old_py:
175            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
176        else:
177            self.assertEqual(res._fields, ('_0', '_1', '_2'))
178        cur.execute("select 1 as one, 2, 3 as three")
179        res = cur.fetchone()
180        self.assertIsInstance(res, tuple)
181        self.assertEqual(res, (1, 2, 3))
182        if old_py:  # cannot auto rename with underscore
183            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
184        else:
185            self.assertEqual(res._fields, ('one', '_1', 'three'))
186        cur.execute("select 1 as abc, 2 as def")
187        res = cur.fetchone()
188        self.assertIsInstance(res, tuple)
189        self.assertEqual(res, (1, 2))
190        if old_py:
191            self.assertEqual(res._fields, ('column_0', 'column_1'))
192        else:
193            self.assertEqual(res._fields, ('abc', '_1'))
194
195    def test_colnames(self):
196        con = self._connect()
197        cur = con.cursor()
198        cur.execute("select 1, 2, 3")
199        names = cur.colnames
200        self.assertIsInstance(names, list)
201        self.assertEqual(names, ['?column?', '?column?', '?column?'])
202        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
203        names = cur.colnames
204        self.assertIsInstance(names, list)
205        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
206
207    def test_coltypes(self):
208        con = self._connect()
209        cur = con.cursor()
210        cur.execute("select 1::int2, 2::int4, 3::int8")
211        types = cur.coltypes
212        self.assertIsInstance(types, list)
213        self.assertEqual(types, ['int2', 'int4', 'int8'])
214
215    def test_description_fields(self):
216        con = self._connect()
217        cur = con.cursor()
218        cur.execute("select 123456789::int8 as col")
219        desc = cur.description
220        self.assertIsInstance(desc, list)
221        self.assertEqual(len(desc), 1)
222        desc = desc[0]
223        self.assertIsInstance(desc, tuple)
224        self.assertEqual(len(desc), 7)
225        self.assertEqual(desc.name, 'col')
226        self.assertEqual(desc.type_code, 'int8')
227        self.assertIsNone(desc.display_size)
228        self.assertIsInstance(desc.internal_size, int)
229        self.assertEqual(desc.internal_size, 8)
230        self.assertIsNone(desc.precision)
231        self.assertIsNone(desc.scale)
232        self.assertIsNone(desc.null_ok)
233
234    def test_cursor_iteration(self):
235        con = self._connect()
236        cur = con.cursor()
237        cur.execute("select 1 union select 2 union select 3")
238        self.assertEqual([r[0] for r in cur], [1, 2, 3])
239
240    def test_fetch_2_rows(self):
241        Decimal = pgdb.decimal_type()
242        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
243            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
244            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
245            7897234)
246        table = self.table_prefix + 'booze'
247        con = self._connect()
248        try:
249            cur = con.cursor()
250            cur.execute("create table %s ("
251                "stringtest varchar,"
252                "binarytest bytea,"
253                "booltest bool,"
254                "integertest int4,"
255                "longtest int8,"
256                "floattest float8,"
257                "numerictest numeric,"
258                "moneytest money,"
259                "datetest date,"
260                "timetest time,"
261                "datetimetest timestamp,"
262                "intervaltest interval,"
263                "rowidtest oid)" % table)
264            for s in ('numeric', 'monetary', 'time'):
265                cur.execute("set lc_%s to 'C'" % s)
266            for _i in range(2):
267                cur.execute("insert into %s values ("
268                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
269                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
270            cur.execute("select * from %s" % table)
271            rows = cur.fetchall()
272            self.assertEqual(len(rows), 2)
273            row0 = rows[0]
274            self.assertEqual(row0, values)
275            self.assertEqual(row0, rows[1])
276            self.assertIsInstance(row0[0], str)
277            self.assertIsInstance(row0[1], bytes)
278            self.assertIsInstance(row0[2], bool)
279            self.assertIsInstance(row0[3], int)
280            self.assertIsInstance(row0[4], long)
281            self.assertIsInstance(row0[5], float)
282            self.assertIsInstance(row0[6], Decimal)
283            self.assertIsInstance(row0[7], Decimal)
284            self.assertIsInstance(row0[8], str)
285            self.assertIsInstance(row0[9], str)
286            self.assertIsInstance(row0[10], str)
287            self.assertIsInstance(row0[11], str)
288        finally:
289            con.close()
290
291    def test_sqlstate(self):
292        con = self._connect()
293        cur = con.cursor()
294        try:
295            cur.execute("select 1/0")
296        except pgdb.DatabaseError as error:
297            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
298            # the SQLSTATE error code for division by zero is 22012
299            self.assertEqual(error.sqlstate, '22012')
300
301    def test_float(self):
302        nan, inf = float('nan'), float('inf')
303        from math import isnan, isinf
304        self.assertTrue(isnan(nan) and not isinf(nan))
305        self.assertTrue(isinf(inf) and not isnan(inf))
306        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
307        table = self.table_prefix + 'booze'
308        con = self._connect()
309        try:
310            cur = con.cursor()
311            cur.execute(
312                "create table %s (n smallint, floattest float)" % table)
313            params = enumerate(values)
314            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
315            cur.execute("select * from %s order by 1" % table)
316            rows = cur.fetchall()
317        finally:
318            con.close()
319        self.assertEqual(len(rows), len(values))
320        rows = [row[1] for row in rows]
321        for inval, outval in zip(values, rows):
322            if isinf(inval):
323                self.assertTrue(isinf(outval))
324                if inval < 0:
325                    self.assertTrue(outval < 0)
326                else:
327                    self.assertTrue(outval > 0)
328            elif isnan(inval):
329                self.assertTrue(isnan(outval))
330            else:
331                self.assertEqual(inval, outval)
332
333    def test_set_decimal_type(self):
334        decimal_type = pgdb.decimal_type()
335        self.assertTrue(decimal_type is not None and callable(decimal_type))
336        con = self._connect()
337        try:
338            cur = con.cursor()
339            self.assertTrue(pgdb.decimal_type(int) is int)
340            cur.execute('select 42')
341            value = cur.fetchone()[0]
342            self.assertTrue(isinstance(value, int))
343            self.assertEqual(value, 42)
344            self.assertTrue(pgdb.decimal_type(float) is float)
345            cur.execute('select 4.25')
346            value = cur.fetchone()[0]
347            self.assertTrue(isinstance(value, float))
348            self.assertEqual(value, 4.25)
349        finally:
350            con.close()
351            pgdb.decimal_type(decimal_type)
352        self.assertTrue(pgdb.decimal_type() is decimal_type)
353
354    def test_unicode_with_utf8(self):
355        table = self.table_prefix + 'booze'
356        input = u"He wes Leovenaðes sone — liðe him be Drihten"
357        con = self._connect()
358        try:
359            cur = con.cursor()
360            cur.execute("create table %s (t text)" % table)
361            try:
362                cur.execute("set client_encoding=utf8")
363                cur.execute(u"select '%s'" % input)
364            except Exception:
365                self.skipTest("database does not support utf8")
366            output1 = cur.fetchone()[0]
367            cur.execute("insert into %s values (%%s)" % table, (input,))
368            cur.execute("select * from %s" % table)
369            output2 = cur.fetchone()[0]
370            cur.execute("select t = '%s' from %s" % (input, table))
371            output3 = cur.fetchone()[0]
372            cur.execute("select t = %%s from %s" % table, (input,))
373            output4 = cur.fetchone()[0]
374        finally:
375            con.close()
376        if str is bytes:  # Python < 3.0
377            input = input.encode('utf8')
378        self.assertIsInstance(output1, str)
379        self.assertEqual(output1, input)
380        self.assertIsInstance(output2, str)
381        self.assertEqual(output2, input)
382        self.assertIsInstance(output3, bool)
383        self.assertTrue(output3)
384        self.assertIsInstance(output4, bool)
385        self.assertTrue(output4)
386
387    def test_unicode_with_latin1(self):
388        table = self.table_prefix + 'booze'
389        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
390        con = self._connect()
391        try:
392            cur = con.cursor()
393            cur.execute("create table %s (t text)" % table)
394            try:
395                cur.execute("set client_encoding=latin1")
396                cur.execute(u"select '%s'" % input)
397            except Exception:
398                self.skipTest("database does not support latin1")
399            output1 = cur.fetchone()[0]
400            cur.execute("insert into %s values (%%s)" % table, (input,))
401            cur.execute("select * from %s" % table)
402            output2 = cur.fetchone()[0]
403            cur.execute("select t = '%s' from %s" % (input, table))
404            output3 = cur.fetchone()[0]
405            cur.execute("select t = %%s from %s" % table, (input,))
406            output4 = cur.fetchone()[0]
407        finally:
408            con.close()
409        if str is bytes:  # Python < 3.0
410            input = input.encode('latin1')
411        self.assertIsInstance(output1, str)
412        self.assertEqual(output1, input)
413        self.assertIsInstance(output2, str)
414        self.assertEqual(output2, input)
415        self.assertIsInstance(output3, bool)
416        self.assertTrue(output3)
417        self.assertIsInstance(output4, bool)
418        self.assertTrue(output4)
419
420    def test_bool(self):
421        values = [False, True, None, 't', 'f', 'true', 'false']
422        table = self.table_prefix + 'booze'
423        con = self._connect()
424        try:
425            cur = con.cursor()
426            cur.execute(
427                "create table %s (n smallint, booltest bool)" % table)
428            params = enumerate(values)
429            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
430            cur.execute("select * from %s order by 1" % table)
431            rows = cur.fetchall()
432        finally:
433            con.close()
434        rows = [row[1] for row in rows]
435        values[3] = values[5] = True
436        values[4] = values[6] = False
437        self.assertEqual(rows, values)
438
439    def test_nextset(self):
440        con = self._connect()
441        cur = con.cursor()
442        self.assertRaises(con.NotSupportedError, cur.nextset)
443
444    def test_setoutputsize(self):
445        pass  # not supported
446
447    def test_connection_errors(self):
448        con = self._connect()
449        self.assertEqual(con.Error, pgdb.Error)
450        self.assertEqual(con.Warning, pgdb.Warning)
451        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
452        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
453        self.assertEqual(con.InternalError, pgdb.InternalError)
454        self.assertEqual(con.OperationalError, pgdb.OperationalError)
455        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
456        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
457        self.assertEqual(con.DataError, pgdb.DataError)
458        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
459
460    def test_connection_as_contextmanager(self):
461        table = self.table_prefix + 'booze'
462        con = self._connect()
463        try:
464            cur = con.cursor()
465            cur.execute("create table %s (n smallint check(n!=4))" % table)
466            with con:
467                cur.execute("insert into %s values (1)" % table)
468                cur.execute("insert into %s values (2)" % table)
469            try:
470                with con:
471                    cur.execute("insert into %s values (3)" % table)
472                    cur.execute("insert into %s values (4)" % table)
473            except con.ProgrammingError as error:
474                self.assertTrue('check' in str(error).lower())
475            with con:
476                cur.execute("insert into %s values (5)" % table)
477                cur.execute("insert into %s values (6)" % table)
478            try:
479                with con:
480                    cur.execute("insert into %s values (7)" % table)
481                    cur.execute("insert into %s values (8)" % table)
482                    raise ValueError('transaction should rollback')
483            except ValueError as error:
484                self.assertEqual(str(error), 'transaction should rollback')
485            with con:
486                cur.execute("insert into %s values (9)" % table)
487            cur.execute("select * from %s order by 1" % table)
488            rows = cur.fetchall()
489            rows = [row[0] for row in rows]
490        finally:
491            con.close()
492        self.assertEqual(rows, [1, 2, 5, 6, 9])
493
494    def test_cursor_connection(self):
495        con = self._connect()
496        cur = con.cursor()
497        self.assertEqual(cur.connection, con)
498        cur.close()
499
500    def test_cursor_as_contextmanager(self):
501        con = self._connect()
502        with con.cursor() as cur:
503            self.assertEqual(cur.connection, con)
504
505    def test_pgdb_type(self):
506        self.assertEqual(pgdb.STRING, pgdb.STRING)
507        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
508        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
509        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
510        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
511        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
512        self.assertEqual('char', pgdb.STRING)
513        self.assertEqual('varchar', pgdb.STRING)
514        self.assertEqual('text', pgdb.STRING)
515        self.assertNotEqual('numeric', pgdb.STRING)
516        self.assertEqual('numeric', pgdb.NUMERIC)
517        self.assertEqual('numeric', pgdb.NUMBER)
518        self.assertEqual('int4', pgdb.NUMBER)
519        self.assertNotEqual('int4', pgdb.NUMERIC)
520        self.assertEqual('int2', pgdb.SMALLINT)
521        self.assertNotEqual('int4', pgdb.SMALLINT)
522        self.assertEqual('int2', pgdb.INTEGER)
523        self.assertEqual('int4', pgdb.INTEGER)
524        self.assertEqual('int8', pgdb.INTEGER)
525        self.assertNotEqual('int4', pgdb.LONG)
526        self.assertEqual('int8', pgdb.LONG)
527        self.assertTrue('char' in pgdb.STRING)
528        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
529        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
530        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
531        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
532
533
# Run all tests with the unittest runner when executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.