source: branches/5.0.x/tests/test_dbapi20.py @ 929

Last change on this file since 929 was 929, checked in by cito, 18 months ago

Fix sort order in test to make it reliable

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 51.6 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 929 2018-01-22 09:15:34Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31import gc
32import sys
33
34from datetime import date, time, datetime, timedelta
35from uuid import UUID as Uuid
36
37try:
38    long
39except NameError:  # Python >= 3.0
40    long = int
41
42try:
43    from collections import OrderedDict
44except ImportError:  # Python 2.6 or 3.0
45    OrderedDict = None
46
47
48class PgBitString:
49    """Test object with a PostgreSQL representation as Bit String."""
50
51    def __init__(self, value):
52        self.value = value
53
54    def __pg_repr__(self):
55         return "B'{0:b}'".format(self.value)
56
57
58class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
59
60    driver = pgdb
61    connect_args = ()
62    connect_kw_args = {'database': dbname,
63        'host': '%s:%d' % (dbhost or '', dbport or -1)}
64
65    lower_func = 'lower'  # For stored procedure test
66
67    def setUp(self):
68        # Call superclass setUp in case this does something in the future
69        dbapi20.DatabaseAPI20Test.setUp(self)
70        try:
71            con = self._connect()
72            con.close()
73        except pgdb.Error:  # try to create a missing database
74            import pg
75            try:  # first try to log in as superuser
76                db = pg.DB('postgres', dbhost or None, dbport or -1,
77                    user='postgres')
78            except Exception:  # then try to log in as current user
79                db = pg.DB('postgres', dbhost or None, dbport or -1)
80            db.query('create database ' + dbname)
81
82    def tearDown(self):
83        dbapi20.DatabaseAPI20Test.tearDown(self)
84
85    def test_version(self):
86        v = pgdb.version
87        self.assertIsInstance(v, str)
88        self.assertIn('.', v)
89        self.assertEqual(pgdb.__version__, v)
90
91    def test_connect_kwargs(self):
92        application_name = 'PyGreSQL DB API 2.0 Test'
93        self.connect_kw_args['application_name'] = application_name
94        con = self._connect()
95        cur = con.cursor()
96        cur.execute("select application_name from pg_stat_activity"
97            " where application_name = %s", (application_name,))
98        self.assertEqual(cur.fetchone(), (application_name,))
99
100    def test_percent_sign(self):
101        con = self._connect()
102        cur = con.cursor()
103        cur.execute("select %s, 'a %% sign'", ('a % sign',))
104        self.assertEqual(cur.fetchone(), ('a % sign', 'a % sign'))
105        cur.execute("select 'a % sign'")
106        self.assertEqual(cur.fetchone(), ('a % sign',))
107        cur.execute("select 'a %% sign'")
108        self.assertEqual(cur.fetchone(), ('a % sign',))
109
110    def test_callproc_no_params(self):
111        con = self._connect()
112        cur = con.cursor()
113        # note that now() does not change within a transaction
114        cur.execute('select now()')
115        now = cur.fetchone()[0]
116        res = cur.callproc('now')
117        self.assertIsNone(res)
118        res = cur.fetchone()[0]
119        self.assertEqual(res, now)
120
121    def test_callproc_bad_params(self):
122        con = self._connect()
123        cur = con.cursor()
124        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
125        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
126
127    def test_callproc_one_param(self):
128        con = self._connect()
129        cur = con.cursor()
130        params = (42.4382,)
131        res = cur.callproc("round", params)
132        self.assertIs(res, params)
133        res = cur.fetchone()[0]
134        self.assertEqual(res, 42)
135
136    def test_callproc_two_params(self):
137        con = self._connect()
138        cur = con.cursor()
139        params = (9, 4)
140        res = cur.callproc("div", params)
141        self.assertIs(res, params)
142        res = cur.fetchone()[0]
143        self.assertEqual(res, 2)
144
145    def test_cursor_type(self):
146
147        class TestCursor(pgdb.Cursor):
148            pass
149
150        con = self._connect()
151        self.assertIs(con.cursor_type, pgdb.Cursor)
152        cur = con.cursor()
153        self.assertIsInstance(cur, pgdb.Cursor)
154        self.assertNotIsInstance(cur, TestCursor)
155        con.cursor_type = TestCursor
156        cur = con.cursor()
157        self.assertIsInstance(cur, TestCursor)
158        cur = con.cursor()
159        self.assertIsInstance(cur, TestCursor)
160        con = self._connect()
161        self.assertIs(con.cursor_type, pgdb.Cursor)
162        cur = con.cursor()
163        self.assertIsInstance(cur, pgdb.Cursor)
164        self.assertNotIsInstance(cur, TestCursor)
165
166    def test_row_factory(self):
167
168        class TestCursor(pgdb.Cursor):
169
170            def row_factory(self, row):
171                return dict(('column %s' % desc[0], value)
172                    for desc, value in zip(self.description, row))
173
174        con = self._connect()
175        con.cursor_type = TestCursor
176        cur = con.cursor()
177        self.assertIsInstance(cur, TestCursor)
178        res = cur.execute("select 1 as a, 2 as b")
179        self.assertIs(res, cur, 'execute() should return cursor')
180        res = cur.fetchone()
181        self.assertIsInstance(res, dict)
182        self.assertEqual(res, {'column a': 1, 'column b': 2})
183        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
184        res = cur.fetchall()
185        self.assertIsInstance(res, list)
186        self.assertEqual(len(res), 2)
187        self.assertIsInstance(res[0], dict)
188        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
189        self.assertIsInstance(res[1], dict)
190        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
191
192    def test_build_row_factory(self):
193
194        class TestCursor(pgdb.Cursor):
195
196            def build_row_factory(self):
197                keys = [desc[0] for desc in self.description]
198                return lambda row: dict((key, value)
199                    for key, value in zip(keys, row))
200
201        con = self._connect()
202        con.cursor_type = TestCursor
203        cur = con.cursor()
204        self.assertIsInstance(cur, TestCursor)
205        cur.execute("select 1 as a, 2 as b")
206        res = cur.fetchone()
207        self.assertIsInstance(res, dict)
208        self.assertEqual(res, {'a': 1, 'b': 2})
209        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
210        res = cur.fetchall()
211        self.assertIsInstance(res, list)
212        self.assertEqual(len(res), 2)
213        self.assertIsInstance(res[0], dict)
214        self.assertEqual(res[0], {'a': 1, 'b': 2})
215        self.assertIsInstance(res[1], dict)
216        self.assertEqual(res[1], {'a': 3, 'b': 4})
217
218    def test_cursor_with_named_columns(self):
219        con = self._connect()
220        cur = con.cursor()
221        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
222        self.assertIs(res, cur, 'execute() should return cursor')
223        res = cur.fetchone()
224        self.assertIsInstance(res, tuple)
225        self.assertEqual(res, (1, 2, 3))
226        self.assertEqual(res._fields, ('abc', 'de', 'f'))
227        self.assertEqual(res.abc, 1)
228        self.assertEqual(res.de, 2)
229        self.assertEqual(res.f, 3)
230        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
231        res = cur.fetchall()
232        self.assertIsInstance(res, list)
233        self.assertEqual(len(res), 2)
234        self.assertIsInstance(res[0], tuple)
235        self.assertEqual(res[0], (1, 2))
236        self.assertEqual(res[0]._fields, ('one', 'two'))
237        self.assertIsInstance(res[1], tuple)
238        self.assertEqual(res[1], (3, 4))
239        self.assertEqual(res[1]._fields, ('one', 'two'))
240
241    def test_cursor_with_unnamed_columns(self):
242        con = self._connect()
243        cur = con.cursor()
244        cur.execute("select 1, 2, 3")
245        res = cur.fetchone()
246        self.assertIsInstance(res, tuple)
247        self.assertEqual(res, (1, 2, 3))
248        old_py = OrderedDict is None  # Python 2.6 or 3.0
249        # old Python versions cannot rename tuple fields with underscore
250        if old_py:
251            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
252        else:
253            self.assertEqual(res._fields, ('_0', '_1', '_2'))
254        cur.execute("select 1 as one, 2, 3 as three")
255        res = cur.fetchone()
256        self.assertIsInstance(res, tuple)
257        self.assertEqual(res, (1, 2, 3))
258        if old_py:  # cannot auto rename with underscore
259            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
260        else:
261            self.assertEqual(res._fields, ('one', '_1', 'three'))
262
263    def test_cursor_with_badly_named_columns(self):
264        con = self._connect()
265        cur = con.cursor()
266        cur.execute("select 1 as abc, 2 as def")
267        res = cur.fetchone()
268        self.assertIsInstance(res, tuple)
269        self.assertEqual(res, (1, 2))
270        old_py = OrderedDict is None  # Python 2.6 or 3.0
271        if old_py:
272            self.assertEqual(res._fields, ('abc', 'column_1'))
273        else:
274            self.assertEqual(res._fields, ('abc', '_1'))
275        cur.execute('select 1 as snake_case, 2 as "CamelCase",'
276            ' 3 as "kebap-case", 4 as "_bad", 5 as "0bad", 6 as "bad$"')
277        res = cur.fetchone()
278        self.assertIsInstance(res, tuple)
279        self.assertEqual(res, (1, 2, 3, 4, 5, 6))
280        # old Python versions cannot rename tuple fields with underscore
281        self.assertEqual(res._fields[:2], ('snake_case', 'CamelCase'))
282        fields = ('_2', '_3', '_4', '_5')
283        if old_py:
284            fields = tuple('column' + field for field in fields)
285        self.assertEqual(res._fields[2:], fields)
286
287    def test_colnames(self):
288        con = self._connect()
289        cur = con.cursor()
290        cur.execute("select 1, 2, 3")
291        names = cur.colnames
292        self.assertIsInstance(names, list)
293        self.assertEqual(names, ['?column?', '?column?', '?column?'])
294        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
295        names = cur.colnames
296        self.assertIsInstance(names, list)
297        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
298
299    def test_coltypes(self):
300        con = self._connect()
301        cur = con.cursor()
302        cur.execute("select 1::int2, 2::int4, 3::int8")
303        types = cur.coltypes
304        self.assertIsInstance(types, list)
305        self.assertEqual(types, ['int2', 'int4', 'int8'])
306
307    def test_description_fields(self):
308        con = self._connect()
309        cur = con.cursor()
310        cur.execute("select 123456789::int8 col0,"
311            " 123456.789::numeric(41, 13) as col1,"
312            " 'foobar'::char(39) as col2")
313        desc = cur.description
314        self.assertIsInstance(desc, list)
315        self.assertEqual(len(desc), 3)
316        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
317        for i in range(3):
318            c, d = cols[i], desc[i]
319            self.assertIsInstance(d, tuple)
320            self.assertEqual(len(d), 7)
321            self.assertIsInstance(d.name, str)
322            self.assertEqual(d.name, 'col%d' % i)
323            self.assertIsInstance(d.type_code, str)
324            self.assertEqual(d.type_code, c[0])
325            self.assertIsNone(d.display_size)
326            self.assertIsInstance(d.internal_size, int)
327            self.assertEqual(d.internal_size, c[1])
328            if c[2] is not None:
329                self.assertIsInstance(d.precision, int)
330                self.assertEqual(d.precision, c[1])
331                self.assertIsInstance(d.scale, int)
332                self.assertEqual(d.scale, c[2])
333            else:
334                self.assertIsNone(d.precision)
335                self.assertIsNone(d.scale)
336            self.assertIsNone(d.null_ok)
337
338    def test_type_cache_info(self):
339        con = self._connect()
340        try:
341            cur = con.cursor()
342            type_cache = con.type_cache
343            self.assertNotIn('numeric', type_cache)
344            type_info = type_cache['numeric']
345            self.assertIn('numeric', type_cache)
346            self.assertEqual(type_info, 'numeric')
347            self.assertEqual(type_info.oid, 1700)
348            self.assertEqual(type_info.len, -1)
349            self.assertEqual(type_info.type, 'b')  # base
350            self.assertEqual(type_info.category, 'N')  # numeric
351            self.assertEqual(type_info.delim, ',')
352            self.assertEqual(type_info.relid, 0)
353            self.assertIs(con.type_cache[1700], type_info)
354            self.assertNotIn('pg_type', type_cache)
355            type_info = type_cache['pg_type']
356            self.assertIn('pg_type', type_cache)
357            self.assertEqual(type_info.type, 'c')  # composite
358            self.assertEqual(type_info.category, 'C')  # composite
359            cols = type_cache.get_fields('pg_type')
360            self.assertEqual(cols[0].name, 'typname')
361            typname = type_cache[cols[0].type]
362            self.assertEqual(typname, 'name')
363            self.assertEqual(typname.type, 'b')  # base
364            self.assertEqual(typname.category, 'S')  # string
365            self.assertEqual(cols[3].name, 'typlen')
366            typlen = type_cache[cols[3].type]
367            self.assertEqual(typlen, 'int2')
368            self.assertEqual(typlen.type, 'b')  # base
369            self.assertEqual(typlen.category, 'N')  # numeric
370            cur.close()
371            cur = con.cursor()
372            type_cache = con.type_cache
373            self.assertIn('numeric', type_cache)
374            cur.close()
375        finally:
376            con.close()
377        con = self._connect()
378        try:
379            cur = con.cursor()
380            type_cache = con.type_cache
381            self.assertNotIn('pg_type', type_cache)
382            self.assertEqual(type_cache.get('pg_type'), type_info)
383            self.assertIn('pg_type', type_cache)
384            self.assertIsNone(type_cache.get(
385                self.table_prefix + '_surely_does_not_exist'))
386            cur.close()
387        finally:
388            con.close()
389
390    def test_type_cache_typecast(self):
391        con = self._connect()
392        try:
393            cur = con.cursor()
394            type_cache = con.type_cache
395            self.assertIs(type_cache.get_typecast('int4'), int)
396            cast_int = lambda v: 'int(%s)' % v
397            type_cache.set_typecast('int4', cast_int)
398            query = 'select 2::int2, 4::int4, 8::int8'
399            cur.execute(query)
400            i2, i4, i8 = cur.fetchone()
401            self.assertEqual(i2, 2)
402            self.assertEqual(i4, 'int(4)')
403            self.assertEqual(i8, 8)
404            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
405            type_cache.set_typecast(['int2', 'int8'], cast_int)
406            cur.execute(query)
407            i2, i4, i8 = cur.fetchone()
408            self.assertEqual(i2, 'int(2)')
409            self.assertEqual(i4, 'int(4)')
410            self.assertEqual(i8, 'int(8)')
411            type_cache.reset_typecast('int4')
412            cur.execute(query)
413            i2, i4, i8 = cur.fetchone()
414            self.assertEqual(i2, 'int(2)')
415            self.assertEqual(i4, 4)
416            self.assertEqual(i8, 'int(8)')
417            type_cache.reset_typecast(['int2', 'int8'])
418            cur.execute(query)
419            i2, i4, i8 = cur.fetchone()
420            self.assertEqual(i2, 2)
421            self.assertEqual(i4, 4)
422            self.assertEqual(i8, 8)
423            type_cache.set_typecast(['int2', 'int8'], cast_int)
424            cur.execute(query)
425            i2, i4, i8 = cur.fetchone()
426            self.assertEqual(i2, 'int(2)')
427            self.assertEqual(i4, 4)
428            self.assertEqual(i8, 'int(8)')
429            type_cache.reset_typecast()
430            cur.execute(query)
431            i2, i4, i8 = cur.fetchone()
432            self.assertEqual(i2, 2)
433            self.assertEqual(i4, 4)
434            self.assertEqual(i8, 8)
435            cur.close()
436        finally:
437            con.close()
438
439    def test_cursor_iteration(self):
440        con = self._connect()
441        cur = con.cursor()
442        cur.execute("select 1 union select 2 union select 3 order by 1")
443        self.assertEqual([r[0] for r in cur], [1, 2, 3])
444
445    def test_cursor_invalidation(self):
446        con = self._connect()
447        cur = con.cursor()
448        cur.execute("select 1 union select 2")
449        self.assertEqual(cur.fetchone(), (1,))
450        self.assertFalse(con.closed)
451        con.close()
452        self.assertTrue(con.closed)
453        self.assertRaises(pgdb.OperationalError, cur.fetchone)
454
455    def test_fetch_2_rows(self):
456        Decimal = pgdb.decimal_type()
457        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
458            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
459            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
460            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
461            pgdb.Interval(15, 31, 5), 7897234)
462        table = self.table_prefix + 'booze'
463        con = self._connect()
464        try:
465            cur = con.cursor()
466            cur.execute("set datestyle to iso")
467            cur.execute("create table %s ("
468                "stringtest varchar,"
469                "binarytest bytea,"
470                "booltest bool,"
471                "integertest int4,"
472                "longtest int8,"
473                "floattest float8,"
474                "numerictest numeric,"
475                "moneytest money,"
476                "datetest date,"
477                "timetest time,"
478                "datetimetest timestamp,"
479                "intervaltest interval,"
480                "rowidtest oid)" % table)
481            cur.execute("set standard_conforming_strings to on")
482            for s in ('numeric', 'monetary', 'time'):
483                cur.execute("set lc_%s to 'C'" % s)
484            for _i in range(2):
485                cur.execute("insert into %s values ("
486                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
487                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
488            cur.execute("select * from %s" % table)
489            rows = cur.fetchall()
490            self.assertEqual(len(rows), 2)
491            row0 = rows[0]
492            self.assertEqual(row0, values)
493            self.assertEqual(row0, rows[1])
494            self.assertIsInstance(row0[0], str)
495            self.assertIsInstance(row0[1], bytes)
496            self.assertIsInstance(row0[2], bool)
497            self.assertIsInstance(row0[3], int)
498            self.assertIsInstance(row0[4], long)
499            self.assertIsInstance(row0[5], float)
500            self.assertIsInstance(row0[6], Decimal)
501            self.assertIsInstance(row0[7], Decimal)
502            self.assertIsInstance(row0[8], date)
503            self.assertIsInstance(row0[9], time)
504            self.assertIsInstance(row0[10], datetime)
505            self.assertIsInstance(row0[11], timedelta)
506        finally:
507            con.close()
508
509    def test_integrity_error(self):
510        table = self.table_prefix + 'booze'
511        con = self._connect()
512        try:
513            cur = con.cursor()
514            cur.execute("set client_min_messages = warning")
515            cur.execute("create table %s (i int primary key)" % table)
516            cur.execute("insert into %s values (1)" % table)
517            cur.execute("insert into %s values (2)" % table)
518            self.assertRaises(pgdb.IntegrityError, cur.execute,
519                "insert into %s values (1)" % table)
520        finally:
521            con.close()
522
523    def test_update_rowcount(self):
524        table = self.table_prefix + 'booze'
525        con = self._connect()
526        try:
527            cur = con.cursor()
528            cur.execute("create table %s (i int)" % table)
529            cur.execute("insert into %s values (1)" % table)
530            cur.execute("update %s set i=2 where i=2 returning i" % table)
531            self.assertEqual(cur.rowcount, 0)
532            cur.execute("update %s set i=2 where i=1 returning i" % table)
533            self.assertEqual(cur.rowcount, 1)
534            cur.close()
535            # keep rowcount even if cursor is closed (needed by SQLAlchemy)
536            self.assertEqual(cur.rowcount, 1)
537        finally:
538            con.close()
539
540    def test_sqlstate(self):
541        con = self._connect()
542        cur = con.cursor()
543        try:
544            cur.execute("select 1/0")
545        except pgdb.DatabaseError as error:
546            self.assertTrue(isinstance(error, pgdb.DataError))
547            # the SQLSTATE error code for division by zero is 22012
548            self.assertEqual(error.sqlstate, '22012')
549
550    def test_float(self):
551        nan, inf = float('nan'), float('inf')
552        from math import isnan, isinf
553        self.assertTrue(isnan(nan) and not isinf(nan))
554        self.assertTrue(isinf(inf) and not isnan(inf))
555        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
556            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
557        table = self.table_prefix + 'booze'
558        con = self._connect()
559        try:
560            cur = con.cursor()
561            cur.execute(
562                "create table %s (n smallint, floattest float)" % table)
563            params = enumerate(values)
564            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
565            cur.execute("select floattest from %s order by n" % table)
566            rows = cur.fetchall()
567            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
568            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
569            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
570        finally:
571            con.close()
572        self.assertEqual(len(rows), len(values))
573        rows = [row[0] for row in rows]
574        for inval, outval in zip(values, rows):
575            if inval in ('inf', 'Infinity'):
576                inval = inf
577            elif inval in ('-inf', '-Infinity'):
578                inval = -inf
579            elif inval in ('nan', 'NaN'):
580                inval = nan
581            if isinf(inval):
582                self.assertTrue(isinf(outval))
583                if inval < 0:
584                    self.assertTrue(outval < 0)
585                else:
586                    self.assertTrue(outval > 0)
587            elif isnan(inval):
588                self.assertTrue(isnan(outval))
589            else:
590                self.assertEqual(inval, outval)
591
592    def test_datetime(self):
593        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
594        table = self.table_prefix + 'booze'
595        con = self._connect()
596        try:
597            cur = con.cursor()
598            cur.execute("set timezone = UTC")
599            cur.execute("create table %s ("
600                "d date, t time,  ts timestamp,"
601                "tz timetz, tsz timestamptz)" % table)
602            for n in range(3):
603                values = [dt.date(), dt.time(), dt,
604                    dt.time(), dt]
605                values[3] = values[3].replace(tzinfo=pgdb.timezone.utc)
606                values[4] = values[4].replace(tzinfo=pgdb.timezone.utc)
607                if n == 0:  # input as objects
608                    params = values
609                if n == 1:  # input as text
610                    params = [v.isoformat() for v in values]  # as text
611                elif n == 2:  # input using type helpers
612                    d = (dt.year, dt.month, dt.day)
613                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
614                    z = (pgdb.timezone.utc,)
615                    params = [pgdb.Date(*d), pgdb.Time(*t),
616                            pgdb.Timestamp(*(d + t)), pgdb.Time(*(t + z)),
617                            pgdb.Timestamp(*(d + t + z))]
618                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
619                        'sql, mdy', 'sql, dmy', 'german'):
620                    cur.execute("set datestyle to %s" % datestyle)
621                    if n != 1:
622                        cur.execute("select %s,%s,%s,%s,%s", params)
623                        row = cur.fetchone()
624                        self.assertEqual(row, tuple(values))
625                    cur.execute("insert into %s"
626                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
627                    cur.execute("select * from %s" % table)
628                    d = cur.description
629                    for i in range(5):
630                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
631                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
632                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
633                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
634                    self.assertEqual(d[0].type_code, pgdb.DATE)
635                    self.assertEqual(d[1].type_code, pgdb.TIME)
636                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
637                    self.assertEqual(d[3].type_code, pgdb.TIME)
638                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
639                    row = cur.fetchone()
640                    self.assertEqual(row, tuple(values))
641                    cur.execute("delete from %s" % table)
642        finally:
643            con.close()
644
645    def test_interval(self):
646        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
647        table = self.table_prefix + 'booze'
648        con = self._connect()
649        try:
650            cur = con.cursor()
651            cur.execute("create table %s (i interval)" % table)
652            for n in range(3):
653                if n == 0:  # input as objects
654                    param = td
655                if n == 1:  # input as text
656                    param = '%d days %d seconds %d microseconds ' % (
657                        td.days, td.seconds, td.microseconds)
658                elif n == 2:  # input using type helpers
659                    param = pgdb.Interval(
660                        td.days, 0, 0, td.seconds, td.microseconds)
661                for intervalstyle in ('sql_standard ', 'postgres',
662                        'postgres_verbose', 'iso_8601'):
663                    cur.execute("set intervalstyle to %s" % intervalstyle)
664                    cur.execute("insert into %s"
665                        " values (%%s)" % table, [param])
666                    cur.execute("select * from %s" % table)
667                    tc = cur.description[0].type_code
668                    self.assertEqual(tc, pgdb.DATETIME)
669                    self.assertNotEqual(tc, pgdb.STRING)
670                    self.assertNotEqual(tc, pgdb.ARRAY)
671                    self.assertNotEqual(tc, pgdb.RECORD)
672                    self.assertEqual(tc, pgdb.INTERVAL)
673                    row = cur.fetchone()
674                    self.assertEqual(row, (td,))
675                    cur.execute("delete from %s" % table)
676        finally:
677            con.close()
678
679    def test_hstore(self):
680        con = self._connect()
681        try:
682            cur = con.cursor()
683            cur.execute("select 'k=>v'::hstore")
684        except pgdb.DatabaseError:
685            try:
686                cur.execute("create extension hstore")
687            except pgdb.DatabaseError:
688                self.skipTest("hstore extension not enabled")
689        finally:
690            con.close()
691        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
692            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
693            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
694            'None': None, 'NULL': 'NULL', 'empty': ''}
695        con = self._connect()
696        try:
697            cur = con.cursor()
698            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
699            result = cur.fetchone()[0]
700        finally:
701            con.close()
702        self.assertIsInstance(result, dict)
703        self.assertEqual(result, d)
704
705    def test_uuid(self):
706        self.assertIs(Uuid, pgdb.Uuid)
707        d = Uuid('{12345678-1234-5678-1234-567812345678}')
708        con = self._connect()
709        try:
710            cur = con.cursor()
711            cur.execute("select %s::uuid", (d,))
712            result = cur.fetchone()[0]
713        finally:
714            con.close()
715        self.assertIsInstance(result, Uuid)
716        self.assertEqual(result, d)
717
718    def test_insert_array(self):
719        values = [(None, None), ([], []), ([None], [[None], ['null']]),
720            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
721            ([20000, 25000, 25000, 30000],
722            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
723            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
724        table = self.table_prefix + 'booze'
725        con = self._connect()
726        try:
727            cur = con.cursor()
728            cur.execute("create table %s"
729                " (n smallint, i int[], t text[][])" % table)
730            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
731            # Note that we must explicit casts because we are inserting
732            # empty arrays.  Otherwise this is not necessary.
733            cur.executemany("insert into %s values"
734                " (%%d,%%s::int[],%%s::text[][])" % table, params)
735            cur.execute("select i, t from %s order by n" % table)
736            d = cur.description
737            self.assertEqual(d[0].type_code, pgdb.ARRAY)
738            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
739            self.assertEqual(d[0].type_code, pgdb.NUMBER)
740            self.assertEqual(d[0].type_code, pgdb.INTEGER)
741            self.assertEqual(d[1].type_code, pgdb.ARRAY)
742            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
743            self.assertEqual(d[1].type_code, pgdb.STRING)
744            rows = cur.fetchall()
745        finally:
746            con.close()
747        self.assertEqual(rows, values)
748
749    def test_select_array(self):
750        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
751        con = self._connect()
752        try:
753            cur = con.cursor()
754            cur.execute("select %s::int[], %s::text[]", values)
755            row = cur.fetchone()
756        finally:
757            con.close()
758        self.assertEqual(row, values)
759
760    def test_unicode_list_and_tuple(self):
761        value = (u'KÀse', u'WÃŒrstchen')
762        con = self._connect()
763        try:
764            cur = con.cursor()
765            try:
766                cur.execute("select %s, %s", value)
767            except pgdb.DatabaseError:
768                self.skipTest('database does not support latin-1')
769            row = cur.fetchone()
770            cur.execute("select %s, %s", (list(value), tuple(value)))
771            as_list, as_tuple = cur.fetchone()
772        finally:
773            con.close()
774        self.assertEqual(as_list, list(row))
775        self.assertEqual(as_tuple, tuple(row))
776
777    def test_insert_record(self):
778        values = [('John', 61), ('Jane', 63),
779                  ('Fred', None), ('Wilma', None),
780                  (None, 42), (None, None)]
781        table = self.table_prefix + 'booze'
782        record = self.table_prefix + 'munch'
783        con = self._connect()
784        try:
785            cur = con.cursor()
786            cur.execute("create type %s as (name varchar, age int)" % record)
787            cur.execute("create table %s (n smallint, r %s)" % (table, record))
788            params = enumerate(values)
789            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
790            cur.execute("select r from %s order by n" % table)
791            type_code = cur.description[0].type_code
792            self.assertEqual(type_code, record)
793            self.assertEqual(type_code, pgdb.RECORD)
794            self.assertNotEqual(type_code, pgdb.ARRAY)
795            columns = con.type_cache.get_fields(type_code)
796            self.assertEqual(columns[0].name, 'name')
797            self.assertEqual(columns[1].name, 'age')
798            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
799            self.assertEqual(con.type_cache[columns[1].type], 'int4')
800            rows = cur.fetchall()
801        finally:
802            cur.execute('drop table %s' % table)
803            cur.execute('drop type %s' % record)
804            con.close()
805        self.assertEqual(len(rows), len(values))
806        rows = [row[0] for row in rows]
807        self.assertEqual(rows, values)
808        self.assertEqual(rows[0].name, 'John')
809        self.assertEqual(rows[0].age, 61)
810
811    def test_select_record(self):
812        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
813            '(test)', '(x,y)', ' x y ', 'null', None)
814        con = self._connect()
815        try:
816            cur = con.cursor()
817            cur.execute("select %s as test_record", [value])
818            self.assertEqual(cur.description[0].name, 'test_record')
819            self.assertEqual(cur.description[0].type_code, 'record')
820            row = cur.fetchone()[0]
821        finally:
822            con.close()
823        # Note that the element types get lost since we created an
824        # untyped record (an anonymous composite type). For the same
825        # reason this is also a normal tuple, not a named tuple.
826        text_row = tuple(None if v is None else str(v) for v in value)
827        self.assertEqual(row, text_row)
828
829    def test_custom_type(self):
830        values = [3, 5, 65]
831        values = list(map(PgBitString, values))
832        table = self.table_prefix + 'booze'
833        con = self._connect()
834        try:
835            cur = con.cursor()
836            params = enumerate(values)  # params have __pg_repr__ method
837            cur.execute(
838                'create table "%s" (n smallint, b bit varying(7))' % table)
839            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
840            cur.execute("select * from %s" % table)
841            rows = cur.fetchall()
842        finally:
843            con.close()
844        self.assertEqual(len(rows), len(values))
845        con = self._connect()
846        try:
847            cur = con.cursor()
848            params = (1, object())  # an object that cannot be handled
849            self.assertRaises(pgdb.InterfaceError, cur.execute,
850                "insert into %s values (%%s,%%s)" % table, params)
851        finally:
852            con.close()
853
854    def test_set_decimal_type(self):
855        decimal_type = pgdb.decimal_type()
856        self.assertTrue(decimal_type is not None and callable(decimal_type))
857        con = self._connect()
858        try:
859            cur = con.cursor()
860            # change decimal type globally to int
861            int_type = lambda v: int(float(v))
862            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
863            cur.execute('select 4.25')
864            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
865            value = cur.fetchone()[0]
866            self.assertTrue(isinstance(value, int))
867            self.assertEqual(value, 4)
868            # change decimal type again to float
869            self.assertTrue(pgdb.decimal_type(float) is float)
870            cur.execute('select 4.25')
871            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
872            value = cur.fetchone()[0]
873            # the connection still uses the old setting
874            self.assertTrue(isinstance(value, int))
875            # bust the cache for type functions for the connection
876            con.type_cache.reset_typecast()
877            cur.execute('select 4.25')
878            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
879            value = cur.fetchone()[0]
880            # now the connection uses the new setting
881            self.assertTrue(isinstance(value, float))
882            self.assertEqual(value, 4.25)
883        finally:
884            con.close()
885            pgdb.decimal_type(decimal_type)
886        self.assertTrue(pgdb.decimal_type() is decimal_type)
887
888    def test_global_typecast(self):
889        try:
890            query = 'select 2::int2, 4::int4, 8::int8'
891            self.assertIs(pgdb.get_typecast('int4'), int)
892            cast_int = lambda v: 'int(%s)' % v
893            pgdb.set_typecast('int4', cast_int)
894            con = self._connect()
895            try:
896                i2, i4, i8 = con.cursor().execute(query).fetchone()
897            finally:
898                con.close()
899            self.assertEqual(i2, 2)
900            self.assertEqual(i4, 'int(4)')
901            self.assertEqual(i8, 8)
902            pgdb.set_typecast(['int2', 'int8'], cast_int)
903            con = self._connect()
904            try:
905                i2, i4, i8 = con.cursor().execute(query).fetchone()
906            finally:
907                con.close()
908            self.assertEqual(i2, 'int(2)')
909            self.assertEqual(i4, 'int(4)')
910            self.assertEqual(i8, 'int(8)')
911            pgdb.reset_typecast('int4')
912            con = self._connect()
913            try:
914                i2, i4, i8 = con.cursor().execute(query).fetchone()
915            finally:
916                con.close()
917            self.assertEqual(i2, 'int(2)')
918            self.assertEqual(i4, 4)
919            self.assertEqual(i8, 'int(8)')
920            pgdb.reset_typecast(['int2', 'int8'])
921            con = self._connect()
922            try:
923                i2, i4, i8 = con.cursor().execute(query).fetchone()
924            finally:
925                con.close()
926            self.assertEqual(i2, 2)
927            self.assertEqual(i4, 4)
928            self.assertEqual(i8, 8)
929            pgdb.set_typecast(['int2', 'int8'], cast_int)
930            con = self._connect()
931            try:
932                i2, i4, i8 = con.cursor().execute(query).fetchone()
933            finally:
934                con.close()
935            self.assertEqual(i2, 'int(2)')
936            self.assertEqual(i4, 4)
937            self.assertEqual(i8, 'int(8)')
938        finally:
939            pgdb.reset_typecast()
940        con = self._connect()
941        try:
942            i2, i4, i8 = con.cursor().execute(query).fetchone()
943        finally:
944            con.close()
945        self.assertEqual(i2, 2)
946        self.assertEqual(i4, 4)
947        self.assertEqual(i8, 8)
948
949    def test_set_typecast_for_arrays(self):
950        query = 'select ARRAY[1,2,3]'
951        try:
952            con = self._connect()
953            try:
954                r = con.cursor().execute(query).fetchone()[0]
955            finally:
956                con.close()
957            self.assertIsInstance(r, list)
958            self.assertEqual(r, [1, 2, 3])
959            pgdb.set_typecast('anyarray', lambda v, basecast: v)
960            con = self._connect()
961            try:
962                r = con.cursor().execute(query).fetchone()[0]
963            finally:
964                con.close()
965            self.assertIsInstance(r, str)
966            self.assertEqual(r, '{1,2,3}')
967        finally:
968            pgdb.reset_typecast()
969        con = self._connect()
970        try:
971            r = con.cursor().execute(query).fetchone()[0]
972        finally:
973            con.close()
974        self.assertIsInstance(r, list)
975        self.assertEqual(r, [1, 2, 3])
976
977    def test_unicode_with_utf8(self):
978        table = self.table_prefix + 'booze'
979        input = u"He wes Leovenaðes sone — liðe him be Drihten"
980        con = self._connect()
981        try:
982            cur = con.cursor()
983            cur.execute("create table %s (t text)" % table)
984            try:
985                cur.execute("set client_encoding=utf8")
986                cur.execute(u"select '%s'" % input)
987            except Exception:
988                self.skipTest("database does not support utf8")
989            output1 = cur.fetchone()[0]
990            cur.execute("insert into %s values (%%s)" % table, (input,))
991            cur.execute("select * from %s" % table)
992            output2 = cur.fetchone()[0]
993            cur.execute("select t = '%s' from %s" % (input, table))
994            output3 = cur.fetchone()[0]
995            cur.execute("select t = %%s from %s" % table, (input,))
996            output4 = cur.fetchone()[0]
997        finally:
998            con.close()
999        if str is bytes:  # Python < 3.0
1000            input = input.encode('utf8')
1001        self.assertIsInstance(output1, str)
1002        self.assertEqual(output1, input)
1003        self.assertIsInstance(output2, str)
1004        self.assertEqual(output2, input)
1005        self.assertIsInstance(output3, bool)
1006        self.assertTrue(output3)
1007        self.assertIsInstance(output4, bool)
1008        self.assertTrue(output4)
1009
1010    def test_unicode_with_latin1(self):
1011        table = self.table_prefix + 'booze'
1012        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
1013        con = self._connect()
1014        try:
1015            cur = con.cursor()
1016            cur.execute("create table %s (t text)" % table)
1017            try:
1018                cur.execute("set client_encoding=latin1")
1019                cur.execute(u"select '%s'" % input)
1020            except Exception:
1021                self.skipTest("database does not support latin1")
1022            output1 = cur.fetchone()[0]
1023            cur.execute("insert into %s values (%%s)" % table, (input,))
1024            cur.execute("select * from %s" % table)
1025            output2 = cur.fetchone()[0]
1026            cur.execute("select t = '%s' from %s" % (input, table))
1027            output3 = cur.fetchone()[0]
1028            cur.execute("select t = %%s from %s" % table, (input,))
1029            output4 = cur.fetchone()[0]
1030        finally:
1031            con.close()
1032        if str is bytes:  # Python < 3.0
1033            input = input.encode('latin1')
1034        self.assertIsInstance(output1, str)
1035        self.assertEqual(output1, input)
1036        self.assertIsInstance(output2, str)
1037        self.assertEqual(output2, input)
1038        self.assertIsInstance(output3, bool)
1039        self.assertTrue(output3)
1040        self.assertIsInstance(output4, bool)
1041        self.assertTrue(output4)
1042
1043    def test_bool(self):
1044        values = [False, True, None, 't', 'f', 'true', 'false']
1045        table = self.table_prefix + 'booze'
1046        con = self._connect()
1047        try:
1048            cur = con.cursor()
1049            cur.execute(
1050                "create table %s (n smallint, booltest bool)" % table)
1051            params = enumerate(values)
1052            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
1053            cur.execute("select booltest from %s order by n" % table)
1054            rows = cur.fetchall()
1055            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
1056        finally:
1057            con.close()
1058        rows = [row[0] for row in rows]
1059        values[3] = values[5] = True
1060        values[4] = values[6] = False
1061        self.assertEqual(rows, values)
1062
1063    def test_literal(self):
1064        con = self._connect()
1065        try:
1066            cur = con.cursor()
1067            value = "lower('Hello')"
1068            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
1069            row = cur.fetchone()
1070        finally:
1071            con.close()
1072        self.assertEqual(row, (value, 'hello'))
1073
1074    def test_json(self):
1075        inval = {"employees":
1076            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1077        table = self.table_prefix + 'booze'
1078        con = self._connect()
1079        try:
1080            cur = con.cursor()
1081            try:
1082                cur.execute("create table %s (jsontest json)" % table)
1083            except pgdb.ProgrammingError:
1084                self.skipTest('database does not support json')
1085            params = (pgdb.Json(inval),)
1086            cur.execute("insert into %s values (%%s)" % table, params)
1087            cur.execute("select jsontest from %s" % table)
1088            outval = cur.fetchone()[0]
1089            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1090        finally:
1091            con.close()
1092        self.assertEqual(inval, outval)
1093
1094    def test_jsonb(self):
1095        inval = {"employees":
1096            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1097        table = self.table_prefix + 'booze'
1098        con = self._connect()
1099        try:
1100            cur = con.cursor()
1101            try:
1102                cur.execute("create table %s (jsonbtest jsonb)" % table)
1103            except pgdb.ProgrammingError:
1104                self.skipTest('database does not support jsonb')
1105            params = (pgdb.Json(inval),)
1106            cur.execute("insert into %s values (%%s)" % table, params)
1107            cur.execute("select jsonbtest from %s" % table)
1108            outval = cur.fetchone()[0]
1109            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1110        finally:
1111            con.close()
1112        self.assertEqual(inval, outval)
1113
1114    def test_execute_edge_cases(self):
1115        con = self._connect()
1116        try:
1117            cur = con.cursor()
1118            sql = 'invalid'  # should be ignored with empty parameter list
1119            cur.executemany(sql, [])
1120            sql = 'select %d + 1'
1121            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1122            self.assertEqual(cur.fetchone()[0], 3)
1123            sql = 'select 1/0'  # cannot be executed
1124            self.assertRaises(pgdb.DataError, cur.execute, sql)
1125            cur.close()
1126            con.rollback()
1127            if pgdb.shortcutmethods:
1128                res = con.execute('select %d', (1,)).fetchone()
1129                self.assertEqual(res, (1,))
1130                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1131                self.assertEqual(res, (2,))
1132        finally:
1133            con.close()
1134        sql = 'select 1'  # cannot be executed after connection is closed
1135        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1136
1137    def test_fetchmany_with_keep(self):
1138        con = self._connect()
1139        try:
1140            cur = con.cursor()
1141            self.assertEqual(cur.arraysize, 1)
1142            cur.execute('select * from generate_series(1, 25)')
1143            self.assertEqual(len(cur.fetchmany()), 1)
1144            self.assertEqual(len(cur.fetchmany()), 1)
1145            self.assertEqual(cur.arraysize, 1)
1146            cur.arraysize = 3
1147            self.assertEqual(len(cur.fetchmany()), 3)
1148            self.assertEqual(len(cur.fetchmany()), 3)
1149            self.assertEqual(cur.arraysize, 3)
1150            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1151            self.assertEqual(cur.arraysize, 3)
1152            self.assertEqual(len(cur.fetchmany()), 3)
1153            self.assertEqual(len(cur.fetchmany()), 3)
1154            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1155            self.assertEqual(cur.arraysize, 2)
1156            self.assertEqual(len(cur.fetchmany()), 2)
1157            self.assertEqual(len(cur.fetchmany()), 2)
1158            self.assertEqual(len(cur.fetchmany(25)), 3)
1159        finally:
1160            con.close()
1161
1162    def test_nextset(self):
1163        con = self._connect()
1164        cur = con.cursor()
1165        self.assertRaises(con.NotSupportedError, cur.nextset)
1166
1167    def test_setoutputsize(self):
1168        pass  # not supported
1169
1170    def test_connection_errors(self):
1171        con = self._connect()
1172        self.assertEqual(con.Error, pgdb.Error)
1173        self.assertEqual(con.Warning, pgdb.Warning)
1174        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1175        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1176        self.assertEqual(con.InternalError, pgdb.InternalError)
1177        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1178        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1179        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1180        self.assertEqual(con.DataError, pgdb.DataError)
1181        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1182
1183    def test_connection_as_contextmanager(self):
1184        table = self.table_prefix + 'booze'
1185        con = self._connect()
1186        try:
1187            cur = con.cursor()
1188            cur.execute("create table %s (n smallint check(n!=4))" % table)
1189            with con:
1190                cur.execute("insert into %s values (1)" % table)
1191                cur.execute("insert into %s values (2)" % table)
1192            try:
1193                with con:
1194                    cur.execute("insert into %s values (3)" % table)
1195                    cur.execute("insert into %s values (4)" % table)
1196            except con.IntegrityError as error:
1197                self.assertTrue('check' in str(error).lower())
1198            with con:
1199                cur.execute("insert into %s values (5)" % table)
1200                cur.execute("insert into %s values (6)" % table)
1201            try:
1202                with con:
1203                    cur.execute("insert into %s values (7)" % table)
1204                    cur.execute("insert into %s values (8)" % table)
1205                    raise ValueError('transaction should rollback')
1206            except ValueError as error:
1207                self.assertEqual(str(error), 'transaction should rollback')
1208            with con:
1209                cur.execute("insert into %s values (9)" % table)
1210            cur.execute("select * from %s order by 1" % table)
1211            rows = cur.fetchall()
1212            rows = [row[0] for row in rows]
1213        finally:
1214            con.close()
1215        self.assertEqual(rows, [1, 2, 5, 6, 9])
1216
1217    def test_cursor_connection(self):
1218        con = self._connect()
1219        cur = con.cursor()
1220        self.assertEqual(cur.connection, con)
1221        cur.close()
1222
1223    def test_cursor_as_contextmanager(self):
1224        con = self._connect()
1225        with con.cursor() as cur:
1226            self.assertEqual(cur.connection, con)
1227
1228    def test_pgdb_type(self):
1229        self.assertEqual(pgdb.STRING, pgdb.STRING)
1230        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1231        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1232        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1233        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1234        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1235        self.assertEqual('char', pgdb.STRING)
1236        self.assertEqual('varchar', pgdb.STRING)
1237        self.assertEqual('text', pgdb.STRING)
1238        self.assertNotEqual('numeric', pgdb.STRING)
1239        self.assertEqual('numeric', pgdb.NUMERIC)
1240        self.assertEqual('numeric', pgdb.NUMBER)
1241        self.assertEqual('int4', pgdb.NUMBER)
1242        self.assertNotEqual('int4', pgdb.NUMERIC)
1243        self.assertEqual('int2', pgdb.SMALLINT)
1244        self.assertNotEqual('int4', pgdb.SMALLINT)
1245        self.assertEqual('int2', pgdb.INTEGER)
1246        self.assertEqual('int4', pgdb.INTEGER)
1247        self.assertEqual('int8', pgdb.INTEGER)
1248        self.assertNotEqual('int4', pgdb.LONG)
1249        self.assertEqual('int8', pgdb.LONG)
1250        self.assertTrue('char' in pgdb.STRING)
1251        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1252        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1253        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1254        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1255        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1256        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1257        self.assertEqual('_char', pgdb.ARRAY)
1258        self.assertNotEqual('char', pgdb.ARRAY)
1259        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1260        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1261        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1262        self.assertEqual('record', pgdb.RECORD)
1263        self.assertNotEqual('_record', pgdb.RECORD)
1264
1265    def test_no_close(self):
1266        data = ('hello', 'world')
1267        con = self._connect()
1268        cur = con.cursor()
1269        cur.build_row_factory = lambda: tuple
1270        cur.execute("select %s, %s", data)
1271        row = cur.fetchone()
1272        self.assertEqual(row, data)
1273
1274    def test_set_row_factory_size(self):
1275        try:
1276            from functools import lru_cache
1277        except ImportError:  # Python < 3.2
1278            lru_cache = None
1279        queries = ['select 1 as a, 2 as b, 3 as c', 'select 123 as abc']
1280        con = self._connect()
1281        cur = con.cursor()
1282        for maxsize in (None, 0, 1, 2, 3, 10, 1024):
1283            pgdb.set_row_factory_size(maxsize)
1284            for i in range(3):
1285                for q in queries:
1286                    cur.execute(q)
1287                    r = cur.fetchone()
1288                    if q.endswith('abc'):
1289                        self.assertEqual(r, (123,))
1290                        self.assertEqual(r._fields, ('abc',))
1291                    else:
1292                        self.assertEqual(r, (1, 2, 3))
1293                        self.assertEqual(r._fields, ('a', 'b', 'c'))
1294            if lru_cache:
1295                info = pgdb._row_factory.cache_info()
1296                self.assertEqual(info.maxsize, maxsize)
1297                self.assertEqual(info.hits + info.misses, 6)
1298                self.assertEqual(info.hits,
1299                    0 if maxsize is not None and maxsize < 2 else 4)
1300
1301    def test_memory_leaks(self):
1302        ids = set()
1303        objs = []
1304        add_ids = ids.update
1305        gc.collect()
1306        objs[:] = gc.get_objects()
1307        add_ids(id(obj) for obj in objs)
1308        self.test_no_close()
1309        gc.collect()
1310        objs[:] = gc.get_objects()
1311        objs[:] = [obj for obj in objs if id(obj) not in ids]
1312        if objs and sys.version_info[:3] in ((3, 5, 0), (3, 5, 1)):
1313            # workaround for Python issue 26811
1314            objs[:] = [obj for obj in objs if repr(obj) != '(<NULL>,)']
1315        self.assertEqual(len(objs), 0)
1316
1317
1318if __name__ == '__main__':
1319    unittest.main()
Note: See TracBrowser for help on using the repository browser.