source: trunk/module/tests/test_dbapi20.py @ 682

Last change on this file since 682 was 682, checked in by cito, 4 years ago

Add cursor classes for returning various row types

Currently, PyGreSQL returns rows as lists in the DB-API 2 module.
It is more common to return tuples (and consistent with how it
is done in the classic module). Even better are named tuples
which are cheap and available in all Py versions PyGreSQL supports.

So I made tuples the default now and added additional Cursor types
for returning rows as lists, named tuples, dicts and ordered dicts.
Also added an attribute for setting the default Cursor type in the
connection and special methods for creating the various cursor types
from the same connection.

However, I think this should be simplified and named tuples should
become the default. Named tuples can be easily converted to ordinary
lists and tuples (although this should never be necessary), and they
can also be easily converted to ordered dicts by calling ._asdict().
So I'm planning to change this before long, but for reference I have
checked in this implementation with all necessary tests anyway.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 20.2 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 682 2016-01-01 17:32:24Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31try:
32    long
33except NameError:  # Python >= 3.0
34    long = int
35
36try:
37    from collections import OrderedDict
38except ImportError:  # Python 2.6 or 3.0
39    OrderedDict = None
40
41
class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
    """Run the generic dbapi20 test suite and extra tests against pgdb."""

    driver = pgdb
    connect_args = ()
    # host is passed as "host:port"; a missing port is encoded as -1
    connect_kw_args = dict(
        database=dbname,
        host='%s:%d' % (dbhost or '', dbport or -1))

    lower_func = 'lower'  # For stored procedure test
50
51    def setUp(self):
52        # Call superclass setUp in case this does something in the future
53        dbapi20.DatabaseAPI20Test.setUp(self)
54        try:
55            con = self._connect()
56            con.close()
57        except pgdb.Error:  # try to create a missing database
58            import pg
59            try:  # first try to log in as superuser
60                db = pg.DB('postgres', dbhost or None, dbport or -1,
61                    user='postgres')
62            except Exception:  # then try to log in as current user
63                db = pg.DB('postgres', dbhost or None, dbport or -1)
64            db.query('create database ' + dbname)
65
66    def tearDown(self):
67        dbapi20.DatabaseAPI20Test.tearDown(self)
68
69    def test_cursortype(self):
70        con = self._connect()
71        self.assertIs(con.cursor_type, pgdb.Cursor)
72        cur = con.cursor()
73        self.assertIsInstance(cur, pgdb.Cursor)
74        self.assertNotIsInstance(cur, pgdb.ListCursor)
75        cur.close()
76        con.cursor_type = pgdb.ListCursor
77        cur = con.cursor()
78        self.assertIsInstance(cur, pgdb.Cursor)
79        self.assertIsInstance(cur, pgdb.ListCursor)
80        cur.close()
81        cur = con.cursor()
82        self.assertIsInstance(cur, pgdb.ListCursor)
83        cur.close()
84        con.close()
85        con = self._connect()
86        self.assertIs(con.cursor_type, pgdb.Cursor)
87        cur = con.cursor()
88        self.assertIsInstance(cur, pgdb.Cursor)
89        self.assertNotIsInstance(cur, pgdb.ListCursor)
90        cur.close()
91        con.close()
92
93    def test_list_cursor(self):
94        con = self._connect()
95        cur = con.list_cursor()
96        self.assertIsInstance(cur, pgdb.ListCursor)
97        cur.execute("select 1, 2, 3")
98        res = cur.fetchone()
99        cur.close()
100        con.close()
101        self.assertIsInstance(res, list)
102        self.assertEqual(res, [1, 2, 3])
103
104    def test_tuple_cursor(self):
105        con = self._connect()
106        cur = con.tuple_cursor()
107        self.assertIsInstance(cur, pgdb.Cursor)
108        cur.execute("select 1, 2, 3")
109        res = cur.fetchone()
110        cur.close()
111        con.close()
112        self.assertIsInstance(res, tuple)
113        self.assertEqual(res, (1, 2, 3))
114        self.assertRaises(AttributeError, getattr, res, '_fields')
115
116    def test_named_tuple_cursor(self):
117        con = self._connect()
118        cur = con.named_tuple_cursor()
119        self.assertIsInstance(cur, pgdb.NamedTupleCursor)
120        cur.execute("select 1 as abc, 2 as de, 3 as f")
121        res = cur.fetchone()
122        cur.close()
123        con.close()
124        self.assertIsInstance(res, tuple)
125        self.assertEqual(res, (1, 2, 3))
126        self.assertEqual(res._fields, ('abc', 'de', 'f'))
127        self.assertEqual(res.abc, 1)
128        self.assertEqual(res.de, 2)
129        self.assertEqual(res.f, 3)
130
131    def test_named_tuple_cursor_with_bad_names(self):
132        con = self._connect()
133        cur = con.named_tuple_cursor()
134        self.assertIsInstance(cur, pgdb.NamedTupleCursor)
135        cur.execute("select 1, 2, 3")
136        res = cur.fetchone()
137        self.assertIsInstance(res, tuple)
138        self.assertEqual(res, (1, 2, 3))
139        old_py = OrderedDict is None  # Python 2.6 or 3.0
140        # old Python versions cannot rename tuple fields with underscore
141        if old_py:
142            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
143        else:
144            self.assertEqual(res._fields, ('_0', '_1', '_2'))
145        cur.execute("select 1 as one, 2, 3 as three")
146        res = cur.fetchone()
147        self.assertIsInstance(res, tuple)
148        self.assertEqual(res, (1, 2, 3))
149        if old_py:  # cannot auto rename with underscore
150            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
151        else:
152            self.assertEqual(res._fields, ('one', '_1', 'three'))
153        cur.execute("select 1 as abc, 2 as def")
154        res = cur.fetchone()
155        self.assertIsInstance(res, tuple)
156        self.assertEqual(res, (1, 2))
157        if old_py:
158            self.assertEqual(res._fields, ('column_0', 'column_1'))
159        else:
160            self.assertEqual(res._fields, ('abc', '_1'))
161        cur.close()
162        con.close()
163
164    def test_dict_cursor(self):
165        con = self._connect()
166        cur = con.dict_cursor()
167        self.assertIsInstance(cur, pgdb.DictCursor)
168        cur.execute("select 1 as abc, 2 as de, 3 as f")
169        res = cur.fetchone()
170        cur.close()
171        con.close()
172        self.assertIsInstance(res, dict)
173        self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
174        self.assertRaises(TypeError, res.popitem, last=True)
175
176    def test_ordered_dict_cursor(self):
177        con = self._connect()
178        cur = con.ordered_dict_cursor()
179        self.assertIsInstance(cur, pgdb.OrderedDictCursor)
180        cur.execute("select 1 as abc, 2 as de, 3 as f")
181        try:
182            res = cur.fetchone()
183        except pgdb.NotSupportedError:
184            if OrderedDict is None:
185                return
186            self.fail('OrderedDict supported by Python, but not by pgdb')
187        finally:
188            cur.close()
189            con.close()
190        self.assertIsInstance(res, dict)
191        self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
192        self.assertEqual(res.popitem(last=True), ('f', 3))
193
194    def test_row_factory(self):
195
196        class DictCursor(pgdb.Cursor):
197
198            def row_factory(self, row):
199                # not using dict comprehension to stay compatible with Py 2.6
200                return dict(('column %s' % desc[0], value)
201                    for desc, value in zip(self.description, row))
202
203        con = self._connect()
204        cur = DictCursor(con)
205        ret = cur.execute("select 1 as a, 2 as b")
206        self.assertTrue(ret is cur, 'execute() should return cursor')
207        self.assertEqual(cur.fetchone(), {'column a': 1, 'column b': 2})
208
209    def test_colnames(self):
210        con = self._connect()
211        cur = con.cursor()
212        cur.execute("select 1, 2, 3")
213        names = cur.colnames
214        self.assertIsInstance(names, list)
215        self.assertEqual(names, ['?column?', '?column?', '?column?'])
216        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
217        names = cur.colnames
218        self.assertIsInstance(names, list)
219        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
220
221    def test_coltypes(self):
222        con = self._connect()
223        cur = con.cursor()
224        cur.execute("select 1::int2, 2::int4, 3::int8")
225        types = cur.coltypes
226        self.assertIsInstance(types, list)
227        self.assertEqual(types, ['int2', 'int4', 'int8'])
228
229    def test_description_named(self):
230        con = self._connect()
231        cur = con.cursor()
232        cur.execute("select 123456789::int8 as col")
233        desc = cur.description
234        self.assertIsInstance(desc, list)
235        self.assertEqual(len(desc), 1)
236        desc = desc[0]
237        self.assertIsInstance(desc, tuple)
238        self.assertEqual(desc.name, 'col')
239        self.assertEqual(desc.type_code, 'int8')
240        self.assertIsNone(desc.display_size)
241        self.assertIsInstance(desc.internal_size, int)
242        self.assertEqual(desc.internal_size, 8)
243        self.assertIsNone(desc.precision)
244        self.assertIsNone(desc.scale)
245        self.assertIsNone(desc.null_ok)
246
247    def test_cursor_iteration(self):
248        con = self._connect()
249        cur = con.cursor()
250        cur.execute("select 1 union select 2 union select 3")
251        self.assertEqual([r[0] for r in cur], [1, 2, 3])
252
253    def test_fetch_2_rows(self):
254        Decimal = pgdb.decimal_type()
255        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
256            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
257            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
258            7897234)
259        table = self.table_prefix + 'booze'
260        con = self._connect()
261        try:
262            cur = con.cursor()
263            cur.execute("create table %s ("
264                "stringtest varchar,"
265                "binarytest bytea,"
266                "booltest bool,"
267                "integertest int4,"
268                "longtest int8,"
269                "floattest float8,"
270                "numerictest numeric,"
271                "moneytest money,"
272                "datetest date,"
273                "timetest time,"
274                "datetimetest timestamp,"
275                "intervaltest interval,"
276                "rowidtest oid)" % table)
277            for s in ('numeric', 'monetary', 'time'):
278                cur.execute("set lc_%s to 'C'" % s)
279            for _i in range(2):
280                cur.execute("insert into %s values ("
281                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
282                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
283            cur.execute("select * from %s" % table)
284            rows = cur.fetchall()
285            self.assertEqual(len(rows), 2)
286            row0 = rows[0]
287            self.assertEqual(row0, values)
288            self.assertEqual(row0, rows[1])
289            self.assertIsInstance(row0[0], str)
290            self.assertIsInstance(row0[1], bytes)
291            self.assertIsInstance(row0[2], bool)
292            self.assertIsInstance(row0[3], int)
293            self.assertIsInstance(row0[4], long)
294            self.assertIsInstance(row0[5], float)
295            self.assertIsInstance(row0[6], Decimal)
296            self.assertIsInstance(row0[7], Decimal)
297            self.assertIsInstance(row0[8], str)
298            self.assertIsInstance(row0[9], str)
299            self.assertIsInstance(row0[10], str)
300            self.assertIsInstance(row0[11], str)
301        finally:
302            con.close()
303
304    def test_sqlstate(self):
305        con = self._connect()
306        cur = con.cursor()
307        try:
308            cur.execute("select 1/0")
309        except pgdb.DatabaseError as error:
310            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
311            # the SQLSTATE error code for division by zero is 22012
312            self.assertEqual(error.sqlstate, '22012')
313
314    def test_float(self):
315        nan, inf = float('nan'), float('inf')
316        from math import isnan, isinf
317        self.assertTrue(isnan(nan) and not isinf(nan))
318        self.assertTrue(isinf(inf) and not isnan(inf))
319        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
320        table = self.table_prefix + 'booze'
321        con = self._connect()
322        try:
323            cur = con.cursor()
324            cur.execute(
325                "create table %s (n smallint, floattest float)" % table)
326            params = enumerate(values)
327            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
328            cur.execute("select * from %s order by 1" % table)
329            rows = cur.fetchall()
330        finally:
331            con.close()
332        self.assertEqual(len(rows), len(values))
333        rows = [row[1] for row in rows]
334        for inval, outval in zip(values, rows):
335            if isinf(inval):
336                self.assertTrue(isinf(outval))
337                if inval < 0:
338                    self.assertTrue(outval < 0)
339                else:
340                    self.assertTrue(outval > 0)
341            elif isnan(inval):
342                self.assertTrue(isnan(outval))
343            else:
344                self.assertEqual(inval, outval)
345
346    def test_set_decimal_type(self):
347        decimal_type = pgdb.decimal_type()
348        self.assertTrue(decimal_type is not None and callable(decimal_type))
349        con = self._connect()
350        try:
351            cur = con.cursor()
352            self.assertTrue(pgdb.decimal_type(int) is int)
353            cur.execute('select 42')
354            value = cur.fetchone()[0]
355            self.assertTrue(isinstance(value, int))
356            self.assertEqual(value, 42)
357            self.assertTrue(pgdb.decimal_type(float) is float)
358            cur.execute('select 4.25')
359            value = cur.fetchone()[0]
360            self.assertTrue(isinstance(value, float))
361            self.assertEqual(value, 4.25)
362        finally:
363            con.close()
364            pgdb.decimal_type(decimal_type)
365        self.assertTrue(pgdb.decimal_type() is decimal_type)
366
367    def test_unicode_with_utf8(self):
368        table = self.table_prefix + 'booze'
369        input = u"He wes Leovenaðes sone — liðe him be Drihten"
370        con = self._connect()
371        try:
372            cur = con.cursor()
373            cur.execute("create table %s (t text)" % table)
374            try:
375                cur.execute("set client_encoding=utf8")
376                cur.execute(u"select '%s'" % input)
377            except Exception:
378                self.skipTest("database does not support utf8")
379            output1 = cur.fetchone()[0]
380            cur.execute("insert into %s values (%%s)" % table, (input,))
381            cur.execute("select * from %s" % table)
382            output2 = cur.fetchone()[0]
383            cur.execute("select t = '%s' from %s" % (input, table))
384            output3 = cur.fetchone()[0]
385            cur.execute("select t = %%s from %s" % table, (input,))
386            output4 = cur.fetchone()[0]
387        finally:
388            con.close()
389        if str is bytes:  # Python < 3.0
390            input = input.encode('utf8')
391        self.assertIsInstance(output1, str)
392        self.assertEqual(output1, input)
393        self.assertIsInstance(output2, str)
394        self.assertEqual(output2, input)
395        self.assertIsInstance(output3, bool)
396        self.assertTrue(output3)
397        self.assertIsInstance(output4, bool)
398        self.assertTrue(output4)
399
400    def test_unicode_with_latin1(self):
401        table = self.table_prefix + 'booze'
402        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
403        con = self._connect()
404        try:
405            cur = con.cursor()
406            cur.execute("create table %s (t text)" % table)
407            try:
408                cur.execute("set client_encoding=latin1")
409                cur.execute(u"select '%s'" % input)
410            except Exception:
411                self.skipTest("database does not support latin1")
412            output1 = cur.fetchone()[0]
413            cur.execute("insert into %s values (%%s)" % table, (input,))
414            cur.execute("select * from %s" % table)
415            output2 = cur.fetchone()[0]
416            cur.execute("select t = '%s' from %s" % (input, table))
417            output3 = cur.fetchone()[0]
418            cur.execute("select t = %%s from %s" % table, (input,))
419            output4 = cur.fetchone()[0]
420        finally:
421            con.close()
422        if str is bytes:  # Python < 3.0
423            input = input.encode('latin1')
424        self.assertIsInstance(output1, str)
425        self.assertEqual(output1, input)
426        self.assertIsInstance(output2, str)
427        self.assertEqual(output2, input)
428        self.assertIsInstance(output3, bool)
429        self.assertTrue(output3)
430        self.assertIsInstance(output4, bool)
431        self.assertTrue(output4)
432
433    def test_bool(self):
434        values = [False, True, None, 't', 'f', 'true', 'false']
435        table = self.table_prefix + 'booze'
436        con = self._connect()
437        try:
438            cur = con.cursor()
439            cur.execute(
440                "create table %s (n smallint, booltest bool)" % table)
441            params = enumerate(values)
442            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
443            cur.execute("select * from %s order by 1" % table)
444            rows = cur.fetchall()
445        finally:
446            con.close()
447        rows = [row[1] for row in rows]
448        values[3] = values[5] = True
449        values[4] = values[6] = False
450        self.assertEqual(rows, values)
451
452    def test_nextset(self):
453        con = self._connect()
454        cur = con.cursor()
455        self.assertRaises(con.NotSupportedError, cur.nextset)
456
457    def test_setoutputsize(self):
458        pass  # not supported
459
460    def test_connection_errors(self):
461        con = self._connect()
462        self.assertEqual(con.Error, pgdb.Error)
463        self.assertEqual(con.Warning, pgdb.Warning)
464        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
465        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
466        self.assertEqual(con.InternalError, pgdb.InternalError)
467        self.assertEqual(con.OperationalError, pgdb.OperationalError)
468        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
469        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
470        self.assertEqual(con.DataError, pgdb.DataError)
471        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
472
473    def test_connection_as_contextmanager(self):
474        table = self.table_prefix + 'booze'
475        con = self._connect()
476        try:
477            cur = con.cursor()
478            cur.execute("create table %s (n smallint check(n!=4))" % table)
479            with con:
480                cur.execute("insert into %s values (1)" % table)
481                cur.execute("insert into %s values (2)" % table)
482            try:
483                with con:
484                    cur.execute("insert into %s values (3)" % table)
485                    cur.execute("insert into %s values (4)" % table)
486            except con.ProgrammingError as error:
487                self.assertTrue('check' in str(error).lower())
488            with con:
489                cur.execute("insert into %s values (5)" % table)
490                cur.execute("insert into %s values (6)" % table)
491            try:
492                with con:
493                    cur.execute("insert into %s values (7)" % table)
494                    cur.execute("insert into %s values (8)" % table)
495                    raise ValueError('transaction should rollback')
496            except ValueError as error:
497                self.assertEqual(str(error), 'transaction should rollback')
498            with con:
499                cur.execute("insert into %s values (9)" % table)
500            cur.execute("select * from %s order by 1" % table)
501            rows = cur.fetchall()
502            rows = [row[0] for row in rows]
503        finally:
504            con.close()
505        self.assertEqual(rows, [1, 2, 5, 6, 9])
506
507    def test_cursor_connection(self):
508        con = self._connect()
509        cur = con.cursor()
510        self.assertEqual(cur.connection, con)
511        cur.close()
512
513    def test_cursor_as_contextmanager(self):
514        con = self._connect()
515        with con.cursor() as cur:
516            self.assertEqual(cur.connection, con)
517
518    def test_pgdb_type(self):
519        self.assertEqual(pgdb.STRING, pgdb.STRING)
520        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
521        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
522        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
523        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
524        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
525        self.assertEqual('char', pgdb.STRING)
526        self.assertEqual('varchar', pgdb.STRING)
527        self.assertEqual('text', pgdb.STRING)
528        self.assertNotEqual('numeric', pgdb.STRING)
529        self.assertEqual('numeric', pgdb.NUMERIC)
530        self.assertEqual('numeric', pgdb.NUMBER)
531        self.assertEqual('int4', pgdb.NUMBER)
532        self.assertNotEqual('int4', pgdb.NUMERIC)
533        self.assertEqual('int2', pgdb.SMALLINT)
534        self.assertNotEqual('int4', pgdb.SMALLINT)
535        self.assertEqual('int2', pgdb.INTEGER)
536        self.assertEqual('int4', pgdb.INTEGER)
537        self.assertEqual('int8', pgdb.INTEGER)
538        self.assertNotEqual('int4', pgdb.LONG)
539        self.assertEqual('int8', pgdb.LONG)
540        self.assertTrue('char' in pgdb.STRING)
541        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
542        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
543        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
544        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
545
546
547if __name__ == '__main__':
548    unittest.main()
Note: See TracBrowser for help on using the repository browser.