source: trunk/tests/test_dbapi20.py @ 894

Last change on this file since 894 was 894, checked in by cito, 3 years ago

Cache the namedtuple classes used for query result rows

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 50.9 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 894 2016-09-23 14:04:06Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31import gc
32import sys
33
34from datetime import date, time, datetime, timedelta
35from uuid import UUID as Uuid
36
37try:
38    long
39except NameError:  # Python >= 3.0
40    long = int
41
42try:
43    from collections import OrderedDict
44except ImportError:  # Python 2.6 or 3.0
45    OrderedDict = None
46
47
48class PgBitString:
49    """Test object with a PostgreSQL representation as Bit String."""
50
51    def __init__(self, value):
52        self.value = value
53
54    def __pg_repr__(self):
55         return "B'{0:b}'".format(self.value)
56
57
58class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
59
60    driver = pgdb
61    connect_args = ()
62    connect_kw_args = {'database': dbname,
63        'host': '%s:%d' % (dbhost or '', dbport or -1)}
64
65    lower_func = 'lower'  # For stored procedure test
66
67    def setUp(self):
68        # Call superclass setUp in case this does something in the future
69        dbapi20.DatabaseAPI20Test.setUp(self)
70        try:
71            con = self._connect()
72            con.close()
73        except pgdb.Error:  # try to create a missing database
74            import pg
75            try:  # first try to log in as superuser
76                db = pg.DB('postgres', dbhost or None, dbport or -1,
77                    user='postgres')
78            except Exception:  # then try to log in as current user
79                db = pg.DB('postgres', dbhost or None, dbport or -1)
80            db.query('create database ' + dbname)
81
82    def tearDown(self):
83        dbapi20.DatabaseAPI20Test.tearDown(self)
84
85    def test_version(self):
86        v = pgdb.version
87        self.assertIsInstance(v, str)
88        self.assertIn('.', v)
89        self.assertEqual(pgdb.__version__, v)
90
91    def test_connect_kwargs(self):
92        application_name = 'PyGreSQL DB API 2.0 Test'
93        self.connect_kw_args['application_name'] = application_name
94        con = self._connect()
95        cur = con.cursor()
96        cur.execute("select application_name from pg_stat_activity"
97            " where application_name = %s", (application_name,))
98        self.assertEqual(cur.fetchone(), (application_name,))
99
100    def test_percent_sign(self):
101        con = self._connect()
102        cur = con.cursor()
103        cur.execute("select %s, 'a %% sign'", ('a % sign',))
104        self.assertEqual(cur.fetchone(), ('a % sign', 'a % sign'))
105        cur.execute("select 'a % sign'")
106        self.assertEqual(cur.fetchone(), ('a % sign',))
107        cur.execute("select 'a %% sign'")
108        self.assertEqual(cur.fetchone(), ('a % sign',))
109
110    def test_callproc_no_params(self):
111        con = self._connect()
112        cur = con.cursor()
113        # note that now() does not change within a transaction
114        cur.execute('select now()')
115        now = cur.fetchone()[0]
116        res = cur.callproc('now')
117        self.assertIsNone(res)
118        res = cur.fetchone()[0]
119        self.assertEqual(res, now)
120
121    def test_callproc_bad_params(self):
122        con = self._connect()
123        cur = con.cursor()
124        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
125        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
126
127    def test_callproc_one_param(self):
128        con = self._connect()
129        cur = con.cursor()
130        params = (42.4382,)
131        res = cur.callproc("round", params)
132        self.assertIs(res, params)
133        res = cur.fetchone()[0]
134        self.assertEqual(res, 42)
135
136    def test_callproc_two_params(self):
137        con = self._connect()
138        cur = con.cursor()
139        params = (9, 4)
140        res = cur.callproc("div", params)
141        self.assertIs(res, params)
142        res = cur.fetchone()[0]
143        self.assertEqual(res, 2)
144
145    def test_cursor_type(self):
146
147        class TestCursor(pgdb.Cursor):
148            pass
149
150        con = self._connect()
151        self.assertIs(con.cursor_type, pgdb.Cursor)
152        cur = con.cursor()
153        self.assertIsInstance(cur, pgdb.Cursor)
154        self.assertNotIsInstance(cur, TestCursor)
155        con.cursor_type = TestCursor
156        cur = con.cursor()
157        self.assertIsInstance(cur, TestCursor)
158        cur = con.cursor()
159        self.assertIsInstance(cur, TestCursor)
160        con = self._connect()
161        self.assertIs(con.cursor_type, pgdb.Cursor)
162        cur = con.cursor()
163        self.assertIsInstance(cur, pgdb.Cursor)
164        self.assertNotIsInstance(cur, TestCursor)
165
166    def test_row_factory(self):
167
168        class TestCursor(pgdb.Cursor):
169
170            def row_factory(self, row):
171                return dict(('column %s' % desc[0], value)
172                    for desc, value in zip(self.description, row))
173
174        con = self._connect()
175        con.cursor_type = TestCursor
176        cur = con.cursor()
177        self.assertIsInstance(cur, TestCursor)
178        res = cur.execute("select 1 as a, 2 as b")
179        self.assertIs(res, cur, 'execute() should return cursor')
180        res = cur.fetchone()
181        self.assertIsInstance(res, dict)
182        self.assertEqual(res, {'column a': 1, 'column b': 2})
183        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
184        res = cur.fetchall()
185        self.assertIsInstance(res, list)
186        self.assertEqual(len(res), 2)
187        self.assertIsInstance(res[0], dict)
188        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
189        self.assertIsInstance(res[1], dict)
190        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
191
192    def test_build_row_factory(self):
193
194        class TestCursor(pgdb.Cursor):
195
196            def build_row_factory(self):
197                keys = [desc[0] for desc in self.description]
198                return lambda row: dict((key, value)
199                    for key, value in zip(keys, row))
200
201        con = self._connect()
202        con.cursor_type = TestCursor
203        cur = con.cursor()
204        self.assertIsInstance(cur, TestCursor)
205        cur.execute("select 1 as a, 2 as b")
206        res = cur.fetchone()
207        self.assertIsInstance(res, dict)
208        self.assertEqual(res, {'a': 1, 'b': 2})
209        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
210        res = cur.fetchall()
211        self.assertIsInstance(res, list)
212        self.assertEqual(len(res), 2)
213        self.assertIsInstance(res[0], dict)
214        self.assertEqual(res[0], {'a': 1, 'b': 2})
215        self.assertIsInstance(res[1], dict)
216        self.assertEqual(res[1], {'a': 3, 'b': 4})
217
218    def test_cursor_with_named_columns(self):
219        con = self._connect()
220        cur = con.cursor()
221        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
222        self.assertIs(res, cur, 'execute() should return cursor')
223        res = cur.fetchone()
224        self.assertIsInstance(res, tuple)
225        self.assertEqual(res, (1, 2, 3))
226        self.assertEqual(res._fields, ('abc', 'de', 'f'))
227        self.assertEqual(res.abc, 1)
228        self.assertEqual(res.de, 2)
229        self.assertEqual(res.f, 3)
230        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
231        res = cur.fetchall()
232        self.assertIsInstance(res, list)
233        self.assertEqual(len(res), 2)
234        self.assertIsInstance(res[0], tuple)
235        self.assertEqual(res[0], (1, 2))
236        self.assertEqual(res[0]._fields, ('one', 'two'))
237        self.assertIsInstance(res[1], tuple)
238        self.assertEqual(res[1], (3, 4))
239        self.assertEqual(res[1]._fields, ('one', 'two'))
240
241    def test_cursor_with_unnamed_columns(self):
242        con = self._connect()
243        cur = con.cursor()
244        cur.execute("select 1, 2, 3")
245        res = cur.fetchone()
246        self.assertIsInstance(res, tuple)
247        self.assertEqual(res, (1, 2, 3))
248        old_py = OrderedDict is None  # Python 2.6 or 3.0
249        # old Python versions cannot rename tuple fields with underscore
250        if old_py:
251            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
252        else:
253            self.assertEqual(res._fields, ('_0', '_1', '_2'))
254        cur.execute("select 1 as one, 2, 3 as three")
255        res = cur.fetchone()
256        self.assertIsInstance(res, tuple)
257        self.assertEqual(res, (1, 2, 3))
258        if old_py:  # cannot auto rename with underscore
259            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
260        else:
261            self.assertEqual(res._fields, ('one', '_1', 'three'))
262        cur.execute("select 1 as abc, 2 as def")
263        res = cur.fetchone()
264        self.assertIsInstance(res, tuple)
265        self.assertEqual(res, (1, 2))
266        if old_py:
267            self.assertEqual(res._fields, ('column_0', 'column_1'))
268        else:
269            self.assertEqual(res._fields, ('abc', '_1'))
270
271    def test_colnames(self):
272        con = self._connect()
273        cur = con.cursor()
274        cur.execute("select 1, 2, 3")
275        names = cur.colnames
276        self.assertIsInstance(names, list)
277        self.assertEqual(names, ['?column?', '?column?', '?column?'])
278        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
279        names = cur.colnames
280        self.assertIsInstance(names, list)
281        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
282
283    def test_coltypes(self):
284        con = self._connect()
285        cur = con.cursor()
286        cur.execute("select 1::int2, 2::int4, 3::int8")
287        types = cur.coltypes
288        self.assertIsInstance(types, list)
289        self.assertEqual(types, ['int2', 'int4', 'int8'])
290
291    def test_description_fields(self):
292        con = self._connect()
293        cur = con.cursor()
294        cur.execute("select 123456789::int8 col0,"
295            " 123456.789::numeric(41, 13) as col1,"
296            " 'foobar'::char(39) as col2")
297        desc = cur.description
298        self.assertIsInstance(desc, list)
299        self.assertEqual(len(desc), 3)
300        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
301        for i in range(3):
302            c, d = cols[i], desc[i]
303            self.assertIsInstance(d, tuple)
304            self.assertEqual(len(d), 7)
305            self.assertIsInstance(d.name, str)
306            self.assertEqual(d.name, 'col%d' % i)
307            self.assertIsInstance(d.type_code, str)
308            self.assertEqual(d.type_code, c[0])
309            self.assertIsNone(d.display_size)
310            self.assertIsInstance(d.internal_size, int)
311            self.assertEqual(d.internal_size, c[1])
312            if c[2] is not None:
313                self.assertIsInstance(d.precision, int)
314                self.assertEqual(d.precision, c[1])
315                self.assertIsInstance(d.scale, int)
316                self.assertEqual(d.scale, c[2])
317            else:
318                self.assertIsNone(d.precision)
319                self.assertIsNone(d.scale)
320            self.assertIsNone(d.null_ok)
321
322    def test_type_cache_info(self):
323        con = self._connect()
324        try:
325            cur = con.cursor()
326            type_cache = con.type_cache
327            self.assertNotIn('numeric', type_cache)
328            type_info = type_cache['numeric']
329            self.assertIn('numeric', type_cache)
330            self.assertEqual(type_info, 'numeric')
331            self.assertEqual(type_info.oid, 1700)
332            self.assertEqual(type_info.len, -1)
333            self.assertEqual(type_info.type, 'b')  # base
334            self.assertEqual(type_info.category, 'N')  # numeric
335            self.assertEqual(type_info.delim, ',')
336            self.assertEqual(type_info.relid, 0)
337            self.assertIs(con.type_cache[1700], type_info)
338            self.assertNotIn('pg_type', type_cache)
339            type_info = type_cache['pg_type']
340            self.assertIn('pg_type', type_cache)
341            self.assertEqual(type_info.type, 'c')  # composite
342            self.assertEqual(type_info.category, 'C')  # composite
343            cols = type_cache.get_fields('pg_type')
344            self.assertEqual(cols[0].name, 'typname')
345            typname = type_cache[cols[0].type]
346            self.assertEqual(typname, 'name')
347            self.assertEqual(typname.type, 'b')  # base
348            self.assertEqual(typname.category, 'S')  # string
349            self.assertEqual(cols[3].name, 'typlen')
350            typlen = type_cache[cols[3].type]
351            self.assertEqual(typlen, 'int2')
352            self.assertEqual(typlen.type, 'b')  # base
353            self.assertEqual(typlen.category, 'N')  # numeric
354            cur.close()
355            cur = con.cursor()
356            type_cache = con.type_cache
357            self.assertIn('numeric', type_cache)
358            cur.close()
359        finally:
360            con.close()
361        con = self._connect()
362        try:
363            cur = con.cursor()
364            type_cache = con.type_cache
365            self.assertNotIn('pg_type', type_cache)
366            self.assertEqual(type_cache.get('pg_type'), type_info)
367            self.assertIn('pg_type', type_cache)
368            self.assertIsNone(type_cache.get(
369                self.table_prefix + '_surely_does_not_exist'))
370            cur.close()
371        finally:
372            con.close()
373
374    def test_type_cache_typecast(self):
375        con = self._connect()
376        try:
377            cur = con.cursor()
378            type_cache = con.type_cache
379            self.assertIs(type_cache.get_typecast('int4'), int)
380            cast_int = lambda v: 'int(%s)' % v
381            type_cache.set_typecast('int4', cast_int)
382            query = 'select 2::int2, 4::int4, 8::int8'
383            cur.execute(query)
384            i2, i4, i8 = cur.fetchone()
385            self.assertEqual(i2, 2)
386            self.assertEqual(i4, 'int(4)')
387            self.assertEqual(i8, 8)
388            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
389            type_cache.set_typecast(['int2', 'int8'], cast_int)
390            cur.execute(query)
391            i2, i4, i8 = cur.fetchone()
392            self.assertEqual(i2, 'int(2)')
393            self.assertEqual(i4, 'int(4)')
394            self.assertEqual(i8, 'int(8)')
395            type_cache.reset_typecast('int4')
396            cur.execute(query)
397            i2, i4, i8 = cur.fetchone()
398            self.assertEqual(i2, 'int(2)')
399            self.assertEqual(i4, 4)
400            self.assertEqual(i8, 'int(8)')
401            type_cache.reset_typecast(['int2', 'int8'])
402            cur.execute(query)
403            i2, i4, i8 = cur.fetchone()
404            self.assertEqual(i2, 2)
405            self.assertEqual(i4, 4)
406            self.assertEqual(i8, 8)
407            type_cache.set_typecast(['int2', 'int8'], cast_int)
408            cur.execute(query)
409            i2, i4, i8 = cur.fetchone()
410            self.assertEqual(i2, 'int(2)')
411            self.assertEqual(i4, 4)
412            self.assertEqual(i8, 'int(8)')
413            type_cache.reset_typecast()
414            cur.execute(query)
415            i2, i4, i8 = cur.fetchone()
416            self.assertEqual(i2, 2)
417            self.assertEqual(i4, 4)
418            self.assertEqual(i8, 8)
419            cur.close()
420        finally:
421            con.close()
422
423    def test_cursor_iteration(self):
424        con = self._connect()
425        cur = con.cursor()
426        cur.execute("select 1 union select 2 union select 3")
427        self.assertEqual([r[0] for r in cur], [1, 2, 3])
428
429    def test_cursor_invalidation(self):
430        con = self._connect()
431        cur = con.cursor()
432        cur.execute("select 1 union select 2")
433        self.assertEqual(cur.fetchone(), (1,))
434        self.assertFalse(con.closed)
435        con.close()
436        self.assertTrue(con.closed)
437        self.assertRaises(pgdb.OperationalError, cur.fetchone)
438
439    def test_fetch_2_rows(self):
440        Decimal = pgdb.decimal_type()
441        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
442            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
443            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
444            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
445            pgdb.Interval(15, 31, 5), 7897234)
446        table = self.table_prefix + 'booze'
447        con = self._connect()
448        try:
449            cur = con.cursor()
450            cur.execute("set datestyle to iso")
451            cur.execute("create table %s ("
452                "stringtest varchar,"
453                "binarytest bytea,"
454                "booltest bool,"
455                "integertest int4,"
456                "longtest int8,"
457                "floattest float8,"
458                "numerictest numeric,"
459                "moneytest money,"
460                "datetest date,"
461                "timetest time,"
462                "datetimetest timestamp,"
463                "intervaltest interval,"
464                "rowidtest oid)" % table)
465            cur.execute("set standard_conforming_strings to on")
466            for s in ('numeric', 'monetary', 'time'):
467                cur.execute("set lc_%s to 'C'" % s)
468            for _i in range(2):
469                cur.execute("insert into %s values ("
470                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
471                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
472            cur.execute("select * from %s" % table)
473            rows = cur.fetchall()
474            self.assertEqual(len(rows), 2)
475            row0 = rows[0]
476            self.assertEqual(row0, values)
477            self.assertEqual(row0, rows[1])
478            self.assertIsInstance(row0[0], str)
479            self.assertIsInstance(row0[1], bytes)
480            self.assertIsInstance(row0[2], bool)
481            self.assertIsInstance(row0[3], int)
482            self.assertIsInstance(row0[4], long)
483            self.assertIsInstance(row0[5], float)
484            self.assertIsInstance(row0[6], Decimal)
485            self.assertIsInstance(row0[7], Decimal)
486            self.assertIsInstance(row0[8], date)
487            self.assertIsInstance(row0[9], time)
488            self.assertIsInstance(row0[10], datetime)
489            self.assertIsInstance(row0[11], timedelta)
490        finally:
491            con.close()
492
493    def test_integrity_error(self):
494        table = self.table_prefix + 'booze'
495        con = self._connect()
496        try:
497            cur = con.cursor()
498            cur.execute("set client_min_messages = warning")
499            cur.execute("create table %s (i int primary key)" % table)
500            cur.execute("insert into %s values (1)" % table)
501            cur.execute("insert into %s values (2)" % table)
502            self.assertRaises(pgdb.IntegrityError, cur.execute,
503                "insert into %s values (1)" % table)
504        finally:
505            con.close()
506
507    def test_update_rowcount(self):
508        table = self.table_prefix + 'booze'
509        con = self._connect()
510        try:
511            cur = con.cursor()
512            cur.execute("create table %s (i int)" % table)
513            cur.execute("insert into %s values (1)" % table)
514            cur.execute("update %s set i=2 where i=2 returning i" % table)
515            self.assertEqual(cur.rowcount, 0)
516            cur.execute("update %s set i=2 where i=1 returning i" % table)
517            self.assertEqual(cur.rowcount, 1)
518            cur.close()
519            # keep rowcount even if cursor is closed (needed by SQLAlchemy)
520            self.assertEqual(cur.rowcount, 1)
521        finally:
522            con.close()
523
524    def test_sqlstate(self):
525        con = self._connect()
526        cur = con.cursor()
527        try:
528            cur.execute("select 1/0")
529        except pgdb.DatabaseError as error:
530            self.assertTrue(isinstance(error, pgdb.DataError))
531            # the SQLSTATE error code for division by zero is 22012
532            self.assertEqual(error.sqlstate, '22012')
533
534    def test_float(self):
535        nan, inf = float('nan'), float('inf')
536        from math import isnan, isinf
537        self.assertTrue(isnan(nan) and not isinf(nan))
538        self.assertTrue(isinf(inf) and not isnan(inf))
539        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
540            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
541        table = self.table_prefix + 'booze'
542        con = self._connect()
543        try:
544            cur = con.cursor()
545            cur.execute(
546                "create table %s (n smallint, floattest float)" % table)
547            params = enumerate(values)
548            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
549            cur.execute("select floattest from %s order by n" % table)
550            rows = cur.fetchall()
551            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
552            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
553            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
554        finally:
555            con.close()
556        self.assertEqual(len(rows), len(values))
557        rows = [row[0] for row in rows]
558        for inval, outval in zip(values, rows):
559            if inval in ('inf', 'Infinity'):
560                inval = inf
561            elif inval in ('-inf', '-Infinity'):
562                inval = -inf
563            elif inval in ('nan', 'NaN'):
564                inval = nan
565            if isinf(inval):
566                self.assertTrue(isinf(outval))
567                if inval < 0:
568                    self.assertTrue(outval < 0)
569                else:
570                    self.assertTrue(outval > 0)
571            elif isnan(inval):
572                self.assertTrue(isnan(outval))
573            else:
574                self.assertEqual(inval, outval)
575
576    def test_datetime(self):
577        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
578        table = self.table_prefix + 'booze'
579        con = self._connect()
580        try:
581            cur = con.cursor()
582            cur.execute("set timezone = UTC")
583            cur.execute("create table %s ("
584                "d date, t time,  ts timestamp,"
585                "tz timetz, tsz timestamptz)" % table)
586            for n in range(3):
587                values = [dt.date(), dt.time(), dt,
588                    dt.time(), dt]
589                values[3] = values[3].replace(tzinfo=pgdb.timezone.utc)
590                values[4] = values[4].replace(tzinfo=pgdb.timezone.utc)
591                if n == 0:  # input as objects
592                    params = values
593                if n == 1:  # input as text
594                    params = [v.isoformat() for v in values]  # as text
595                elif n == 2:  # input using type helpers
596                    d = (dt.year, dt.month, dt.day)
597                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
598                    z = (pgdb.timezone.utc,)
599                    params = [pgdb.Date(*d), pgdb.Time(*t),
600                            pgdb.Timestamp(*(d + t)), pgdb.Time(*(t + z)),
601                            pgdb.Timestamp(*(d + t + z))]
602                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
603                        'sql, mdy', 'sql, dmy', 'german'):
604                    cur.execute("set datestyle to %s" % datestyle)
605                    if n != 1:
606                        cur.execute("select %s,%s,%s,%s,%s", params)
607                        row = cur.fetchone()
608                        self.assertEqual(row, tuple(values))
609                    cur.execute("insert into %s"
610                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
611                    cur.execute("select * from %s" % table)
612                    d = cur.description
613                    for i in range(5):
614                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
615                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
616                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
617                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
618                    self.assertEqual(d[0].type_code, pgdb.DATE)
619                    self.assertEqual(d[1].type_code, pgdb.TIME)
620                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
621                    self.assertEqual(d[3].type_code, pgdb.TIME)
622                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
623                    row = cur.fetchone()
624                    self.assertEqual(row, tuple(values))
625                    cur.execute("delete from %s" % table)
626        finally:
627            con.close()
628
629    def test_interval(self):
630        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
631        table = self.table_prefix + 'booze'
632        con = self._connect()
633        try:
634            cur = con.cursor()
635            cur.execute("create table %s (i interval)" % table)
636            for n in range(3):
637                if n == 0:  # input as objects
638                    param = td
639                if n == 1:  # input as text
640                    param = '%d days %d seconds %d microseconds ' % (
641                        td.days, td.seconds, td.microseconds)
642                elif n == 2:  # input using type helpers
643                    param = pgdb.Interval(
644                        td.days, 0, 0, td.seconds, td.microseconds)
645                for intervalstyle in ('sql_standard ', 'postgres',
646                        'postgres_verbose', 'iso_8601'):
647                    cur.execute("set intervalstyle to %s" % intervalstyle)
648                    cur.execute("insert into %s"
649                        " values (%%s)" % table, [param])
650                    cur.execute("select * from %s" % table)
651                    tc = cur.description[0].type_code
652                    self.assertEqual(tc, pgdb.DATETIME)
653                    self.assertNotEqual(tc, pgdb.STRING)
654                    self.assertNotEqual(tc, pgdb.ARRAY)
655                    self.assertNotEqual(tc, pgdb.RECORD)
656                    self.assertEqual(tc, pgdb.INTERVAL)
657                    row = cur.fetchone()
658                    self.assertEqual(row, (td,))
659                    cur.execute("delete from %s" % table)
660        finally:
661            con.close()
662
663    def test_hstore(self):
664        con = self._connect()
665        try:
666            cur = con.cursor()
667            cur.execute("select 'k=>v'::hstore")
668        except pgdb.DatabaseError:
669            try:
670                cur.execute("create extension hstore")
671            except pgdb.DatabaseError:
672                self.skipTest("hstore extension not enabled")
673        finally:
674            con.close()
675        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
676            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
677            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
678            'None': None, 'NULL': 'NULL', 'empty': ''}
679        con = self._connect()
680        try:
681            cur = con.cursor()
682            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
683            result = cur.fetchone()[0]
684        finally:
685            con.close()
686        self.assertIsInstance(result, dict)
687        self.assertEqual(result, d)
688
689    def test_uuid(self):
690        self.assertIs(Uuid, pgdb.Uuid)
691        d = Uuid('{12345678-1234-5678-1234-567812345678}')
692        con = self._connect()
693        try:
694            cur = con.cursor()
695            cur.execute("select %s::uuid", (d,))
696            result = cur.fetchone()[0]
697        finally:
698            con.close()
699        self.assertIsInstance(result, Uuid)
700        self.assertEqual(result, d)
701
702    def test_insert_array(self):
703        values = [(None, None), ([], []), ([None], [[None], ['null']]),
704            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
705            ([20000, 25000, 25000, 30000],
706            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
707            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
708        table = self.table_prefix + 'booze'
709        con = self._connect()
710        try:
711            cur = con.cursor()
712            cur.execute("create table %s"
713                " (n smallint, i int[], t text[][])" % table)
714            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
715            # Note that we must explicit casts because we are inserting
716            # empty arrays.  Otherwise this is not necessary.
717            cur.executemany("insert into %s values"
718                " (%%d,%%s::int[],%%s::text[][])" % table, params)
719            cur.execute("select i, t from %s order by n" % table)
720            d = cur.description
721            self.assertEqual(d[0].type_code, pgdb.ARRAY)
722            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
723            self.assertEqual(d[0].type_code, pgdb.NUMBER)
724            self.assertEqual(d[0].type_code, pgdb.INTEGER)
725            self.assertEqual(d[1].type_code, pgdb.ARRAY)
726            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
727            self.assertEqual(d[1].type_code, pgdb.STRING)
728            rows = cur.fetchall()
729        finally:
730            con.close()
731        self.assertEqual(rows, values)
732
733    def test_select_array(self):
734        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
735        con = self._connect()
736        try:
737            cur = con.cursor()
738            cur.execute("select %s::int[], %s::text[]", values)
739            row = cur.fetchone()
740        finally:
741            con.close()
742        self.assertEqual(row, values)
743
744    def test_unicode_list_and_tuple(self):
745        value = (u'KÀse', u'WÃŒrstchen')
746        con = self._connect()
747        try:
748            cur = con.cursor()
749            try:
750                cur.execute("select %s, %s", value)
751            except pgdb.DatabaseError:
752                self.skipTest('database does not support latin-1')
753            row = cur.fetchone()
754            cur.execute("select %s, %s", (list(value), tuple(value)))
755            as_list, as_tuple = cur.fetchone()
756        finally:
757            con.close()
758        self.assertEqual(as_list, list(row))
759        self.assertEqual(as_tuple, tuple(row))
760
761    def test_insert_record(self):
762        values = [('John', 61), ('Jane', 63),
763                  ('Fred', None), ('Wilma', None),
764                  (None, 42), (None, None)]
765        table = self.table_prefix + 'booze'
766        record = self.table_prefix + 'munch'
767        con = self._connect()
768        try:
769            cur = con.cursor()
770            cur.execute("create type %s as (name varchar, age int)" % record)
771            cur.execute("create table %s (n smallint, r %s)" % (table, record))
772            params = enumerate(values)
773            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
774            cur.execute("select r from %s order by n" % table)
775            type_code = cur.description[0].type_code
776            self.assertEqual(type_code, record)
777            self.assertEqual(type_code, pgdb.RECORD)
778            self.assertNotEqual(type_code, pgdb.ARRAY)
779            columns = con.type_cache.get_fields(type_code)
780            self.assertEqual(columns[0].name, 'name')
781            self.assertEqual(columns[1].name, 'age')
782            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
783            self.assertEqual(con.type_cache[columns[1].type], 'int4')
784            rows = cur.fetchall()
785        finally:
786            cur.execute('drop table %s' % table)
787            cur.execute('drop type %s' % record)
788            con.close()
789        self.assertEqual(len(rows), len(values))
790        rows = [row[0] for row in rows]
791        self.assertEqual(rows, values)
792        self.assertEqual(rows[0].name, 'John')
793        self.assertEqual(rows[0].age, 61)
794
795    def test_select_record(self):
796        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
797            '(test)', '(x,y)', ' x y ', 'null', None)
798        con = self._connect()
799        try:
800            cur = con.cursor()
801            cur.execute("select %s as test_record", [value])
802            self.assertEqual(cur.description[0].name, 'test_record')
803            self.assertEqual(cur.description[0].type_code, 'record')
804            row = cur.fetchone()[0]
805        finally:
806            con.close()
807        # Note that the element types get lost since we created an
808        # untyped record (an anonymous composite type). For the same
809        # reason this is also a normal tuple, not a named tuple.
810        text_row = tuple(None if v is None else str(v) for v in value)
811        self.assertEqual(row, text_row)
812
813    def test_custom_type(self):
814        values = [3, 5, 65]
815        values = list(map(PgBitString, values))
816        table = self.table_prefix + 'booze'
817        con = self._connect()
818        try:
819            cur = con.cursor()
820            params = enumerate(values)  # params have __pg_repr__ method
821            cur.execute(
822                'create table "%s" (n smallint, b bit varying(7))' % table)
823            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
824            cur.execute("select * from %s" % table)
825            rows = cur.fetchall()
826        finally:
827            con.close()
828        self.assertEqual(len(rows), len(values))
829        con = self._connect()
830        try:
831            cur = con.cursor()
832            params = (1, object())  # an object that cannot be handled
833            self.assertRaises(pgdb.InterfaceError, cur.execute,
834                "insert into %s values (%%s,%%s)" % table, params)
835        finally:
836            con.close()
837
838    def test_set_decimal_type(self):
839        decimal_type = pgdb.decimal_type()
840        self.assertTrue(decimal_type is not None and callable(decimal_type))
841        con = self._connect()
842        try:
843            cur = con.cursor()
844            # change decimal type globally to int
845            int_type = lambda v: int(float(v))
846            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
847            cur.execute('select 4.25')
848            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
849            value = cur.fetchone()[0]
850            self.assertTrue(isinstance(value, int))
851            self.assertEqual(value, 4)
852            # change decimal type again to float
853            self.assertTrue(pgdb.decimal_type(float) is float)
854            cur.execute('select 4.25')
855            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
856            value = cur.fetchone()[0]
857            # the connection still uses the old setting
858            self.assertTrue(isinstance(value, int))
859            # bust the cache for type functions for the connection
860            con.type_cache.reset_typecast()
861            cur.execute('select 4.25')
862            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
863            value = cur.fetchone()[0]
864            # now the connection uses the new setting
865            self.assertTrue(isinstance(value, float))
866            self.assertEqual(value, 4.25)
867        finally:
868            con.close()
869            pgdb.decimal_type(decimal_type)
870        self.assertTrue(pgdb.decimal_type() is decimal_type)
871
872    def test_global_typecast(self):
873        try:
874            query = 'select 2::int2, 4::int4, 8::int8'
875            self.assertIs(pgdb.get_typecast('int4'), int)
876            cast_int = lambda v: 'int(%s)' % v
877            pgdb.set_typecast('int4', cast_int)
878            con = self._connect()
879            try:
880                i2, i4, i8 = con.cursor().execute(query).fetchone()
881            finally:
882                con.close()
883            self.assertEqual(i2, 2)
884            self.assertEqual(i4, 'int(4)')
885            self.assertEqual(i8, 8)
886            pgdb.set_typecast(['int2', 'int8'], cast_int)
887            con = self._connect()
888            try:
889                i2, i4, i8 = con.cursor().execute(query).fetchone()
890            finally:
891                con.close()
892            self.assertEqual(i2, 'int(2)')
893            self.assertEqual(i4, 'int(4)')
894            self.assertEqual(i8, 'int(8)')
895            pgdb.reset_typecast('int4')
896            con = self._connect()
897            try:
898                i2, i4, i8 = con.cursor().execute(query).fetchone()
899            finally:
900                con.close()
901            self.assertEqual(i2, 'int(2)')
902            self.assertEqual(i4, 4)
903            self.assertEqual(i8, 'int(8)')
904            pgdb.reset_typecast(['int2', 'int8'])
905            con = self._connect()
906            try:
907                i2, i4, i8 = con.cursor().execute(query).fetchone()
908            finally:
909                con.close()
910            self.assertEqual(i2, 2)
911            self.assertEqual(i4, 4)
912            self.assertEqual(i8, 8)
913            pgdb.set_typecast(['int2', 'int8'], cast_int)
914            con = self._connect()
915            try:
916                i2, i4, i8 = con.cursor().execute(query).fetchone()
917            finally:
918                con.close()
919            self.assertEqual(i2, 'int(2)')
920            self.assertEqual(i4, 4)
921            self.assertEqual(i8, 'int(8)')
922        finally:
923            pgdb.reset_typecast()
924        con = self._connect()
925        try:
926            i2, i4, i8 = con.cursor().execute(query).fetchone()
927        finally:
928            con.close()
929        self.assertEqual(i2, 2)
930        self.assertEqual(i4, 4)
931        self.assertEqual(i8, 8)
932
933    def test_set_typecast_for_arrays(self):
934        query = 'select ARRAY[1,2,3]'
935        try:
936            con = self._connect()
937            try:
938                r = con.cursor().execute(query).fetchone()[0]
939            finally:
940                con.close()
941            self.assertIsInstance(r, list)
942            self.assertEqual(r, [1, 2, 3])
943            pgdb.set_typecast('anyarray', lambda v, basecast: v)
944            con = self._connect()
945            try:
946                r = con.cursor().execute(query).fetchone()[0]
947            finally:
948                con.close()
949            self.assertIsInstance(r, str)
950            self.assertEqual(r, '{1,2,3}')
951        finally:
952            pgdb.reset_typecast()
953        con = self._connect()
954        try:
955            r = con.cursor().execute(query).fetchone()[0]
956        finally:
957            con.close()
958        self.assertIsInstance(r, list)
959        self.assertEqual(r, [1, 2, 3])
960
961    def test_unicode_with_utf8(self):
962        table = self.table_prefix + 'booze'
963        input = u"He wes Leovenaðes sone — liðe him be Drihten"
964        con = self._connect()
965        try:
966            cur = con.cursor()
967            cur.execute("create table %s (t text)" % table)
968            try:
969                cur.execute("set client_encoding=utf8")
970                cur.execute(u"select '%s'" % input)
971            except Exception:
972                self.skipTest("database does not support utf8")
973            output1 = cur.fetchone()[0]
974            cur.execute("insert into %s values (%%s)" % table, (input,))
975            cur.execute("select * from %s" % table)
976            output2 = cur.fetchone()[0]
977            cur.execute("select t = '%s' from %s" % (input, table))
978            output3 = cur.fetchone()[0]
979            cur.execute("select t = %%s from %s" % table, (input,))
980            output4 = cur.fetchone()[0]
981        finally:
982            con.close()
983        if str is bytes:  # Python < 3.0
984            input = input.encode('utf8')
985        self.assertIsInstance(output1, str)
986        self.assertEqual(output1, input)
987        self.assertIsInstance(output2, str)
988        self.assertEqual(output2, input)
989        self.assertIsInstance(output3, bool)
990        self.assertTrue(output3)
991        self.assertIsInstance(output4, bool)
992        self.assertTrue(output4)
993
994    def test_unicode_with_latin1(self):
995        table = self.table_prefix + 'booze'
996        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
997        con = self._connect()
998        try:
999            cur = con.cursor()
1000            cur.execute("create table %s (t text)" % table)
1001            try:
1002                cur.execute("set client_encoding=latin1")
1003                cur.execute(u"select '%s'" % input)
1004            except Exception:
1005                self.skipTest("database does not support latin1")
1006            output1 = cur.fetchone()[0]
1007            cur.execute("insert into %s values (%%s)" % table, (input,))
1008            cur.execute("select * from %s" % table)
1009            output2 = cur.fetchone()[0]
1010            cur.execute("select t = '%s' from %s" % (input, table))
1011            output3 = cur.fetchone()[0]
1012            cur.execute("select t = %%s from %s" % table, (input,))
1013            output4 = cur.fetchone()[0]
1014        finally:
1015            con.close()
1016        if str is bytes:  # Python < 3.0
1017            input = input.encode('latin1')
1018        self.assertIsInstance(output1, str)
1019        self.assertEqual(output1, input)
1020        self.assertIsInstance(output2, str)
1021        self.assertEqual(output2, input)
1022        self.assertIsInstance(output3, bool)
1023        self.assertTrue(output3)
1024        self.assertIsInstance(output4, bool)
1025        self.assertTrue(output4)
1026
1027    def test_bool(self):
1028        values = [False, True, None, 't', 'f', 'true', 'false']
1029        table = self.table_prefix + 'booze'
1030        con = self._connect()
1031        try:
1032            cur = con.cursor()
1033            cur.execute(
1034                "create table %s (n smallint, booltest bool)" % table)
1035            params = enumerate(values)
1036            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
1037            cur.execute("select booltest from %s order by n" % table)
1038            rows = cur.fetchall()
1039            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
1040        finally:
1041            con.close()
1042        rows = [row[0] for row in rows]
1043        values[3] = values[5] = True
1044        values[4] = values[6] = False
1045        self.assertEqual(rows, values)
1046
1047    def test_literal(self):
1048        con = self._connect()
1049        try:
1050            cur = con.cursor()
1051            value = "lower('Hello')"
1052            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
1053            row = cur.fetchone()
1054        finally:
1055            con.close()
1056        self.assertEqual(row, (value, 'hello'))
1057
1058    def test_json(self):
1059        inval = {"employees":
1060            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1061        table = self.table_prefix + 'booze'
1062        con = self._connect()
1063        try:
1064            cur = con.cursor()
1065            try:
1066                cur.execute("create table %s (jsontest json)" % table)
1067            except pgdb.ProgrammingError:
1068                self.skipTest('database does not support json')
1069            params = (pgdb.Json(inval),)
1070            cur.execute("insert into %s values (%%s)" % table, params)
1071            cur.execute("select jsontest from %s" % table)
1072            outval = cur.fetchone()[0]
1073            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1074        finally:
1075            con.close()
1076        self.assertEqual(inval, outval)
1077
1078    def test_jsonb(self):
1079        inval = {"employees":
1080            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1081        table = self.table_prefix + 'booze'
1082        con = self._connect()
1083        try:
1084            cur = con.cursor()
1085            try:
1086                cur.execute("create table %s (jsonbtest jsonb)" % table)
1087            except pgdb.ProgrammingError:
1088                self.skipTest('database does not support jsonb')
1089            params = (pgdb.Json(inval),)
1090            cur.execute("insert into %s values (%%s)" % table, params)
1091            cur.execute("select jsonbtest from %s" % table)
1092            outval = cur.fetchone()[0]
1093            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1094        finally:
1095            con.close()
1096        self.assertEqual(inval, outval)
1097
1098    def test_execute_edge_cases(self):
1099        con = self._connect()
1100        try:
1101            cur = con.cursor()
1102            sql = 'invalid'  # should be ignored with empty parameter list
1103            cur.executemany(sql, [])
1104            sql = 'select %d + 1'
1105            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1106            self.assertEqual(cur.fetchone()[0], 3)
1107            sql = 'select 1/0'  # cannot be executed
1108            self.assertRaises(pgdb.DataError, cur.execute, sql)
1109            cur.close()
1110            con.rollback()
1111            if pgdb.shortcutmethods:
1112                res = con.execute('select %d', (1,)).fetchone()
1113                self.assertEqual(res, (1,))
1114                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1115                self.assertEqual(res, (2,))
1116        finally:
1117            con.close()
1118        sql = 'select 1'  # cannot be executed after connection is closed
1119        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1120
1121    def test_fetchmany_with_keep(self):
1122        con = self._connect()
1123        try:
1124            cur = con.cursor()
1125            self.assertEqual(cur.arraysize, 1)
1126            cur.execute('select * from generate_series(1, 25)')
1127            self.assertEqual(len(cur.fetchmany()), 1)
1128            self.assertEqual(len(cur.fetchmany()), 1)
1129            self.assertEqual(cur.arraysize, 1)
1130            cur.arraysize = 3
1131            self.assertEqual(len(cur.fetchmany()), 3)
1132            self.assertEqual(len(cur.fetchmany()), 3)
1133            self.assertEqual(cur.arraysize, 3)
1134            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1135            self.assertEqual(cur.arraysize, 3)
1136            self.assertEqual(len(cur.fetchmany()), 3)
1137            self.assertEqual(len(cur.fetchmany()), 3)
1138            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1139            self.assertEqual(cur.arraysize, 2)
1140            self.assertEqual(len(cur.fetchmany()), 2)
1141            self.assertEqual(len(cur.fetchmany()), 2)
1142            self.assertEqual(len(cur.fetchmany(25)), 3)
1143        finally:
1144            con.close()
1145
1146    def test_nextset(self):
1147        con = self._connect()
1148        cur = con.cursor()
1149        self.assertRaises(con.NotSupportedError, cur.nextset)
1150
1151    def test_setoutputsize(self):
1152        pass  # not supported
1153
1154    def test_connection_errors(self):
1155        con = self._connect()
1156        self.assertEqual(con.Error, pgdb.Error)
1157        self.assertEqual(con.Warning, pgdb.Warning)
1158        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1159        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1160        self.assertEqual(con.InternalError, pgdb.InternalError)
1161        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1162        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1163        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1164        self.assertEqual(con.DataError, pgdb.DataError)
1165        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1166
1167    def test_connection_as_contextmanager(self):
1168        table = self.table_prefix + 'booze'
1169        con = self._connect()
1170        try:
1171            cur = con.cursor()
1172            cur.execute("create table %s (n smallint check(n!=4))" % table)
1173            with con:
1174                cur.execute("insert into %s values (1)" % table)
1175                cur.execute("insert into %s values (2)" % table)
1176            try:
1177                with con:
1178                    cur.execute("insert into %s values (3)" % table)
1179                    cur.execute("insert into %s values (4)" % table)
1180            except con.IntegrityError as error:
1181                self.assertTrue('check' in str(error).lower())
1182            with con:
1183                cur.execute("insert into %s values (5)" % table)
1184                cur.execute("insert into %s values (6)" % table)
1185            try:
1186                with con:
1187                    cur.execute("insert into %s values (7)" % table)
1188                    cur.execute("insert into %s values (8)" % table)
1189                    raise ValueError('transaction should rollback')
1190            except ValueError as error:
1191                self.assertEqual(str(error), 'transaction should rollback')
1192            with con:
1193                cur.execute("insert into %s values (9)" % table)
1194            cur.execute("select * from %s order by 1" % table)
1195            rows = cur.fetchall()
1196            rows = [row[0] for row in rows]
1197        finally:
1198            con.close()
1199        self.assertEqual(rows, [1, 2, 5, 6, 9])
1200
1201    def test_cursor_connection(self):
1202        con = self._connect()
1203        cur = con.cursor()
1204        self.assertEqual(cur.connection, con)
1205        cur.close()
1206
1207    def test_cursor_as_contextmanager(self):
1208        con = self._connect()
1209        with con.cursor() as cur:
1210            self.assertEqual(cur.connection, con)
1211
1212    def test_pgdb_type(self):
1213        self.assertEqual(pgdb.STRING, pgdb.STRING)
1214        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1215        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1216        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1217        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1218        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1219        self.assertEqual('char', pgdb.STRING)
1220        self.assertEqual('varchar', pgdb.STRING)
1221        self.assertEqual('text', pgdb.STRING)
1222        self.assertNotEqual('numeric', pgdb.STRING)
1223        self.assertEqual('numeric', pgdb.NUMERIC)
1224        self.assertEqual('numeric', pgdb.NUMBER)
1225        self.assertEqual('int4', pgdb.NUMBER)
1226        self.assertNotEqual('int4', pgdb.NUMERIC)
1227        self.assertEqual('int2', pgdb.SMALLINT)
1228        self.assertNotEqual('int4', pgdb.SMALLINT)
1229        self.assertEqual('int2', pgdb.INTEGER)
1230        self.assertEqual('int4', pgdb.INTEGER)
1231        self.assertEqual('int8', pgdb.INTEGER)
1232        self.assertNotEqual('int4', pgdb.LONG)
1233        self.assertEqual('int8', pgdb.LONG)
1234        self.assertTrue('char' in pgdb.STRING)
1235        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1236        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1237        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1238        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1239        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1240        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1241        self.assertEqual('_char', pgdb.ARRAY)
1242        self.assertNotEqual('char', pgdb.ARRAY)
1243        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1244        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1245        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1246        self.assertEqual('record', pgdb.RECORD)
1247        self.assertNotEqual('_record', pgdb.RECORD)
1248
1249    def test_no_close(self):
1250        data = ('hello', 'world')
1251        con = self._connect()
1252        cur = con.cursor()
1253        cur.build_row_factory = lambda: tuple
1254        cur.execute("select %s, %s", data)
1255        row = cur.fetchone()
1256        self.assertEqual(row, data)
1257
1258    def test_set_row_factory_size(self):
1259        try:
1260            from functools import lru_cache
1261        except ImportError:  # Python < 3.2
1262            lru_cache = None
1263        queries = ['select 1 as a, 2 as b, 3 as c', 'select 123 as abc']
1264        con = self._connect()
1265        cur = con.cursor()
1266        for maxsize in (None, 0, 1, 2, 3, 10, 1024):
1267            pgdb.set_row_factory_size(maxsize)
1268            for i in range(3):
1269                for q in queries:
1270                    cur.execute(q)
1271                    r = cur.fetchone()
1272                    if q.endswith('abc'):
1273                        self.assertEqual(r, (123,))
1274                        self.assertEqual(r._fields, ('abc',))
1275                    else:
1276                        self.assertEqual(r, (1, 2, 3))
1277                        self.assertEqual(r._fields, ('a', 'b', 'c'))
1278            if lru_cache:
1279                info = pgdb._row_factory.cache_info()
1280                self.assertEqual(info.maxsize, maxsize)
1281                self.assertEqual(info.hits + info.misses, 6)
1282                self.assertEqual(info.hits,
1283                    0 if maxsize is not None and maxsize < 2 else 4)
1284
1285    def test_memory_leaks(self):
1286        ids = set()
1287        objs = []
1288        add_ids = ids.update
1289        gc.collect()
1290        objs[:] = gc.get_objects()
1291        add_ids(id(obj) for obj in objs)
1292        self.test_no_close()
1293        gc.collect()
1294        objs[:] = gc.get_objects()
1295        objs[:] = [obj for obj in objs if id(obj) not in ids]
1296        if objs and sys.version_info[:3] in ((3, 5, 0), (3, 5, 1)):
1297            # workaround for Python issue 26811
1298            objs[:] = [obj for obj in objs if repr(obj) != '(<NULL>,)']
1299        self.assertEqual(len(objs), 0)
1300
1301
1302if __name__ == '__main__':
1303    unittest.main()
Note: See TracBrowser for help on using the repository browser.