source: trunk/tests/test_dbapi20.py @ 772

Last change on this file since 772 was 772, checked in by cito, 4 years ago

Improve test coverage for the pgdb module

Includes a simple patch that allows storing Python list or tuple values
in PostgreSQL array fields (they are not yet converted when read, though).

Also re-activated the shortcut methods on the connection,
since they can sometimes be useful.

Test coverage is now around 95%; the remaining lines are due to support for
old Python versions or obscure database errors that cannot easily be provoked.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 26.4 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 772 2016-01-20 22:44:20Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import datetime
32
33try:
34    long
35except NameError:  # Python >= 3.0
36    long = int
37
38try:
39    from collections import OrderedDict
40except ImportError:  # Python 2.6 or 3.0
41    OrderedDict = None
42
43
class PgBitString:
    """Helper object that renders itself as a PostgreSQL bit string."""

    def __init__(self, value):
        # the number whose binary digits form the bit string
        self.value = value

    def __pg_repr__(self):
        # format the value in base 2 and wrap it as a B'...' constant
        bits = format(self.value, 'b')
        return "B'%s'" % bits
52
53
54class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
55
56    driver = pgdb
57    connect_args = ()
58    connect_kw_args = {'database': dbname,
59        'host': '%s:%d' % (dbhost or '', dbport or -1)}
60
61    lower_func = 'lower'  # For stored procedure test
62
63    def setUp(self):
64        # Call superclass setUp in case this does something in the future
65        dbapi20.DatabaseAPI20Test.setUp(self)
66        try:
67            con = self._connect()
68            con.close()
69        except pgdb.Error:  # try to create a missing database
70            import pg
71            try:  # first try to log in as superuser
72                db = pg.DB('postgres', dbhost or None, dbport or -1,
73                    user='postgres')
74            except Exception:  # then try to log in as current user
75                db = pg.DB('postgres', dbhost or None, dbport or -1)
76            db.query('create database ' + dbname)
77
78    def tearDown(self):
79        dbapi20.DatabaseAPI20Test.tearDown(self)
80
81    def test_callproc_no_params(self):
82        con = self._connect()
83        cur = con.cursor()
84        # note that now() does not change within a transaction
85        cur.execute('select now()')
86        now = cur.fetchone()[0]
87        res = cur.callproc('now')
88        self.assertIsNone(res)
89        res = cur.fetchone()[0]
90        self.assertEqual(res, now)
91
92    def test_callproc_bad_params(self):
93        con = self._connect()
94        cur = con.cursor()
95        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
96        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
97
98    def test_callproc_one_param(self):
99        con = self._connect()
100        cur = con.cursor()
101        params = (42.4382,)
102        res = cur.callproc("round", params)
103        self.assertIs(res, params)
104        res = cur.fetchone()[0]
105        self.assertEqual(res, 42)
106
107    def test_callproc_two_params(self):
108        con = self._connect()
109        cur = con.cursor()
110        params = (9, 4)
111        res = cur.callproc("div", params)
112        self.assertIs(res, params)
113        res = cur.fetchone()[0]
114        self.assertEqual(res, 2)
115
116    def test_cursor_type(self):
117
118        class TestCursor(pgdb.Cursor):
119            pass
120
121        con = self._connect()
122        self.assertIs(con.cursor_type, pgdb.Cursor)
123        cur = con.cursor()
124        self.assertIsInstance(cur, pgdb.Cursor)
125        self.assertNotIsInstance(cur, TestCursor)
126        con.cursor_type = TestCursor
127        cur = con.cursor()
128        self.assertIsInstance(cur, TestCursor)
129        cur = con.cursor()
130        self.assertIsInstance(cur, TestCursor)
131        con = self._connect()
132        self.assertIs(con.cursor_type, pgdb.Cursor)
133        cur = con.cursor()
134        self.assertIsInstance(cur, pgdb.Cursor)
135        self.assertNotIsInstance(cur, TestCursor)
136
137    def test_row_factory(self):
138
139        class TestCursor(pgdb.Cursor):
140
141            def row_factory(self, row):
142                return dict(('column %s' % desc[0], value)
143                    for desc, value in zip(self.description, row))
144
145        con = self._connect()
146        con.cursor_type = TestCursor
147        cur = con.cursor()
148        self.assertIsInstance(cur, TestCursor)
149        res = cur.execute("select 1 as a, 2 as b")
150        self.assertIs(res, cur, 'execute() should return cursor')
151        res = cur.fetchone()
152        self.assertIsInstance(res, dict)
153        self.assertEqual(res, {'column a': 1, 'column b': 2})
154        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
155        res = cur.fetchall()
156        self.assertIsInstance(res, list)
157        self.assertEqual(len(res), 2)
158        self.assertIsInstance(res[0], dict)
159        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
160        self.assertIsInstance(res[1], dict)
161        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
162
163    def test_build_row_factory(self):
164
165        class TestCursor(pgdb.Cursor):
166
167            def build_row_factory(self):
168                keys = [desc[0] for desc in self.description]
169                return lambda row: dict((key, value)
170                    for key, value in zip(keys, row))
171
172        con = self._connect()
173        con.cursor_type = TestCursor
174        cur = con.cursor()
175        self.assertIsInstance(cur, TestCursor)
176        cur.execute("select 1 as a, 2 as b")
177        res = cur.fetchone()
178        self.assertIsInstance(res, dict)
179        self.assertEqual(res, {'a': 1, 'b': 2})
180        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
181        res = cur.fetchall()
182        self.assertIsInstance(res, list)
183        self.assertEqual(len(res), 2)
184        self.assertIsInstance(res[0], dict)
185        self.assertEqual(res[0], {'a': 1, 'b': 2})
186        self.assertIsInstance(res[1], dict)
187        self.assertEqual(res[1], {'a': 3, 'b': 4})
188
189    def test_cursor_with_named_columns(self):
190        con = self._connect()
191        cur = con.cursor()
192        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
193        self.assertIs(res, cur, 'execute() should return cursor')
194        res = cur.fetchone()
195        self.assertIsInstance(res, tuple)
196        self.assertEqual(res, (1, 2, 3))
197        self.assertEqual(res._fields, ('abc', 'de', 'f'))
198        self.assertEqual(res.abc, 1)
199        self.assertEqual(res.de, 2)
200        self.assertEqual(res.f, 3)
201        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
202        res = cur.fetchall()
203        self.assertIsInstance(res, list)
204        self.assertEqual(len(res), 2)
205        self.assertIsInstance(res[0], tuple)
206        self.assertEqual(res[0], (1, 2))
207        self.assertEqual(res[0]._fields, ('one', 'two'))
208        self.assertIsInstance(res[1], tuple)
209        self.assertEqual(res[1], (3, 4))
210        self.assertEqual(res[1]._fields, ('one', 'two'))
211
212    def test_cursor_with_unnamed_columns(self):
213        con = self._connect()
214        cur = con.cursor()
215        cur.execute("select 1, 2, 3")
216        res = cur.fetchone()
217        self.assertIsInstance(res, tuple)
218        self.assertEqual(res, (1, 2, 3))
219        old_py = OrderedDict is None  # Python 2.6 or 3.0
220        # old Python versions cannot rename tuple fields with underscore
221        if old_py:
222            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
223        else:
224            self.assertEqual(res._fields, ('_0', '_1', '_2'))
225        cur.execute("select 1 as one, 2, 3 as three")
226        res = cur.fetchone()
227        self.assertIsInstance(res, tuple)
228        self.assertEqual(res, (1, 2, 3))
229        if old_py:  # cannot auto rename with underscore
230            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
231        else:
232            self.assertEqual(res._fields, ('one', '_1', 'three'))
233        cur.execute("select 1 as abc, 2 as def")
234        res = cur.fetchone()
235        self.assertIsInstance(res, tuple)
236        self.assertEqual(res, (1, 2))
237        if old_py:
238            self.assertEqual(res._fields, ('column_0', 'column_1'))
239        else:
240            self.assertEqual(res._fields, ('abc', '_1'))
241
242    def test_colnames(self):
243        con = self._connect()
244        cur = con.cursor()
245        cur.execute("select 1, 2, 3")
246        names = cur.colnames
247        self.assertIsInstance(names, list)
248        self.assertEqual(names, ['?column?', '?column?', '?column?'])
249        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
250        names = cur.colnames
251        self.assertIsInstance(names, list)
252        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
253
254    def test_coltypes(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1::int2, 2::int4, 3::int8")
258        types = cur.coltypes
259        self.assertIsInstance(types, list)
260        self.assertEqual(types, ['int2', 'int4', 'int8'])
261
262    def test_description_fields(self):
263        con = self._connect()
264        cur = con.cursor()
265        cur.execute("select 123456789::int8 as col")
266        desc = cur.description
267        self.assertIsInstance(desc, list)
268        self.assertEqual(len(desc), 1)
269        desc = desc[0]
270        self.assertIsInstance(desc, tuple)
271        self.assertEqual(len(desc), 7)
272        self.assertEqual(desc.name, 'col')
273        self.assertEqual(desc.type_code, 'int8')
274        self.assertIsNone(desc.display_size)
275        self.assertIsInstance(desc.internal_size, int)
276        self.assertEqual(desc.internal_size, 8)
277        self.assertIsNone(desc.precision)
278        self.assertIsNone(desc.scale)
279        self.assertIsNone(desc.null_ok)
280
281    def test_cursor_iteration(self):
282        con = self._connect()
283        cur = con.cursor()
284        cur.execute("select 1 union select 2 union select 3")
285        self.assertEqual([r[0] for r in cur], [1, 2, 3])
286
287    def test_fetch_2_rows(self):
288        Decimal = pgdb.decimal_type()
289        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
290            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
291            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
292            7897234)
293        table = self.table_prefix + 'booze'
294        con = self._connect()
295        try:
296            cur = con.cursor()
297            cur.execute("set datestyle to 'iso'")
298            cur.execute("create table %s ("
299                "stringtest varchar,"
300                "binarytest bytea,"
301                "booltest bool,"
302                "integertest int4,"
303                "longtest int8,"
304                "floattest float8,"
305                "numerictest numeric,"
306                "moneytest money,"
307                "datetest date,"
308                "timetest time,"
309                "datetimetest timestamp,"
310                "intervaltest interval,"
311                "rowidtest oid)" % table)
312            cur.execute("set standard_conforming_strings to on")
313            for s in ('numeric', 'monetary', 'time'):
314                cur.execute("set lc_%s to 'C'" % s)
315            for _i in range(2):
316                cur.execute("insert into %s values ("
317                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
318                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
319            cur.execute("select * from %s" % table)
320            rows = cur.fetchall()
321            self.assertEqual(len(rows), 2)
322            row0 = rows[0]
323            self.assertEqual(row0, values)
324            self.assertEqual(row0, rows[1])
325            self.assertIsInstance(row0[0], str)
326            self.assertIsInstance(row0[1], bytes)
327            self.assertIsInstance(row0[2], bool)
328            self.assertIsInstance(row0[3], int)
329            self.assertIsInstance(row0[4], long)
330            self.assertIsInstance(row0[5], float)
331            self.assertIsInstance(row0[6], Decimal)
332            self.assertIsInstance(row0[7], Decimal)
333            self.assertIsInstance(row0[8], str)
334            self.assertIsInstance(row0[9], str)
335            self.assertIsInstance(row0[10], str)
336            self.assertIsInstance(row0[11], str)
337        finally:
338            con.close()
339
340    def test_sqlstate(self):
341        con = self._connect()
342        cur = con.cursor()
343        try:
344            cur.execute("select 1/0")
345        except pgdb.DatabaseError as error:
346            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
347            # the SQLSTATE error code for division by zero is 22012
348            self.assertEqual(error.sqlstate, '22012')
349
350    def test_float(self):
351        nan, inf = float('nan'), float('inf')
352        from math import isnan, isinf
353        self.assertTrue(isnan(nan) and not isinf(nan))
354        self.assertTrue(isinf(inf) and not isnan(inf))
355        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
356            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
357        table = self.table_prefix + 'booze'
358        con = self._connect()
359        try:
360            cur = con.cursor()
361            cur.execute(
362                "create table %s (n smallint, floattest float)" % table)
363            params = enumerate(values)
364            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
365            cur.execute("select * from %s order by 1" % table)
366            rows = cur.fetchall()
367        finally:
368            con.close()
369        self.assertEqual(len(rows), len(values))
370        rows = [row[1] for row in rows]
371        for inval, outval in zip(values, rows):
372            if inval in ('inf', 'Infinity'):
373                inval = inf
374            elif inval in ('-inf', '-Infinity'):
375                inval = -inf
376            elif inval in ('nan', 'NaN'):
377                inval = nan
378            if isinf(inval):
379                self.assertTrue(isinf(outval))
380                if inval < 0:
381                    self.assertTrue(outval < 0)
382                else:
383                    self.assertTrue(outval > 0)
384            elif isnan(inval):
385                self.assertTrue(isnan(outval))
386            else:
387                self.assertEqual(inval, outval)
388
389    def test_datetime(self):
390        values = ['2011-07-17 15:47:42', datetime(2016, 1, 20, 20, 15, 51)]
391        table = self.table_prefix + 'booze'
392        con = self._connect()
393        try:
394            cur = con.cursor()
395            cur.execute("set datestyle to 'iso'")
396            cur.execute(
397                "create table %s (n smallint, ts timestamp)" % table)
398            params = enumerate(values)
399            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
400            cur.execute("select * from %s order by 1" % table)
401            rows = cur.fetchall()
402        finally:
403            con.close()
404        self.assertEqual(len(rows), len(values))
405        rows = [row[1] for row in rows]
406        for inval, outval in zip(values, rows):
407            if isinstance(inval, datetime):
408                inval = inval.strftime('%Y-%m-%d %H:%M:%S')
409            self.assertEqual(inval, outval)
410
411    def test_array(self):
412        values = ([20000, 25000, 25000, 30000],
413            [['breakfast', 'consulting'], ['meeting', 'lunch']])
414        output = ('{20000,25000,25000,30000}',
415            '{{breakfast,consulting},{meeting,lunch}}')
416        table = self.table_prefix + 'booze'
417        con = self._connect()
418        try:
419            cur = con.cursor()
420            cur.execute("create table %s (i int[], t text[][])" % table)
421            cur.execute("insert into %s values (%%s,%%s)" % table, values)
422            cur.execute("select * from %s" % table)
423            row = cur.fetchone()
424        finally:
425            con.close()
426        self.assertEqual(row, output)
427
428    def test_custom_type(self):
429        values = [3, 5, 65]
430        values = list(map(PgBitString, values))
431        table = self.table_prefix + 'booze'
432        con = self._connect()
433        try:
434            cur = con.cursor()
435            params = enumerate(values)  # params have __pg_repr__ method
436            cur.execute(
437                'create table "%s" (n smallint, b bit varying(7))' % table)
438            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
439            cur.execute("select * from %s order by 1" % table)
440            rows = cur.fetchall()
441        finally:
442            con.close()
443        self.assertEqual(len(rows), len(values))
444        con = self._connect()
445        try:
446            cur = con.cursor()
447            params = (1, object())  # an object that cannot be handled
448            self.assertRaises(pgdb.InterfaceError, cur.execute,
449                "insert into %s values (%%s,%%s)" % table, params)
450        finally:
451            con.close()
452
453    def test_set_decimal_type(self):
454        decimal_type = pgdb.decimal_type()
455        self.assertTrue(decimal_type is not None and callable(decimal_type))
456        con = self._connect()
457        try:
458            cur = con.cursor()
459            self.assertTrue(pgdb.decimal_type(int) is int)
460            cur.execute('select 42')
461            value = cur.fetchone()[0]
462            self.assertTrue(isinstance(value, int))
463            self.assertEqual(value, 42)
464            self.assertTrue(pgdb.decimal_type(float) is float)
465            cur.execute('select 4.25')
466            value = cur.fetchone()[0]
467            self.assertTrue(isinstance(value, float))
468            self.assertEqual(value, 4.25)
469        finally:
470            con.close()
471            pgdb.decimal_type(decimal_type)
472        self.assertTrue(pgdb.decimal_type() is decimal_type)
473
474    def test_unicode_with_utf8(self):
475        table = self.table_prefix + 'booze'
476        input = u"He wes Leovenaðes sone — liðe him be Drihten"
477        con = self._connect()
478        try:
479            cur = con.cursor()
480            cur.execute("create table %s (t text)" % table)
481            try:
482                cur.execute("set client_encoding=utf8")
483                cur.execute(u"select '%s'" % input)
484            except Exception:
485                self.skipTest("database does not support utf8")
486            output1 = cur.fetchone()[0]
487            cur.execute("insert into %s values (%%s)" % table, (input,))
488            cur.execute("select * from %s" % table)
489            output2 = cur.fetchone()[0]
490            cur.execute("select t = '%s' from %s" % (input, table))
491            output3 = cur.fetchone()[0]
492            cur.execute("select t = %%s from %s" % table, (input,))
493            output4 = cur.fetchone()[0]
494        finally:
495            con.close()
496        if str is bytes:  # Python < 3.0
497            input = input.encode('utf8')
498        self.assertIsInstance(output1, str)
499        self.assertEqual(output1, input)
500        self.assertIsInstance(output2, str)
501        self.assertEqual(output2, input)
502        self.assertIsInstance(output3, bool)
503        self.assertTrue(output3)
504        self.assertIsInstance(output4, bool)
505        self.assertTrue(output4)
506
507    def test_unicode_with_latin1(self):
508        table = self.table_prefix + 'booze'
509        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
510        con = self._connect()
511        try:
512            cur = con.cursor()
513            cur.execute("create table %s (t text)" % table)
514            try:
515                cur.execute("set client_encoding=latin1")
516                cur.execute(u"select '%s'" % input)
517            except Exception:
518                self.skipTest("database does not support latin1")
519            output1 = cur.fetchone()[0]
520            cur.execute("insert into %s values (%%s)" % table, (input,))
521            cur.execute("select * from %s" % table)
522            output2 = cur.fetchone()[0]
523            cur.execute("select t = '%s' from %s" % (input, table))
524            output3 = cur.fetchone()[0]
525            cur.execute("select t = %%s from %s" % table, (input,))
526            output4 = cur.fetchone()[0]
527        finally:
528            con.close()
529        if str is bytes:  # Python < 3.0
530            input = input.encode('latin1')
531        self.assertIsInstance(output1, str)
532        self.assertEqual(output1, input)
533        self.assertIsInstance(output2, str)
534        self.assertEqual(output2, input)
535        self.assertIsInstance(output3, bool)
536        self.assertTrue(output3)
537        self.assertIsInstance(output4, bool)
538        self.assertTrue(output4)
539
540    def test_bool(self):
541        values = [False, True, None, 't', 'f', 'true', 'false']
542        table = self.table_prefix + 'booze'
543        con = self._connect()
544        try:
545            cur = con.cursor()
546            cur.execute(
547                "create table %s (n smallint, booltest bool)" % table)
548            params = enumerate(values)
549            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
550            cur.execute("select * from %s order by 1" % table)
551            rows = cur.fetchall()
552        finally:
553            con.close()
554        rows = [row[1] for row in rows]
555        values[3] = values[5] = True
556        values[4] = values[6] = False
557        self.assertEqual(rows, values)
558
559    def test_execute_edge_cases(self):
560        con = self._connect()
561        try:
562            cur = con.cursor()
563            sql = 'invalid'  # should be ignored with empty parameter list
564            cur.executemany(sql, [])
565            sql = 'select %d + 1'
566            cur.execute(sql, [(1,)])  # deprecated use of execute()
567            self.assertEqual(cur.fetchone()[0], 2)
568            sql = 'select 1/0'  # cannot be executed
569            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
570            cur.close()
571            con.rollback()
572            if pgdb.shortcutmethods:
573                res = con.execute('select %d', (1,)).fetchone()
574                self.assertEqual(res, (1,))
575                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
576                self.assertEqual(res, (2,))
577        finally:
578            con.close()
579        sql = 'select 1'  # cannot be executed after connection is closed
580        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
581
582    def test_fetchmany_with_keep(self):
583        con = self._connect()
584        try:
585            cur = con.cursor()
586            self.assertEqual(cur.arraysize, 1)
587            cur.execute('select * from generate_series(1, 25)')
588            self.assertEqual(len(cur.fetchmany()), 1)
589            self.assertEqual(len(cur.fetchmany()), 1)
590            self.assertEqual(cur.arraysize, 1)
591            cur.arraysize = 3
592            self.assertEqual(len(cur.fetchmany()), 3)
593            self.assertEqual(len(cur.fetchmany()), 3)
594            self.assertEqual(cur.arraysize, 3)
595            self.assertEqual(len(cur.fetchmany(size=2)), 2)
596            self.assertEqual(cur.arraysize, 3)
597            self.assertEqual(len(cur.fetchmany()), 3)
598            self.assertEqual(len(cur.fetchmany()), 3)
599            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
600            self.assertEqual(cur.arraysize, 2)
601            self.assertEqual(len(cur.fetchmany()), 2)
602            self.assertEqual(len(cur.fetchmany()), 2)
603            self.assertEqual(len(cur.fetchmany(25)), 3)
604        finally:
605            con.close()
606
607    def test_nextset(self):
608        con = self._connect()
609        cur = con.cursor()
610        self.assertRaises(con.NotSupportedError, cur.nextset)
611
612    def test_setoutputsize(self):
613        pass  # not supported
614
615    def test_connection_errors(self):
616        con = self._connect()
617        self.assertEqual(con.Error, pgdb.Error)
618        self.assertEqual(con.Warning, pgdb.Warning)
619        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
620        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
621        self.assertEqual(con.InternalError, pgdb.InternalError)
622        self.assertEqual(con.OperationalError, pgdb.OperationalError)
623        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
624        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
625        self.assertEqual(con.DataError, pgdb.DataError)
626        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
627
628    def test_connection_as_contextmanager(self):
629        table = self.table_prefix + 'booze'
630        con = self._connect()
631        try:
632            cur = con.cursor()
633            cur.execute("create table %s (n smallint check(n!=4))" % table)
634            with con:
635                cur.execute("insert into %s values (1)" % table)
636                cur.execute("insert into %s values (2)" % table)
637            try:
638                with con:
639                    cur.execute("insert into %s values (3)" % table)
640                    cur.execute("insert into %s values (4)" % table)
641            except con.ProgrammingError as error:
642                self.assertTrue('check' in str(error).lower())
643            with con:
644                cur.execute("insert into %s values (5)" % table)
645                cur.execute("insert into %s values (6)" % table)
646            try:
647                with con:
648                    cur.execute("insert into %s values (7)" % table)
649                    cur.execute("insert into %s values (8)" % table)
650                    raise ValueError('transaction should rollback')
651            except ValueError as error:
652                self.assertEqual(str(error), 'transaction should rollback')
653            with con:
654                cur.execute("insert into %s values (9)" % table)
655            cur.execute("select * from %s order by 1" % table)
656            rows = cur.fetchall()
657            rows = [row[0] for row in rows]
658        finally:
659            con.close()
660        self.assertEqual(rows, [1, 2, 5, 6, 9])
661
662    def test_cursor_connection(self):
663        con = self._connect()
664        cur = con.cursor()
665        self.assertEqual(cur.connection, con)
666        cur.close()
667
668    def test_cursor_as_contextmanager(self):
669        con = self._connect()
670        with con.cursor() as cur:
671            self.assertEqual(cur.connection, con)
672
673    def test_pgdb_type(self):
674        self.assertEqual(pgdb.STRING, pgdb.STRING)
675        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
676        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
677        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
678        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
679        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
680        self.assertEqual('char', pgdb.STRING)
681        self.assertEqual('varchar', pgdb.STRING)
682        self.assertEqual('text', pgdb.STRING)
683        self.assertNotEqual('numeric', pgdb.STRING)
684        self.assertEqual('numeric', pgdb.NUMERIC)
685        self.assertEqual('numeric', pgdb.NUMBER)
686        self.assertEqual('int4', pgdb.NUMBER)
687        self.assertNotEqual('int4', pgdb.NUMERIC)
688        self.assertEqual('int2', pgdb.SMALLINT)
689        self.assertNotEqual('int4', pgdb.SMALLINT)
690        self.assertEqual('int2', pgdb.INTEGER)
691        self.assertEqual('int4', pgdb.INTEGER)
692        self.assertEqual('int8', pgdb.INTEGER)
693        self.assertNotEqual('int4', pgdb.LONG)
694        self.assertEqual('int8', pgdb.LONG)
695        self.assertTrue('char' in pgdb.STRING)
696        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
697        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
698        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
699        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
700
701
702if __name__ == '__main__':
703    unittest.main()
Note: See TracBrowser for help on using the repository browser.