source: trunk/tests/test_dbapi20.py

Last change on this file was 995, checked in by cito, 5 months ago

Support autocommit attribute on pgdb connections

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 53.2 KB
Line 
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 995 2019-04-25 14:10:20Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31import gc
32import sys
33
34from datetime import date, time, datetime, timedelta
35from uuid import UUID as Uuid
36
37try:  # noinspection PyUnresolvedReferences
38    long
39except NameError:  # Python >= 3.0
40    long = int
41
42try:
43    from collections import OrderedDict
44except ImportError:  # Python 2.6 or 3.0
45    OrderedDict = None
46
47
48class PgBitString:
49    """Test object with a PostgreSQL representation as Bit String."""
50
51    def __init__(self, value):
52        self.value = value
53
54    def __pg_repr__(self):
55         return "B'{0:b}'".format(self.value)
56
57
58class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
59
60    driver = pgdb
61    connect_args = ()
62    connect_kw_args = {'database': dbname,
63        'host': '%s:%d' % (dbhost or '', dbport or -1)}
64
65    lower_func = 'lower'  # For stored procedure test
66
67    def setUp(self):
68        # Call superclass setUp in case this does something in the future
69        dbapi20.DatabaseAPI20Test.setUp(self)
70        try:
71            con = self._connect()
72            con.close()
73        except pgdb.Error:  # try to create a missing database
74            import pg
75            try:  # first try to log in as superuser
76                db = pg.DB('postgres', dbhost or None, dbport or -1,
77                    user='postgres')
78            except Exception:  # then try to log in as current user
79                db = pg.DB('postgres', dbhost or None, dbport or -1)
80            db.query('create database ' + dbname)
81
82    def tearDown(self):
83        dbapi20.DatabaseAPI20Test.tearDown(self)
84
85    def test_version(self):
86        v = pgdb.version
87        self.assertIsInstance(v, str)
88        self.assertIn('.', v)
89        self.assertEqual(pgdb.__version__, v)
90
91    def test_connect_kwargs(self):
92        application_name = 'PyGreSQL DB API 2.0 Test'
93        self.connect_kw_args['application_name'] = application_name
94        con = self._connect()
95        cur = con.cursor()
96        cur.execute("select application_name from pg_stat_activity"
97            " where application_name = %s", (application_name,))
98        self.assertEqual(cur.fetchone(), (application_name,))
99
100    def test_percent_sign(self):
101        con = self._connect()
102        cur = con.cursor()
103        cur.execute("select %s, 'a %% sign'", ('a % sign',))
104        self.assertEqual(cur.fetchone(), ('a % sign', 'a % sign'))
105        cur.execute("select 'a % sign'")
106        self.assertEqual(cur.fetchone(), ('a % sign',))
107        cur.execute("select 'a %% sign'")
108        self.assertEqual(cur.fetchone(), ('a % sign',))
109
110    def test_callproc_no_params(self):
111        con = self._connect()
112        cur = con.cursor()
113        # note that now() does not change within a transaction
114        cur.execute('select now()')
115        now = cur.fetchone()[0]
116        res = cur.callproc('now')
117        self.assertIsNone(res)
118        res = cur.fetchone()[0]
119        self.assertEqual(res, now)
120
121    def test_callproc_bad_params(self):
122        con = self._connect()
123        cur = con.cursor()
124        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
125        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
126
127    def test_callproc_one_param(self):
128        con = self._connect()
129        cur = con.cursor()
130        params = (42.4382,)
131        res = cur.callproc("round", params)
132        self.assertIs(res, params)
133        res = cur.fetchone()[0]
134        self.assertEqual(res, 42)
135
136    def test_callproc_two_params(self):
137        con = self._connect()
138        cur = con.cursor()
139        params = (9, 4)
140        res = cur.callproc("div", params)
141        self.assertIs(res, params)
142        res = cur.fetchone()[0]
143        self.assertEqual(res, 2)
144
145    def test_cursor_type(self):
146
147        class TestCursor(pgdb.Cursor):
148            pass
149
150        con = self._connect()
151        self.assertIs(con.cursor_type, pgdb.Cursor)
152        cur = con.cursor()
153        self.assertIsInstance(cur, pgdb.Cursor)
154        self.assertNotIsInstance(cur, TestCursor)
155        con.cursor_type = TestCursor
156        cur = con.cursor()
157        self.assertIsInstance(cur, TestCursor)
158        cur = con.cursor()
159        self.assertIsInstance(cur, TestCursor)
160        con = self._connect()
161        self.assertIs(con.cursor_type, pgdb.Cursor)
162        cur = con.cursor()
163        self.assertIsInstance(cur, pgdb.Cursor)
164        self.assertNotIsInstance(cur, TestCursor)
165
166    def test_row_factory(self):
167
168        class TestCursor(pgdb.Cursor):
169
170            def row_factory(self, row):
171                return dict(('column %s' % desc[0], value)
172                    for desc, value in zip(self.description, row))
173
174        con = self._connect()
175        con.cursor_type = TestCursor
176        cur = con.cursor()
177        self.assertIsInstance(cur, TestCursor)
178        res = cur.execute("select 1 as a, 2 as b")
179        self.assertIs(res, cur, 'execute() should return cursor')
180        res = cur.fetchone()
181        self.assertIsInstance(res, dict)
182        self.assertEqual(res, {'column a': 1, 'column b': 2})
183        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
184        res = cur.fetchall()
185        self.assertIsInstance(res, list)
186        self.assertEqual(len(res), 2)
187        self.assertIsInstance(res[0], dict)
188        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
189        self.assertIsInstance(res[1], dict)
190        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
191
192    def test_build_row_factory(self):
193
194        class TestCursor(pgdb.Cursor):
195
196            def build_row_factory(self):
197                keys = [desc[0] for desc in self.description]
198                return lambda row: dict((key, value)
199                    for key, value in zip(keys, row))
200
201        con = self._connect()
202        con.cursor_type = TestCursor
203        cur = con.cursor()
204        self.assertIsInstance(cur, TestCursor)
205        cur.execute("select 1 as a, 2 as b")
206        res = cur.fetchone()
207        self.assertIsInstance(res, dict)
208        self.assertEqual(res, {'a': 1, 'b': 2})
209        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
210        res = cur.fetchall()
211        self.assertIsInstance(res, list)
212        self.assertEqual(len(res), 2)
213        self.assertIsInstance(res[0], dict)
214        self.assertEqual(res[0], {'a': 1, 'b': 2})
215        self.assertIsInstance(res[1], dict)
216        self.assertEqual(res[1], {'a': 3, 'b': 4})
217
218    def test_cursor_with_named_columns(self):
219        con = self._connect()
220        cur = con.cursor()
221        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
222        self.assertIs(res, cur, 'execute() should return cursor')
223        res = cur.fetchone()
224        self.assertIsInstance(res, tuple)
225        self.assertEqual(res, (1, 2, 3))
226        self.assertEqual(res._fields, ('abc', 'de', 'f'))
227        self.assertEqual(res.abc, 1)
228        self.assertEqual(res.de, 2)
229        self.assertEqual(res.f, 3)
230        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
231        res = cur.fetchall()
232        self.assertIsInstance(res, list)
233        self.assertEqual(len(res), 2)
234        self.assertIsInstance(res[0], tuple)
235        self.assertEqual(res[0], (1, 2))
236        self.assertEqual(res[0]._fields, ('one', 'two'))
237        self.assertIsInstance(res[1], tuple)
238        self.assertEqual(res[1], (3, 4))
239        self.assertEqual(res[1]._fields, ('one', 'two'))
240
241    def test_cursor_with_unnamed_columns(self):
242        con = self._connect()
243        cur = con.cursor()
244        cur.execute("select 1, 2, 3")
245        res = cur.fetchone()
246        self.assertIsInstance(res, tuple)
247        self.assertEqual(res, (1, 2, 3))
248        old_py = OrderedDict is None  # Python 2.6 or 3.0
249        # old Python versions cannot rename tuple fields with underscore
250        if old_py:
251            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
252        else:
253            self.assertEqual(res._fields, ('_0', '_1', '_2'))
254        cur.execute("select 1 as one, 2, 3 as three")
255        res = cur.fetchone()
256        self.assertIsInstance(res, tuple)
257        self.assertEqual(res, (1, 2, 3))
258        if old_py:  # cannot auto rename with underscore
259            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
260        else:
261            self.assertEqual(res._fields, ('one', '_1', 'three'))
262
263    def test_cursor_with_badly_named_columns(self):
264        con = self._connect()
265        cur = con.cursor()
266        cur.execute("select 1 as abc, 2 as def")
267        res = cur.fetchone()
268        self.assertIsInstance(res, tuple)
269        self.assertEqual(res, (1, 2))
270        old_py = OrderedDict is None  # Python 2.6 or 3.0
271        if old_py:
272            self.assertEqual(res._fields, ('abc', 'column_1'))
273        else:
274            self.assertEqual(res._fields, ('abc', '_1'))
275        cur.execute('select 1 as snake_case, 2 as "CamelCase",'
276            ' 3 as "kebap-case", 4 as "_bad", 5 as "0bad", 6 as "bad$"')
277        res = cur.fetchone()
278        self.assertIsInstance(res, tuple)
279        self.assertEqual(res, (1, 2, 3, 4, 5, 6))
280        # old Python versions cannot rename tuple fields with underscore
281        self.assertEqual(res._fields[:2], ('snake_case', 'CamelCase'))
282        fields = ('_2', '_3', '_4', '_5')
283        if old_py:
284            fields = tuple('column' + field for field in fields)
285        self.assertEqual(res._fields[2:], fields)
286
287    def test_colnames(self):
288        con = self._connect()
289        cur = con.cursor()
290        cur.execute("select 1, 2, 3")
291        names = cur.colnames
292        self.assertIsInstance(names, list)
293        self.assertEqual(names, ['?column?', '?column?', '?column?'])
294        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
295        names = cur.colnames
296        self.assertIsInstance(names, list)
297        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
298
299    def test_coltypes(self):
300        con = self._connect()
301        cur = con.cursor()
302        cur.execute("select 1::int2, 2::int4, 3::int8")
303        types = cur.coltypes
304        self.assertIsInstance(types, list)
305        self.assertEqual(types, ['int2', 'int4', 'int8'])
306
307    def test_description_fields(self):
308        con = self._connect()
309        cur = con.cursor()
310        cur.execute("select 123456789::int8 col0,"
311            " 123456.789::numeric(41, 13) as col1,"
312            " 'foobar'::char(39) as col2")
313        desc = cur.description
314        self.assertIsInstance(desc, list)
315        self.assertEqual(len(desc), 3)
316        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
317        for i in range(3):
318            c, d = cols[i], desc[i]
319            self.assertIsInstance(d, tuple)
320            self.assertEqual(len(d), 7)
321            self.assertIsInstance(d.name, str)
322            self.assertEqual(d.name, 'col%d' % i)
323            self.assertIsInstance(d.type_code, str)
324            self.assertEqual(d.type_code, c[0])
325            self.assertIsNone(d.display_size)
326            self.assertIsInstance(d.internal_size, int)
327            self.assertEqual(d.internal_size, c[1])
328            if c[2] is not None:
329                self.assertIsInstance(d.precision, int)
330                self.assertEqual(d.precision, c[1])
331                self.assertIsInstance(d.scale, int)
332                self.assertEqual(d.scale, c[2])
333            else:
334                self.assertIsNone(d.precision)
335                self.assertIsNone(d.scale)
336            self.assertIsNone(d.null_ok)
337
338    def test_type_cache_info(self):
339        con = self._connect()
340        try:
341            cur = con.cursor()
342            type_cache = con.type_cache
343            self.assertNotIn('numeric', type_cache)
344            type_info = type_cache['numeric']
345            self.assertIn('numeric', type_cache)
346            self.assertEqual(type_info, 'numeric')
347            self.assertEqual(type_info.oid, 1700)
348            self.assertEqual(type_info.len, -1)
349            self.assertEqual(type_info.type, 'b')  # base
350            self.assertEqual(type_info.category, 'N')  # numeric
351            self.assertEqual(type_info.delim, ',')
352            self.assertEqual(type_info.relid, 0)
353            self.assertIs(con.type_cache[1700], type_info)
354            self.assertNotIn('pg_type', type_cache)
355            type_info = type_cache['pg_type']
356            self.assertIn('pg_type', type_cache)
357            self.assertEqual(type_info.type, 'c')  # composite
358            self.assertEqual(type_info.category, 'C')  # composite
359            cols = type_cache.get_fields('pg_type')
360            self.assertEqual(cols[0].name, 'typname')
361            typname = type_cache[cols[0].type]
362            self.assertEqual(typname, 'name')
363            self.assertEqual(typname.type, 'b')  # base
364            self.assertEqual(typname.category, 'S')  # string
365            self.assertEqual(cols[3].name, 'typlen')
366            typlen = type_cache[cols[3].type]
367            self.assertEqual(typlen, 'int2')
368            self.assertEqual(typlen.type, 'b')  # base
369            self.assertEqual(typlen.category, 'N')  # numeric
370            cur.close()
371            cur = con.cursor()
372            type_cache = con.type_cache
373            self.assertIn('numeric', type_cache)
374            cur.close()
375        finally:
376            con.close()
377        con = self._connect()
378        try:
379            cur = con.cursor()
380            type_cache = con.type_cache
381            self.assertNotIn('pg_type', type_cache)
382            self.assertEqual(type_cache.get('pg_type'), type_info)
383            self.assertIn('pg_type', type_cache)
384            self.assertIsNone(type_cache.get(
385                self.table_prefix + '_surely_does_not_exist'))
386            cur.close()
387        finally:
388            con.close()
389
390    def test_type_cache_typecast(self):
391        con = self._connect()
392        try:
393            cur = con.cursor()
394            type_cache = con.type_cache
395            self.assertIs(type_cache.get_typecast('int4'), int)
396            cast_int = lambda v: 'int(%s)' % v
397            type_cache.set_typecast('int4', cast_int)
398            query = 'select 2::int2, 4::int4, 8::int8'
399            cur.execute(query)
400            i2, i4, i8 = cur.fetchone()
401            self.assertEqual(i2, 2)
402            self.assertEqual(i4, 'int(4)')
403            self.assertEqual(i8, 8)
404            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
405            type_cache.set_typecast(['int2', 'int8'], cast_int)
406            cur.execute(query)
407            i2, i4, i8 = cur.fetchone()
408            self.assertEqual(i2, 'int(2)')
409            self.assertEqual(i4, 'int(4)')
410            self.assertEqual(i8, 'int(8)')
411            type_cache.reset_typecast('int4')
412            cur.execute(query)
413            i2, i4, i8 = cur.fetchone()
414            self.assertEqual(i2, 'int(2)')
415            self.assertEqual(i4, 4)
416            self.assertEqual(i8, 'int(8)')
417            type_cache.reset_typecast(['int2', 'int8'])
418            cur.execute(query)
419            i2, i4, i8 = cur.fetchone()
420            self.assertEqual(i2, 2)
421            self.assertEqual(i4, 4)
422            self.assertEqual(i8, 8)
423            type_cache.set_typecast(['int2', 'int8'], cast_int)
424            cur.execute(query)
425            i2, i4, i8 = cur.fetchone()
426            self.assertEqual(i2, 'int(2)')
427            self.assertEqual(i4, 4)
428            self.assertEqual(i8, 'int(8)')
429            type_cache.reset_typecast()
430            cur.execute(query)
431            i2, i4, i8 = cur.fetchone()
432            self.assertEqual(i2, 2)
433            self.assertEqual(i4, 4)
434            self.assertEqual(i8, 8)
435            cur.close()
436        finally:
437            con.close()
438
439    def test_cursor_iteration(self):
440        con = self._connect()
441        cur = con.cursor()
442        cur.execute("select 1 union select 2 union select 3 order by 1")
443        self.assertEqual([r[0] for r in cur], [1, 2, 3])
444
445    def test_cursor_invalidation(self):
446        con = self._connect()
447        cur = con.cursor()
448        cur.execute("select 1 union select 2")
449        self.assertEqual(cur.fetchone(), (1,))
450        self.assertFalse(con.closed)
451        con.close()
452        self.assertTrue(con.closed)
453        self.assertRaises(pgdb.OperationalError, cur.fetchone)
454
455    def test_fetch_2_rows(self):
456        Decimal = pgdb.decimal_type()
457        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
458            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
459            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
460            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
461            pgdb.Interval(15, 31, 5), 7897234)
462        table = self.table_prefix + 'booze'
463        con = self._connect()
464        try:
465            cur = con.cursor()
466            cur.execute("set datestyle to iso")
467            cur.execute("create table %s ("
468                "stringtest varchar,"
469                "binarytest bytea,"
470                "booltest bool,"
471                "integertest int4,"
472                "longtest int8,"
473                "floattest float8,"
474                "numerictest numeric,"
475                "moneytest money,"
476                "datetest date,"
477                "timetest time,"
478                "datetimetest timestamp,"
479                "intervaltest interval,"
480                "rowidtest oid)" % table)
481            cur.execute("set standard_conforming_strings to on")
482            for s in ('numeric', 'monetary', 'time'):
483                cur.execute("set lc_%s to 'C'" % s)
484            for _i in range(2):
485                cur.execute("insert into %s values ("
486                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
487                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
488            cur.execute("select * from %s" % table)
489            rows = cur.fetchall()
490            self.assertEqual(len(rows), 2)
491            row0 = rows[0]
492            self.assertEqual(row0, values)
493            self.assertEqual(row0, rows[1])
494            self.assertIsInstance(row0[0], str)
495            self.assertIsInstance(row0[1], bytes)
496            self.assertIsInstance(row0[2], bool)
497            self.assertIsInstance(row0[3], int)
498            self.assertIsInstance(row0[4], long)
499            self.assertIsInstance(row0[5], float)
500            self.assertIsInstance(row0[6], Decimal)
501            self.assertIsInstance(row0[7], Decimal)
502            self.assertIsInstance(row0[8], date)
503            self.assertIsInstance(row0[9], time)
504            self.assertIsInstance(row0[10], datetime)
505            self.assertIsInstance(row0[11], timedelta)
506        finally:
507            con.close()
508
509    def test_integrity_error(self):
510        table = self.table_prefix + 'booze'
511        con = self._connect()
512        try:
513            cur = con.cursor()
514            cur.execute("set client_min_messages = warning")
515            cur.execute("create table %s (i int primary key)" % table)
516            cur.execute("insert into %s values (1)" % table)
517            cur.execute("insert into %s values (2)" % table)
518            self.assertRaises(pgdb.IntegrityError, cur.execute,
519                "insert into %s values (1)" % table)
520        finally:
521            con.close()
522
523    def test_update_rowcount(self):
524        table = self.table_prefix + 'booze'
525        con = self._connect()
526        try:
527            cur = con.cursor()
528            cur.execute("create table %s (i int)" % table)
529            cur.execute("insert into %s values (1)" % table)
530            cur.execute("update %s set i=2 where i=2 returning i" % table)
531            self.assertEqual(cur.rowcount, 0)
532            cur.execute("update %s set i=2 where i=1 returning i" % table)
533            self.assertEqual(cur.rowcount, 1)
534            cur.close()
535            # keep rowcount even if cursor is closed (needed by SQLAlchemy)
536            self.assertEqual(cur.rowcount, 1)
537        finally:
538            con.close()
539
540    def test_sqlstate(self):
541        con = self._connect()
542        cur = con.cursor()
543        try:
544            cur.execute("select 1/0")
545        except pgdb.DatabaseError as error:
546            self.assertTrue(isinstance(error, pgdb.DataError))
547            # the SQLSTATE error code for division by zero is 22012
548            self.assertEqual(error.sqlstate, '22012')
549
550    def test_float(self):
551        nan, inf = float('nan'), float('inf')
552        from math import isnan, isinf
553        self.assertTrue(isnan(nan) and not isinf(nan))
554        self.assertTrue(isinf(inf) and not isnan(inf))
555        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
556            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
557        table = self.table_prefix + 'booze'
558        con = self._connect()
559        try:
560            cur = con.cursor()
561            cur.execute(
562                "create table %s (n smallint, floattest float)" % table)
563            params = enumerate(values)
564            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
565            cur.execute("select floattest from %s order by n" % table)
566            rows = cur.fetchall()
567            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
568            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
569            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
570        finally:
571            con.close()
572        self.assertEqual(len(rows), len(values))
573        rows = [row[0] for row in rows]
574        for inval, outval in zip(values, rows):
575            if inval in ('inf', 'Infinity'):
576                inval = inf
577            elif inval in ('-inf', '-Infinity'):
578                inval = -inf
579            elif inval in ('nan', 'NaN'):
580                inval = nan
581            if isinf(inval):
582                self.assertTrue(isinf(outval))
583                if inval < 0:
584                    self.assertTrue(outval < 0)
585                else:
586                    self.assertTrue(outval > 0)
587            elif isnan(inval):
588                self.assertTrue(isnan(outval))
589            else:
590                self.assertEqual(inval, outval)
591
592    def test_datetime(self):
593        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
594        table = self.table_prefix + 'booze'
595        con = self._connect()
596        try:
597            cur = con.cursor()
598            cur.execute("set timezone = UTC")
599            cur.execute("create table %s ("
600                "d date, t time,  ts timestamp,"
601                "tz timetz, tsz timestamptz)" % table)
602            for n in range(3):
603                values = [dt.date(), dt.time(), dt,
604                    dt.time(), dt]
605                values[3] = values[3].replace(tzinfo=pgdb.timezone.utc)
606                values[4] = values[4].replace(tzinfo=pgdb.timezone.utc)
607                if n == 0:  # input as objects
608                    params = values
609                if n == 1:  # input as text
610                    params = [v.isoformat() for v in values]  # as text
611                elif n == 2:  # input using type helpers
612                    d = (dt.year, dt.month, dt.day)
613                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
614                    z = (pgdb.timezone.utc,)
615                    params = [pgdb.Date(*d), pgdb.Time(*t),
616                            pgdb.Timestamp(*(d + t)), pgdb.Time(*(t + z)),
617                            pgdb.Timestamp(*(d + t + z))]
618                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
619                        'sql, mdy', 'sql, dmy', 'german'):
620                    cur.execute("set datestyle to %s" % datestyle)
621                    if n != 1:
622                        cur.execute("select %s,%s,%s,%s,%s", params)
623                        row = cur.fetchone()
624                        self.assertEqual(row, tuple(values))
625                    cur.execute("insert into %s"
626                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
627                    cur.execute("select * from %s" % table)
628                    d = cur.description
629                    for i in range(5):
630                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
631                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
632                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
633                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
634                    self.assertEqual(d[0].type_code, pgdb.DATE)
635                    self.assertEqual(d[1].type_code, pgdb.TIME)
636                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
637                    self.assertEqual(d[3].type_code, pgdb.TIME)
638                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
639                    row = cur.fetchone()
640                    self.assertEqual(row, tuple(values))
641                    cur.execute("delete from %s" % table)
642        finally:
643            con.close()
644
645    def test_interval(self):
646        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
647        table = self.table_prefix + 'booze'
648        con = self._connect()
649        try:
650            cur = con.cursor()
651            cur.execute("create table %s (i interval)" % table)
652            for n in range(3):
653                if n == 0:  # input as objects
654                    param = td
655                if n == 1:  # input as text
656                    param = '%d days %d seconds %d microseconds ' % (
657                        td.days, td.seconds, td.microseconds)
658                elif n == 2:  # input using type helpers
659                    param = pgdb.Interval(
660                        td.days, 0, 0, td.seconds, td.microseconds)
661                for intervalstyle in ('sql_standard ', 'postgres',
662                        'postgres_verbose', 'iso_8601'):
663                    cur.execute("set intervalstyle to %s" % intervalstyle)
664                    cur.execute("insert into %s"
665                        " values (%%s)" % table, [param])
666                    cur.execute("select * from %s" % table)
667                    tc = cur.description[0].type_code
668                    self.assertEqual(tc, pgdb.DATETIME)
669                    self.assertNotEqual(tc, pgdb.STRING)
670                    self.assertNotEqual(tc, pgdb.ARRAY)
671                    self.assertNotEqual(tc, pgdb.RECORD)
672                    self.assertEqual(tc, pgdb.INTERVAL)
673                    row = cur.fetchone()
674                    self.assertEqual(row, (td,))
675                    cur.execute("delete from %s" % table)
676        finally:
677            con.close()
678
679    def test_hstore(self):
680        con = self._connect()
681        try:
682            cur = con.cursor()
683            cur.execute("select 'k=>v'::hstore")
684        except pgdb.DatabaseError:
685            try:
686                cur.execute("create extension hstore")
687            except pgdb.DatabaseError:
688                self.skipTest("hstore extension not enabled")
689        finally:
690            con.close()
691        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
692            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
693            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
694            'None': None, 'NULL': 'NULL', 'empty': ''}
695        con = self._connect()
696        try:
697            cur = con.cursor()
698            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
699            result = cur.fetchone()[0]
700        finally:
701            con.close()
702        self.assertIsInstance(result, dict)
703        self.assertEqual(result, d)
704
705    def test_uuid(self):
706        self.assertIs(Uuid, pgdb.Uuid)
707        d = Uuid('{12345678-1234-5678-1234-567812345678}')
708        con = self._connect()
709        try:
710            cur = con.cursor()
711            cur.execute("select %s::uuid", (d,))
712            result = cur.fetchone()[0]
713        finally:
714            con.close()
715        self.assertIsInstance(result, Uuid)
716        self.assertEqual(result, d)
717
718    def test_insert_array(self):
719        values = [(None, None), ([], []), ([None], [[None], ['null']]),
720            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
721            ([20000, 25000, 25000, 30000],
722            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
723            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
724        table = self.table_prefix + 'booze'
725        con = self._connect()
726        try:
727            cur = con.cursor()
728            cur.execute("create table %s"
729                " (n smallint, i int[], t text[][])" % table)
730            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
731            # Note that we must explicit casts because we are inserting
732            # empty arrays.  Otherwise this is not necessary.
733            cur.executemany("insert into %s values"
734                " (%%d,%%s::int[],%%s::text[][])" % table, params)
735            cur.execute("select i, t from %s order by n" % table)
736            d = cur.description
737            self.assertEqual(d[0].type_code, pgdb.ARRAY)
738            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
739            self.assertEqual(d[0].type_code, pgdb.NUMBER)
740            self.assertEqual(d[0].type_code, pgdb.INTEGER)
741            self.assertEqual(d[1].type_code, pgdb.ARRAY)
742            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
743            self.assertEqual(d[1].type_code, pgdb.STRING)
744            rows = cur.fetchall()
745        finally:
746            con.close()
747        self.assertEqual(rows, values)
748
749    def test_select_array(self):
750        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
751        con = self._connect()
752        try:
753            cur = con.cursor()
754            cur.execute("select %s::int[], %s::text[]", values)
755            row = cur.fetchone()
756        finally:
757            con.close()
758        self.assertEqual(row, values)
759
760    def test_unicode_list_and_tuple(self):
761        value = (u'KÀse', u'WÃŒrstchen')
762        con = self._connect()
763        try:
764            cur = con.cursor()
765            try:
766                cur.execute("select %s, %s", value)
767            except pgdb.DatabaseError:
768                self.skipTest('database does not support latin-1')
769            row = cur.fetchone()
770            cur.execute("select %s, %s", (list(value), tuple(value)))
771            as_list, as_tuple = cur.fetchone()
772        finally:
773            con.close()
774        self.assertEqual(as_list, list(row))
775        self.assertEqual(as_tuple, tuple(row))
776
777    def test_insert_record(self):
778        values = [('John', 61), ('Jane', 63),
779                  ('Fred', None), ('Wilma', None),
780                  (None, 42), (None, None)]
781        table = self.table_prefix + 'booze'
782        record = self.table_prefix + 'munch'
783        con = self._connect()
784        try:
785            cur = con.cursor()
786            cur.execute("create type %s as (name varchar, age int)" % record)
787            cur.execute("create table %s (n smallint, r %s)" % (table, record))
788            params = enumerate(values)
789            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
790            cur.execute("select r from %s order by n" % table)
791            type_code = cur.description[0].type_code
792            self.assertEqual(type_code, record)
793            self.assertEqual(type_code, pgdb.RECORD)
794            self.assertNotEqual(type_code, pgdb.ARRAY)
795            columns = con.type_cache.get_fields(type_code)
796            self.assertEqual(columns[0].name, 'name')
797            self.assertEqual(columns[1].name, 'age')
798            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
799            self.assertEqual(con.type_cache[columns[1].type], 'int4')
800            rows = cur.fetchall()
801        finally:
802            cur.execute('drop table %s' % table)
803            cur.execute('drop type %s' % record)
804            con.close()
805        self.assertEqual(len(rows), len(values))
806        rows = [row[0] for row in rows]
807        self.assertEqual(rows, values)
808        self.assertEqual(rows[0].name, 'John')
809        self.assertEqual(rows[0].age, 61)
810
811    def test_select_record(self):
812        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
813            '(test)', '(x,y)', ' x y ', 'null', None)
814        con = self._connect()
815        try:
816            cur = con.cursor()
817            cur.execute("select %s as test_record", [value])
818            self.assertEqual(cur.description[0].name, 'test_record')
819            self.assertEqual(cur.description[0].type_code, 'record')
820            row = cur.fetchone()[0]
821        finally:
822            con.close()
823        # Note that the element types get lost since we created an
824        # untyped record (an anonymous composite type). For the same
825        # reason this is also a normal tuple, not a named tuple.
826        text_row = tuple(None if v is None else str(v) for v in value)
827        self.assertEqual(row, text_row)
828
829    def test_custom_type(self):
830        values = [3, 5, 65]
831        values = list(map(PgBitString, values))
832        table = self.table_prefix + 'booze'
833        con = self._connect()
834        try:
835            cur = con.cursor()
836            params = enumerate(values)  # params have __pg_repr__ method
837            cur.execute(
838                'create table "%s" (n smallint, b bit varying(7))' % table)
839            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
840            cur.execute("select * from %s" % table)
841            rows = cur.fetchall()
842        finally:
843            con.close()
844        self.assertEqual(len(rows), len(values))
845        con = self._connect()
846        try:
847            cur = con.cursor()
848            params = (1, object())  # an object that cannot be handled
849            self.assertRaises(pgdb.InterfaceError, cur.execute,
850                "insert into %s values (%%s,%%s)" % table, params)
851        finally:
852            con.close()
853
854    def test_set_decimal_type(self):
855        decimal_type = pgdb.decimal_type()
856        self.assertTrue(decimal_type is not None and callable(decimal_type))
857        con = self._connect()
858        try:
859            cur = con.cursor()
860            # change decimal type globally to int
861            int_type = lambda v: int(float(v))
862            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
863            cur.execute('select 4.25')
864            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
865            value = cur.fetchone()[0]
866            self.assertTrue(isinstance(value, int))
867            self.assertEqual(value, 4)
868            # change decimal type again to float
869            self.assertTrue(pgdb.decimal_type(float) is float)
870            cur.execute('select 4.25')
871            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
872            value = cur.fetchone()[0]
873            # the connection still uses the old setting
874            self.assertTrue(isinstance(value, int))
875            # bust the cache for type functions for the connection
876            con.type_cache.reset_typecast()
877            cur.execute('select 4.25')
878            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
879            value = cur.fetchone()[0]
880            # now the connection uses the new setting
881            self.assertTrue(isinstance(value, float))
882            self.assertEqual(value, 4.25)
883        finally:
884            con.close()
885            pgdb.decimal_type(decimal_type)
886        self.assertTrue(pgdb.decimal_type() is decimal_type)
887
888    def test_global_typecast(self):
889        try:
890            query = 'select 2::int2, 4::int4, 8::int8'
891            self.assertIs(pgdb.get_typecast('int4'), int)
892            cast_int = lambda v: 'int(%s)' % v
893            pgdb.set_typecast('int4', cast_int)
894            con = self._connect()
895            try:
896                i2, i4, i8 = con.cursor().execute(query).fetchone()
897            finally:
898                con.close()
899            self.assertEqual(i2, 2)
900            self.assertEqual(i4, 'int(4)')
901            self.assertEqual(i8, 8)
902            pgdb.set_typecast(['int2', 'int8'], cast_int)
903            con = self._connect()
904            try:
905                i2, i4, i8 = con.cursor().execute(query).fetchone()
906            finally:
907                con.close()
908            self.assertEqual(i2, 'int(2)')
909            self.assertEqual(i4, 'int(4)')
910            self.assertEqual(i8, 'int(8)')
911            pgdb.reset_typecast('int4')
912            con = self._connect()
913            try:
914                i2, i4, i8 = con.cursor().execute(query).fetchone()
915            finally:
916                con.close()
917            self.assertEqual(i2, 'int(2)')
918            self.assertEqual(i4, 4)
919            self.assertEqual(i8, 'int(8)')
920            pgdb.reset_typecast(['int2', 'int8'])
921            con = self._connect()
922            try:
923                i2, i4, i8 = con.cursor().execute(query).fetchone()
924            finally:
925                con.close()
926            self.assertEqual(i2, 2)
927            self.assertEqual(i4, 4)
928            self.assertEqual(i8, 8)
929            pgdb.set_typecast(['int2', 'int8'], cast_int)
930            con = self._connect()
931            try:
932                i2, i4, i8 = con.cursor().execute(query).fetchone()
933            finally:
934                con.close()
935            self.assertEqual(i2, 'int(2)')
936            self.assertEqual(i4, 4)
937            self.assertEqual(i8, 'int(8)')
938        finally:
939            pgdb.reset_typecast()
940        con = self._connect()
941        try:
942            i2, i4, i8 = con.cursor().execute(query).fetchone()
943        finally:
944            con.close()
945        self.assertEqual(i2, 2)
946        self.assertEqual(i4, 4)
947        self.assertEqual(i8, 8)
948
949    def test_set_typecast_for_arrays(self):
950        query = 'select ARRAY[1,2,3]'
951        try:
952            con = self._connect()
953            try:
954                r = con.cursor().execute(query).fetchone()[0]
955            finally:
956                con.close()
957            self.assertIsInstance(r, list)
958            self.assertEqual(r, [1, 2, 3])
959            pgdb.set_typecast('anyarray', lambda v, basecast: v)
960            con = self._connect()
961            try:
962                r = con.cursor().execute(query).fetchone()[0]
963            finally:
964                con.close()
965            self.assertIsInstance(r, str)
966            self.assertEqual(r, '{1,2,3}')
967        finally:
968            pgdb.reset_typecast()
969        con = self._connect()
970        try:
971            r = con.cursor().execute(query).fetchone()[0]
972        finally:
973            con.close()
974        self.assertIsInstance(r, list)
975        self.assertEqual(r, [1, 2, 3])
976
977    def test_unicode_with_utf8(self):
978        table = self.table_prefix + 'booze'
979        input = u"He wes Leovenaðes sone — liðe him be Drihten"
980        con = self._connect()
981        try:
982            cur = con.cursor()
983            cur.execute("create table %s (t text)" % table)
984            try:
985                cur.execute("set client_encoding=utf8")
986                cur.execute(u"select '%s'" % input)
987            except Exception:
988                self.skipTest("database does not support utf8")
989            output1 = cur.fetchone()[0]
990            cur.execute("insert into %s values (%%s)" % table, (input,))
991            cur.execute("select * from %s" % table)
992            output2 = cur.fetchone()[0]
993            cur.execute("select t = '%s' from %s" % (input, table))
994            output3 = cur.fetchone()[0]
995            cur.execute("select t = %%s from %s" % table, (input,))
996            output4 = cur.fetchone()[0]
997        finally:
998            con.close()
999        if str is bytes:  # Python < 3.0
1000            input = input.encode('utf8')
1001        self.assertIsInstance(output1, str)
1002        self.assertEqual(output1, input)
1003        self.assertIsInstance(output2, str)
1004        self.assertEqual(output2, input)
1005        self.assertIsInstance(output3, bool)
1006        self.assertTrue(output3)
1007        self.assertIsInstance(output4, bool)
1008        self.assertTrue(output4)
1009
1010    def test_unicode_with_latin1(self):
1011        table = self.table_prefix + 'booze'
1012        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
1013        con = self._connect()
1014        try:
1015            cur = con.cursor()
1016            cur.execute("create table %s (t text)" % table)
1017            try:
1018                cur.execute("set client_encoding=latin1")
1019                cur.execute(u"select '%s'" % input)
1020            except Exception:
1021                self.skipTest("database does not support latin1")
1022            output1 = cur.fetchone()[0]
1023            cur.execute("insert into %s values (%%s)" % table, (input,))
1024            cur.execute("select * from %s" % table)
1025            output2 = cur.fetchone()[0]
1026            cur.execute("select t = '%s' from %s" % (input, table))
1027            output3 = cur.fetchone()[0]
1028            cur.execute("select t = %%s from %s" % table, (input,))
1029            output4 = cur.fetchone()[0]
1030        finally:
1031            con.close()
1032        if str is bytes:  # Python < 3.0
1033            input = input.encode('latin1')
1034        self.assertIsInstance(output1, str)
1035        self.assertEqual(output1, input)
1036        self.assertIsInstance(output2, str)
1037        self.assertEqual(output2, input)
1038        self.assertIsInstance(output3, bool)
1039        self.assertTrue(output3)
1040        self.assertIsInstance(output4, bool)
1041        self.assertTrue(output4)
1042
1043    def test_bool(self):
1044        values = [False, True, None, 't', 'f', 'true', 'false']
1045        table = self.table_prefix + 'booze'
1046        con = self._connect()
1047        try:
1048            cur = con.cursor()
1049            cur.execute(
1050                "create table %s (n smallint, booltest bool)" % table)
1051            params = enumerate(values)
1052            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
1053            cur.execute("select booltest from %s order by n" % table)
1054            rows = cur.fetchall()
1055            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
1056        finally:
1057            con.close()
1058        rows = [row[0] for row in rows]
1059        values[3] = values[5] = True
1060        values[4] = values[6] = False
1061        self.assertEqual(rows, values)
1062
1063    def test_literal(self):
1064        con = self._connect()
1065        try:
1066            cur = con.cursor()
1067            value = "lower('Hello')"
1068            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
1069            row = cur.fetchone()
1070        finally:
1071            con.close()
1072        self.assertEqual(row, (value, 'hello'))
1073
1074    def test_json(self):
1075        inval = {"employees":
1076            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1077        table = self.table_prefix + 'booze'
1078        con = self._connect()
1079        try:
1080            cur = con.cursor()
1081            try:
1082                cur.execute("create table %s (jsontest json)" % table)
1083            except pgdb.ProgrammingError:
1084                self.skipTest('database does not support json')
1085            params = (pgdb.Json(inval),)
1086            cur.execute("insert into %s values (%%s)" % table, params)
1087            cur.execute("select jsontest from %s" % table)
1088            outval = cur.fetchone()[0]
1089            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1090        finally:
1091            con.close()
1092        self.assertEqual(inval, outval)
1093
1094    def test_jsonb(self):
1095        inval = {"employees":
1096            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1097        table = self.table_prefix + 'booze'
1098        con = self._connect()
1099        try:
1100            cur = con.cursor()
1101            try:
1102                cur.execute("create table %s (jsonbtest jsonb)" % table)
1103            except pgdb.ProgrammingError:
1104                self.skipTest('database does not support jsonb')
1105            params = (pgdb.Json(inval),)
1106            cur.execute("insert into %s values (%%s)" % table, params)
1107            cur.execute("select jsonbtest from %s" % table)
1108            outval = cur.fetchone()[0]
1109            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1110        finally:
1111            con.close()
1112        self.assertEqual(inval, outval)
1113
1114    def test_execute_edge_cases(self):
1115        con = self._connect()
1116        try:
1117            cur = con.cursor()
1118            sql = 'invalid'  # should be ignored with empty parameter list
1119            cur.executemany(sql, [])
1120            sql = 'select %d + 1'
1121            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1122            self.assertEqual(cur.fetchone()[0], 3)
1123            sql = 'select 1/0'  # cannot be executed
1124            self.assertRaises(pgdb.DataError, cur.execute, sql)
1125            cur.close()
1126            con.rollback()
1127            if pgdb.shortcutmethods:
1128                res = con.execute('select %d', (1,)).fetchone()
1129                self.assertEqual(res, (1,))
1130                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1131                self.assertEqual(res, (2,))
1132        finally:
1133            con.close()
1134        sql = 'select 1'  # cannot be executed after connection is closed
1135        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1136
1137    def test_fetchmany_with_keep(self):
1138        con = self._connect()
1139        try:
1140            cur = con.cursor()
1141            self.assertEqual(cur.arraysize, 1)
1142            cur.execute('select * from generate_series(1, 25)')
1143            self.assertEqual(len(cur.fetchmany()), 1)
1144            self.assertEqual(len(cur.fetchmany()), 1)
1145            self.assertEqual(cur.arraysize, 1)
1146            cur.arraysize = 3
1147            self.assertEqual(len(cur.fetchmany()), 3)
1148            self.assertEqual(len(cur.fetchmany()), 3)
1149            self.assertEqual(cur.arraysize, 3)
1150            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1151            self.assertEqual(cur.arraysize, 3)
1152            self.assertEqual(len(cur.fetchmany()), 3)
1153            self.assertEqual(len(cur.fetchmany()), 3)
1154            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1155            self.assertEqual(cur.arraysize, 2)
1156            self.assertEqual(len(cur.fetchmany()), 2)
1157            self.assertEqual(len(cur.fetchmany()), 2)
1158            self.assertEqual(len(cur.fetchmany(25)), 3)
1159        finally:
1160            con.close()
1161
1162    def test_nextset(self):
1163        con = self._connect()
1164        cur = con.cursor()
1165        self.assertRaises(con.NotSupportedError, cur.nextset)
1166
1167    def test_setoutputsize(self):
1168        pass  # not supported
1169
1170    def test_connection_errors(self):
1171        con = self._connect()
1172        self.assertEqual(con.Error, pgdb.Error)
1173        self.assertEqual(con.Warning, pgdb.Warning)
1174        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1175        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1176        self.assertEqual(con.InternalError, pgdb.InternalError)
1177        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1178        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1179        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1180        self.assertEqual(con.DataError, pgdb.DataError)
1181        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1182
1183    def test_transaction(self):
1184        table = self.table_prefix + 'booze'
1185        con1 = self._connect()
1186        cur1 = con1.cursor()
1187        self.executeDDL1(cur1)
1188        con1.commit()
1189        con2 = self._connect()
1190        cur2 = con2.cursor()
1191        cur2.execute("select name from %s" % table)
1192        self.assertIsNone(cur2.fetchone())
1193        cur1.execute("insert into %s values('Schlafly')" % table)
1194        cur2.execute("select name from %s" % table)
1195        self.assertIsNone(cur2.fetchone())
1196        con1.commit()
1197        cur2.execute("select name from %s" % table)
1198        self.assertEqual(cur2.fetchone(), ('Schlafly',))
1199        con2.close()
1200        con1.close()
1201
1202    def test_autocommit(self):
1203        table = self.table_prefix + 'booze'
1204        con1 = self._connect()
1205        con1.autocommit = True
1206        cur1 = con1.cursor()
1207        self.executeDDL1(cur1)
1208        con2 = self._connect()
1209        cur2 = con2.cursor()
1210        cur2.execute("select name from %s" % table)
1211        self.assertIsNone(cur2.fetchone())
1212        cur1.execute("insert into %s values('Shmaltz Pastrami')" % table)
1213        cur2.execute("select name from %s" % table)
1214        self.assertEqual(cur2.fetchone(), ('Shmaltz Pastrami',))
1215        con2.close()
1216        con1.close()
1217
1218    def test_connection_as_contextmanager(self):
1219        table = self.table_prefix + 'booze'
1220        for autocommit in False, True:
1221            con = self._connect()
1222            con.autocommit = autocommit
1223            try:
1224                cur = con.cursor()
1225                if autocommit:
1226                    cur.execute("truncate %s" % table)
1227                else:
1228                    cur.execute(
1229                        "create table %s (n smallint check(n!=4))" % table)
1230                with con:
1231                    cur.execute("insert into %s values (1)" % table)
1232                    cur.execute("insert into %s values (2)" % table)
1233                try:
1234                    with con:
1235                        cur.execute("insert into %s values (3)" % table)
1236                        cur.execute("insert into %s values (4)" % table)
1237                except con.IntegrityError as error:
1238                    self.assertTrue('check' in str(error).lower())
1239                with con:
1240                    cur.execute("insert into %s values (5)" % table)
1241                    cur.execute("insert into %s values (6)" % table)
1242                try:
1243                    with con:
1244                        cur.execute("insert into %s values (7)" % table)
1245                        cur.execute("insert into %s values (8)" % table)
1246                        raise ValueError('transaction should rollback')
1247                except ValueError as error:
1248                    self.assertEqual(str(error), 'transaction should rollback')
1249                with con:
1250                    cur.execute("insert into %s values (9)" % table)
1251                cur.execute("select * from %s order by 1" % table)
1252                rows = cur.fetchall()
1253                rows = [row[0] for row in rows]
1254            finally:
1255                con.close()
1256            self.assertEqual(rows, [1, 2, 5, 6, 9])
1257
1258    def test_cursor_connection(self):
1259        con = self._connect()
1260        cur = con.cursor()
1261        self.assertEqual(cur.connection, con)
1262        cur.close()
1263
1264    def test_cursor_as_contextmanager(self):
1265        con = self._connect()
1266        with con.cursor() as cur:
1267            self.assertEqual(cur.connection, con)
1268
1269    def test_pgdb_type(self):
1270        self.assertEqual(pgdb.STRING, pgdb.STRING)
1271        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1272        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1273        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1274        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1275        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1276        self.assertEqual('char', pgdb.STRING)
1277        self.assertEqual('varchar', pgdb.STRING)
1278        self.assertEqual('text', pgdb.STRING)
1279        self.assertNotEqual('numeric', pgdb.STRING)
1280        self.assertEqual('numeric', pgdb.NUMERIC)
1281        self.assertEqual('numeric', pgdb.NUMBER)
1282        self.assertEqual('int4', pgdb.NUMBER)
1283        self.assertNotEqual('int4', pgdb.NUMERIC)
1284        self.assertEqual('int2', pgdb.SMALLINT)
1285        self.assertNotEqual('int4', pgdb.SMALLINT)
1286        self.assertEqual('int2', pgdb.INTEGER)
1287        self.assertEqual('int4', pgdb.INTEGER)
1288        self.assertEqual('int8', pgdb.INTEGER)
1289        self.assertNotEqual('int4', pgdb.LONG)
1290        self.assertEqual('int8', pgdb.LONG)
1291        self.assertTrue('char' in pgdb.STRING)
1292        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1293        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1294        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1295        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1296        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1297        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1298        self.assertEqual('_char', pgdb.ARRAY)
1299        self.assertNotEqual('char', pgdb.ARRAY)
1300        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1301        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1302        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1303        self.assertEqual('record', pgdb.RECORD)
1304        self.assertNotEqual('_record', pgdb.RECORD)
1305
1306    def test_no_close(self):
1307        data = ('hello', 'world')
1308        con = self._connect()
1309        cur = con.cursor()
1310        cur.build_row_factory = lambda: tuple
1311        cur.execute("select %s, %s", data)
1312        row = cur.fetchone()
1313        self.assertEqual(row, data)
1314
1315    def test_set_row_factory_size(self):
1316        try:
1317            from functools import lru_cache
1318        except ImportError:  # Python < 3.2
1319            lru_cache = None
1320        queries = ['select 1 as a, 2 as b, 3 as c', 'select 123 as abc']
1321        con = self._connect()
1322        cur = con.cursor()
1323        for maxsize in (None, 0, 1, 2, 3, 10, 1024):
1324            pgdb.set_row_factory_size(maxsize)
1325            for i in range(3):
1326                for q in queries:
1327                    cur.execute(q)
1328                    r = cur.fetchone()
1329                    if q.endswith('abc'):
1330                        self.assertEqual(r, (123,))
1331                        self.assertEqual(r._fields, ('abc',))
1332                    else:
1333                        self.assertEqual(r, (1, 2, 3))
1334                        self.assertEqual(r._fields, ('a', 'b', 'c'))
1335            if lru_cache:
1336                info = pgdb._row_factory.cache_info()
1337                self.assertEqual(info.maxsize, maxsize)
1338                self.assertEqual(info.hits + info.misses, 6)
1339                self.assertEqual(info.hits,
1340                    0 if maxsize is not None and maxsize < 2 else 4)
1341
1342    def test_memory_leaks(self):
1343        ids = set()
1344        objs = []
1345        add_ids = ids.update
1346        gc.collect()
1347        objs[:] = gc.get_objects()
1348        add_ids(id(obj) for obj in objs)
1349        self.test_no_close()
1350        gc.collect()
1351        objs[:] = gc.get_objects()
1352        objs[:] = [obj for obj in objs if id(obj) not in ids]
1353        if objs and sys.version_info[:3] in ((3, 5, 0), (3, 5, 1)):
1354            # workaround for Python issue 26811
1355            objs[:] = [obj for obj in objs if repr(obj) != '(<NULL>,)']
1356        self.assertEqual(len(objs), 0)
1357
1358
1359if __name__ == '__main__':
1360    unittest.main()
Note: See TracBrowser for help on using the repository browser.