source: trunk/tests/test_dbapi20.py @ 840

Last change on this file since 840 was 840, checked in by cito, 4 years ago

Improve adaptation of hstore and empty array

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 45.6 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 840 2016-02-08 17:32:50Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import date, time, datetime, timedelta
32from uuid import UUID
33
34try:
35    from datetime import timezone
36except ImportError:  # Python < 3.2
37    timezone = None
38
39try:
40    long
41except NameError:  # Python >= 3.0
42    long = int
43
44try:
45    from collections import OrderedDict
46except ImportError:  # Python 2.6 or 3.0
47    OrderedDict = None
48
49
class PgBitString:
    """Test helper whose PostgreSQL representation is a bit string."""

    def __init__(self, value):
        self.value = value

    def __pg_repr__(self):
        # Render the wrapped value in base 2 inside a B'...' literal.
        bits = format(self.value, 'b')
        return "B'" + bits + "'"
58
59
60class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
61
62    driver = pgdb
63    connect_args = ()
64    connect_kw_args = {'database': dbname,
65        'host': '%s:%d' % (dbhost or '', dbport or -1)}
66
67    lower_func = 'lower'  # For stored procedure test
68
69    def setUp(self):
70        # Call superclass setUp in case this does something in the future
71        dbapi20.DatabaseAPI20Test.setUp(self)
72        try:
73            con = self._connect()
74            con.close()
75        except pgdb.Error:  # try to create a missing database
76            import pg
77            try:  # first try to log in as superuser
78                db = pg.DB('postgres', dbhost or None, dbport or -1,
79                    user='postgres')
80            except Exception:  # then try to log in as current user
81                db = pg.DB('postgres', dbhost or None, dbport or -1)
82            db.query('create database ' + dbname)
83
84    def tearDown(self):
85        dbapi20.DatabaseAPI20Test.tearDown(self)
86
87    def testVersion(self):
88        v = pgdb.version
89        self.assertIsInstance(v, str)
90        self.assertIn('.', v)
91        self.assertEqual(pgdb.__version__, v)
92
93    def test_callproc_no_params(self):
94        con = self._connect()
95        cur = con.cursor()
96        # note that now() does not change within a transaction
97        cur.execute('select now()')
98        now = cur.fetchone()[0]
99        res = cur.callproc('now')
100        self.assertIsNone(res)
101        res = cur.fetchone()[0]
102        self.assertEqual(res, now)
103
104    def test_callproc_bad_params(self):
105        con = self._connect()
106        cur = con.cursor()
107        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
108        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
109
110    def test_callproc_one_param(self):
111        con = self._connect()
112        cur = con.cursor()
113        params = (42.4382,)
114        res = cur.callproc("round", params)
115        self.assertIs(res, params)
116        res = cur.fetchone()[0]
117        self.assertEqual(res, 42)
118
119    def test_callproc_two_params(self):
120        con = self._connect()
121        cur = con.cursor()
122        params = (9, 4)
123        res = cur.callproc("div", params)
124        self.assertIs(res, params)
125        res = cur.fetchone()[0]
126        self.assertEqual(res, 2)
127
128    def test_cursor_type(self):
129
130        class TestCursor(pgdb.Cursor):
131            pass
132
133        con = self._connect()
134        self.assertIs(con.cursor_type, pgdb.Cursor)
135        cur = con.cursor()
136        self.assertIsInstance(cur, pgdb.Cursor)
137        self.assertNotIsInstance(cur, TestCursor)
138        con.cursor_type = TestCursor
139        cur = con.cursor()
140        self.assertIsInstance(cur, TestCursor)
141        cur = con.cursor()
142        self.assertIsInstance(cur, TestCursor)
143        con = self._connect()
144        self.assertIs(con.cursor_type, pgdb.Cursor)
145        cur = con.cursor()
146        self.assertIsInstance(cur, pgdb.Cursor)
147        self.assertNotIsInstance(cur, TestCursor)
148
149    def test_row_factory(self):
150
151        class TestCursor(pgdb.Cursor):
152
153            def row_factory(self, row):
154                return dict(('column %s' % desc[0], value)
155                    for desc, value in zip(self.description, row))
156
157        con = self._connect()
158        con.cursor_type = TestCursor
159        cur = con.cursor()
160        self.assertIsInstance(cur, TestCursor)
161        res = cur.execute("select 1 as a, 2 as b")
162        self.assertIs(res, cur, 'execute() should return cursor')
163        res = cur.fetchone()
164        self.assertIsInstance(res, dict)
165        self.assertEqual(res, {'column a': 1, 'column b': 2})
166        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
167        res = cur.fetchall()
168        self.assertIsInstance(res, list)
169        self.assertEqual(len(res), 2)
170        self.assertIsInstance(res[0], dict)
171        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
172        self.assertIsInstance(res[1], dict)
173        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
174
175    def test_build_row_factory(self):
176
177        class TestCursor(pgdb.Cursor):
178
179            def build_row_factory(self):
180                keys = [desc[0] for desc in self.description]
181                return lambda row: dict((key, value)
182                    for key, value in zip(keys, row))
183
184        con = self._connect()
185        con.cursor_type = TestCursor
186        cur = con.cursor()
187        self.assertIsInstance(cur, TestCursor)
188        cur.execute("select 1 as a, 2 as b")
189        res = cur.fetchone()
190        self.assertIsInstance(res, dict)
191        self.assertEqual(res, {'a': 1, 'b': 2})
192        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
193        res = cur.fetchall()
194        self.assertIsInstance(res, list)
195        self.assertEqual(len(res), 2)
196        self.assertIsInstance(res[0], dict)
197        self.assertEqual(res[0], {'a': 1, 'b': 2})
198        self.assertIsInstance(res[1], dict)
199        self.assertEqual(res[1], {'a': 3, 'b': 4})
200
201    def test_cursor_with_named_columns(self):
202        con = self._connect()
203        cur = con.cursor()
204        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
205        self.assertIs(res, cur, 'execute() should return cursor')
206        res = cur.fetchone()
207        self.assertIsInstance(res, tuple)
208        self.assertEqual(res, (1, 2, 3))
209        self.assertEqual(res._fields, ('abc', 'de', 'f'))
210        self.assertEqual(res.abc, 1)
211        self.assertEqual(res.de, 2)
212        self.assertEqual(res.f, 3)
213        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
214        res = cur.fetchall()
215        self.assertIsInstance(res, list)
216        self.assertEqual(len(res), 2)
217        self.assertIsInstance(res[0], tuple)
218        self.assertEqual(res[0], (1, 2))
219        self.assertEqual(res[0]._fields, ('one', 'two'))
220        self.assertIsInstance(res[1], tuple)
221        self.assertEqual(res[1], (3, 4))
222        self.assertEqual(res[1]._fields, ('one', 'two'))
223
224    def test_cursor_with_unnamed_columns(self):
225        con = self._connect()
226        cur = con.cursor()
227        cur.execute("select 1, 2, 3")
228        res = cur.fetchone()
229        self.assertIsInstance(res, tuple)
230        self.assertEqual(res, (1, 2, 3))
231        old_py = OrderedDict is None  # Python 2.6 or 3.0
232        # old Python versions cannot rename tuple fields with underscore
233        if old_py:
234            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
235        else:
236            self.assertEqual(res._fields, ('_0', '_1', '_2'))
237        cur.execute("select 1 as one, 2, 3 as three")
238        res = cur.fetchone()
239        self.assertIsInstance(res, tuple)
240        self.assertEqual(res, (1, 2, 3))
241        if old_py:  # cannot auto rename with underscore
242            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
243        else:
244            self.assertEqual(res._fields, ('one', '_1', 'three'))
245        cur.execute("select 1 as abc, 2 as def")
246        res = cur.fetchone()
247        self.assertIsInstance(res, tuple)
248        self.assertEqual(res, (1, 2))
249        if old_py:
250            self.assertEqual(res._fields, ('column_0', 'column_1'))
251        else:
252            self.assertEqual(res._fields, ('abc', '_1'))
253
254    def test_colnames(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1, 2, 3")
258        names = cur.colnames
259        self.assertIsInstance(names, list)
260        self.assertEqual(names, ['?column?', '?column?', '?column?'])
261        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
262        names = cur.colnames
263        self.assertIsInstance(names, list)
264        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
265
266    def test_coltypes(self):
267        con = self._connect()
268        cur = con.cursor()
269        cur.execute("select 1::int2, 2::int4, 3::int8")
270        types = cur.coltypes
271        self.assertIsInstance(types, list)
272        self.assertEqual(types, ['int2', 'int4', 'int8'])
273
274    def test_description_fields(self):
275        con = self._connect()
276        cur = con.cursor()
277        cur.execute("select 123456789::int8 col0,"
278            " 123456.789::numeric(41, 13) as col1,"
279            " 'foobar'::char(39) as col2")
280        desc = cur.description
281        self.assertIsInstance(desc, list)
282        self.assertEqual(len(desc), 3)
283        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
284        for i in range(3):
285            c, d = cols[i], desc[i]
286            self.assertIsInstance(d, tuple)
287            self.assertEqual(len(d), 7)
288            self.assertIsInstance(d.name, str)
289            self.assertEqual(d.name, 'col%d' % i)
290            self.assertIsInstance(d.type_code, str)
291            self.assertEqual(d.type_code, c[0])
292            self.assertIsNone(d.display_size)
293            self.assertIsInstance(d.internal_size, int)
294            self.assertEqual(d.internal_size, c[1])
295            if c[2] is not None:
296                self.assertIsInstance(d.precision, int)
297                self.assertEqual(d.precision, c[1])
298                self.assertIsInstance(d.scale, int)
299                self.assertEqual(d.scale, c[2])
300            else:
301                self.assertIsNone(d.precision)
302                self.assertIsNone(d.scale)
303            self.assertIsNone(d.null_ok)
304
305    def test_type_cache_info(self):
306        con = self._connect()
307        try:
308            cur = con.cursor()
309            type_cache = con.type_cache
310            self.assertNotIn('numeric', type_cache)
311            type_info = type_cache['numeric']
312            self.assertIn('numeric', type_cache)
313            self.assertEqual(type_info, 'numeric')
314            self.assertEqual(type_info.oid, 1700)
315            self.assertEqual(type_info.len, -1)
316            self.assertEqual(type_info.type, 'b')  # base
317            self.assertEqual(type_info.category, 'N')  # numeric
318            self.assertEqual(type_info.delim, ',')
319            self.assertEqual(type_info.relid, 0)
320            self.assertIs(con.type_cache[1700], type_info)
321            self.assertNotIn('pg_type', type_cache)
322            type_info = type_cache['pg_type']
323            self.assertIn('pg_type', type_cache)
324            self.assertEqual(type_info.type, 'c')  # composite
325            self.assertEqual(type_info.category, 'C')  # composite
326            cols = type_cache.get_fields('pg_type')
327            self.assertEqual(cols[0].name, 'typname')
328            typname = type_cache[cols[0].type]
329            self.assertEqual(typname, 'name')
330            self.assertEqual(typname.type, 'b')  # base
331            self.assertEqual(typname.category, 'S')  # string
332            self.assertEqual(cols[3].name, 'typlen')
333            typlen = type_cache[cols[3].type]
334            self.assertEqual(typlen, 'int2')
335            self.assertEqual(typlen.type, 'b')  # base
336            self.assertEqual(typlen.category, 'N')  # numeric
337            cur.close()
338            cur = con.cursor()
339            type_cache = con.type_cache
340            self.assertIn('numeric', type_cache)
341            cur.close()
342        finally:
343            con.close()
344        con = self._connect()
345        try:
346            cur = con.cursor()
347            type_cache = con.type_cache
348            self.assertNotIn('pg_type', type_cache)
349            self.assertEqual(type_cache.get('pg_type'), type_info)
350            self.assertIn('pg_type', type_cache)
351            self.assertIsNone(type_cache.get(
352                self.table_prefix + '_surely_does_not_exist'))
353            cur.close()
354        finally:
355            con.close()
356
357    def test_type_cache_typecast(self):
358        con = self._connect()
359        try:
360            cur = con.cursor()
361            type_cache = con.type_cache
362            self.assertIs(type_cache.get_typecast('int4'), int)
363            cast_int = lambda v: 'int(%s)' % v
364            type_cache.set_typecast('int4', cast_int)
365            query = 'select 2::int2, 4::int4, 8::int8'
366            cur.execute(query)
367            i2, i4, i8 = cur.fetchone()
368            self.assertEqual(i2, 2)
369            self.assertEqual(i4, 'int(4)')
370            self.assertEqual(i8, 8)
371            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
372            type_cache.set_typecast(['int2', 'int8'], cast_int)
373            cur.execute(query)
374            i2, i4, i8 = cur.fetchone()
375            self.assertEqual(i2, 'int(2)')
376            self.assertEqual(i4, 'int(4)')
377            self.assertEqual(i8, 'int(8)')
378            type_cache.reset_typecast('int4')
379            cur.execute(query)
380            i2, i4, i8 = cur.fetchone()
381            self.assertEqual(i2, 'int(2)')
382            self.assertEqual(i4, 4)
383            self.assertEqual(i8, 'int(8)')
384            type_cache.reset_typecast(['int2', 'int8'])
385            cur.execute(query)
386            i2, i4, i8 = cur.fetchone()
387            self.assertEqual(i2, 2)
388            self.assertEqual(i4, 4)
389            self.assertEqual(i8, 8)
390            type_cache.set_typecast(['int2', 'int8'], cast_int)
391            cur.execute(query)
392            i2, i4, i8 = cur.fetchone()
393            self.assertEqual(i2, 'int(2)')
394            self.assertEqual(i4, 4)
395            self.assertEqual(i8, 'int(8)')
396            type_cache.reset_typecast()
397            cur.execute(query)
398            i2, i4, i8 = cur.fetchone()
399            self.assertEqual(i2, 2)
400            self.assertEqual(i4, 4)
401            self.assertEqual(i8, 8)
402            cur.close()
403        finally:
404            con.close()
405
406    def test_cursor_iteration(self):
407        con = self._connect()
408        cur = con.cursor()
409        cur.execute("select 1 union select 2 union select 3")
410        self.assertEqual([r[0] for r in cur], [1, 2, 3])
411
412    def test_fetch_2_rows(self):
413        Decimal = pgdb.decimal_type()
414        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
415            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
416            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
417            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
418            pgdb.Interval(15, 31, 5), 7897234)
419        table = self.table_prefix + 'booze'
420        con = self._connect()
421        try:
422            cur = con.cursor()
423            cur.execute("set datestyle to iso")
424            cur.execute("create table %s ("
425                "stringtest varchar,"
426                "binarytest bytea,"
427                "booltest bool,"
428                "integertest int4,"
429                "longtest int8,"
430                "floattest float8,"
431                "numerictest numeric,"
432                "moneytest money,"
433                "datetest date,"
434                "timetest time,"
435                "datetimetest timestamp,"
436                "intervaltest interval,"
437                "rowidtest oid)" % table)
438            cur.execute("set standard_conforming_strings to on")
439            for s in ('numeric', 'monetary', 'time'):
440                cur.execute("set lc_%s to 'C'" % s)
441            for _i in range(2):
442                cur.execute("insert into %s values ("
443                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
444                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
445            cur.execute("select * from %s" % table)
446            rows = cur.fetchall()
447            self.assertEqual(len(rows), 2)
448            row0 = rows[0]
449            self.assertEqual(row0, values)
450            self.assertEqual(row0, rows[1])
451            self.assertIsInstance(row0[0], str)
452            self.assertIsInstance(row0[1], bytes)
453            self.assertIsInstance(row0[2], bool)
454            self.assertIsInstance(row0[3], int)
455            self.assertIsInstance(row0[4], long)
456            self.assertIsInstance(row0[5], float)
457            self.assertIsInstance(row0[6], Decimal)
458            self.assertIsInstance(row0[7], Decimal)
459            self.assertIsInstance(row0[8], date)
460            self.assertIsInstance(row0[9], time)
461            self.assertIsInstance(row0[10], datetime)
462            self.assertIsInstance(row0[11], timedelta)
463        finally:
464            con.close()
465
466    def test_integrity_error(self):
467        table = self.table_prefix + 'booze'
468        con = self._connect()
469        try:
470            cur = con.cursor()
471            cur.execute("set client_min_messages = warning")
472            cur.execute("create table %s (i int primary key)" % table)
473            cur.execute("insert into %s values (1)" % table)
474            cur.execute("insert into %s values (2)" % table)
475            self.assertRaises(pgdb.IntegrityError, cur.execute,
476                "insert into %s values (1)" % table)
477        finally:
478            con.close()
479
480    def test_sqlstate(self):
481        con = self._connect()
482        cur = con.cursor()
483        try:
484            cur.execute("select 1/0")
485        except pgdb.DatabaseError as error:
486            self.assertTrue(isinstance(error, pgdb.DataError))
487            # the SQLSTATE error code for division by zero is 22012
488            self.assertEqual(error.sqlstate, '22012')
489
490    def test_float(self):
491        nan, inf = float('nan'), float('inf')
492        from math import isnan, isinf
493        self.assertTrue(isnan(nan) and not isinf(nan))
494        self.assertTrue(isinf(inf) and not isnan(inf))
495        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
496            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
497        table = self.table_prefix + 'booze'
498        con = self._connect()
499        try:
500            cur = con.cursor()
501            cur.execute(
502                "create table %s (n smallint, floattest float)" % table)
503            params = enumerate(values)
504            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
505            cur.execute("select floattest from %s order by n" % table)
506            rows = cur.fetchall()
507            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
508            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
509            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
510        finally:
511            con.close()
512        self.assertEqual(len(rows), len(values))
513        rows = [row[0] for row in rows]
514        for inval, outval in zip(values, rows):
515            if inval in ('inf', 'Infinity'):
516                inval = inf
517            elif inval in ('-inf', '-Infinity'):
518                inval = -inf
519            elif inval in ('nan', 'NaN'):
520                inval = nan
521            if isinf(inval):
522                self.assertTrue(isinf(outval))
523                if inval < 0:
524                    self.assertTrue(outval < 0)
525                else:
526                    self.assertTrue(outval > 0)
527            elif isnan(inval):
528                self.assertTrue(isnan(outval))
529            else:
530                self.assertEqual(inval, outval)
531
532    def test_datetime(self):
533        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
534        table = self.table_prefix + 'booze'
535        con = self._connect()
536        try:
537            cur = con.cursor()
538            cur.execute("create table %s ("
539                "d date, t time,  ts timestamp,"
540                "tz timetz, tsz timestamptz)" % table)
541            for n in range(3):
542                values = [dt.date(), dt.time(), dt,
543                    dt.time(), dt]
544                if timezone:
545                    values[3] = values[3].replace(tzinfo=timezone.utc)
546                    values[4] = values[4].replace(tzinfo=timezone.utc)
547                if n == 0:  # input as objects
548                    params = values
549                if n == 1:  # input as text
550                    params = [v.isoformat() for v in values]  # as text
551                elif n == 2:  # input using type helpers
552                    d = (dt.year, dt.month, dt.day)
553                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
554                    params = [pgdb.Date(*d), pgdb.Time(*t),
555                            pgdb.Timestamp(*(d + t)), pgdb.Time(*t),
556                            pgdb.Timestamp(*(d + t))]
557                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
558                        'sql, mdy', 'sql, dmy', 'german'):
559                    cur.execute("set datestyle to %s" % datestyle)
560                    if n != 1:
561                        cur.execute("select %s,%s,%s,%s,%s", params)
562                        row = cur.fetchone()
563                        self.assertEqual(row, tuple(values))
564                    cur.execute("insert into %s"
565                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
566                    cur.execute("select * from %s" % table)
567                    d = cur.description
568                    for i in range(5):
569                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
570                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
571                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
572                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
573                    self.assertEqual(d[0].type_code, pgdb.DATE)
574                    self.assertEqual(d[1].type_code, pgdb.TIME)
575                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
576                    self.assertEqual(d[3].type_code, pgdb.TIME)
577                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
578                    row = cur.fetchone()
579                    self.assertEqual(row, tuple(values))
580                    cur.execute("delete from %s" % table)
581        finally:
582            con.close()
583
584    def test_interval(self):
585        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
586        table = self.table_prefix + 'booze'
587        con = self._connect()
588        try:
589            cur = con.cursor()
590            cur.execute("create table %s (i interval)" % table)
591            for n in range(3):
592                if n == 0:  # input as objects
593                    param = td
594                if n == 1:  # input as text
595                    param = '%d days %d seconds %d microseconds ' % (
596                        td.days, td.seconds, td.microseconds)
597                elif n == 2:  # input using type helpers
598                    param = pgdb.Interval(
599                        td.days, 0, 0, td.seconds, td.microseconds)
600                for intervalstyle in ('sql_standard ', 'postgres',
601                        'postgres_verbose', 'iso_8601'):
602                    cur.execute("set intervalstyle to %s" % intervalstyle)
603                    cur.execute("insert into %s"
604                        " values (%%s)" % table, [param])
605                    cur.execute("select * from %s" % table)
606                    tc = cur.description[0].type_code
607                    self.assertEqual(tc, pgdb.DATETIME)
608                    self.assertNotEqual(tc, pgdb.STRING)
609                    self.assertNotEqual(tc, pgdb.ARRAY)
610                    self.assertNotEqual(tc, pgdb.RECORD)
611                    self.assertEqual(tc, pgdb.INTERVAL)
612                    row = cur.fetchone()
613                    self.assertEqual(row, (td,))
614                    cur.execute("delete from %s" % table)
615        finally:
616            con.close()
617
618    def test_hstore(self):
619        con = self._connect()
620        try:
621            cur = con.cursor()
622            cur.execute("select 'k=>v'::hstore")
623        except pgdb.DatabaseError:
624            try:
625                cur.execute("create extension hstore")
626            except pgdb.DatabaseError:
627                self.skipTest("hstore extension not enabled")
628        finally:
629            con.close()
630        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
631            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
632            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
633            'None': None, 'NULL': 'NULL', 'empty': ''}
634        con = self._connect()
635        try:
636            cur = con.cursor()
637            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
638            result = cur.fetchone()[0]
639        finally:
640            con.close()
641        self.assertIsInstance(result, dict)
642        self.assertEqual(result, d)
643
644    def test_uuid(self):
645        d = UUID('{12345678-1234-5678-1234-567812345678}')
646        con = self._connect()
647        try:
648            cur = con.cursor()
649            cur.execute("select %s::uuid", (d,))
650            result = cur.fetchone()[0]
651        finally:
652            con.close()
653        self.assertIsInstance(result, UUID)
654        self.assertEqual(result, d)
655
656    def test_insert_array(self):
657        values = [(None, None), ([], []), ([None], [[None], ['null']]),
658            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
659            ([20000, 25000, 25000, 30000],
660            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
661            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
662        table = self.table_prefix + 'booze'
663        con = self._connect()
664        try:
665            cur = con.cursor()
666            cur.execute("create table %s"
667                " (n smallint, i int[], t text[][])" % table)
668            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
669            # Note that we must explicit casts because we are inserting
670            # empty arrays.  Otherwise this is not necessary.
671            cur.executemany("insert into %s values"
672                " (%%d,%%s::int[],%%s::text[][])" % table, params)
673            cur.execute("select i, t from %s order by n" % table)
674            d = cur.description
675            self.assertEqual(d[0].type_code, pgdb.ARRAY)
676            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
677            self.assertEqual(d[0].type_code, pgdb.NUMBER)
678            self.assertEqual(d[0].type_code, pgdb.INTEGER)
679            self.assertEqual(d[1].type_code, pgdb.ARRAY)
680            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
681            self.assertEqual(d[1].type_code, pgdb.STRING)
682            rows = cur.fetchall()
683        finally:
684            con.close()
685        self.assertEqual(rows, values)
686
687    def test_select_array(self):
688        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
689        con = self._connect()
690        try:
691            cur = con.cursor()
692            cur.execute("select %s::int[], %s::text[]", values)
693            row = cur.fetchone()
694        finally:
695            con.close()
696        self.assertEqual(row, values)
697
698    def test_insert_record(self):
699        values = [('John', 61), ('Jane', 63),
700                  ('Fred', None), ('Wilma', None),
701                  (None, 42), (None, None)]
702        table = self.table_prefix + 'booze'
703        record = self.table_prefix + 'munch'
704        con = self._connect()
705        try:
706            cur = con.cursor()
707            cur.execute("create type %s as (name varchar, age int)" % record)
708            cur.execute("create table %s (n smallint, r %s)" % (table, record))
709            params = enumerate(values)
710            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
711            cur.execute("select r from %s order by n" % table)
712            type_code = cur.description[0].type_code
713            self.assertEqual(type_code, record)
714            self.assertEqual(type_code, pgdb.RECORD)
715            self.assertNotEqual(type_code, pgdb.ARRAY)
716            columns = con.type_cache.get_fields(type_code)
717            self.assertEqual(columns[0].name, 'name')
718            self.assertEqual(columns[1].name, 'age')
719            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
720            self.assertEqual(con.type_cache[columns[1].type], 'int4')
721            rows = cur.fetchall()
722        finally:
723            cur.execute('drop table %s' % table)
724            cur.execute('drop type %s' % record)
725            con.close()
726        self.assertEqual(len(rows), len(values))
727        rows = [row[0] for row in rows]
728        self.assertEqual(rows, values)
729        self.assertEqual(rows[0].name, 'John')
730        self.assertEqual(rows[0].age, 61)
731
732    def test_select_record(self):
733        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
734            '(test)', '(x,y)', ' x y ', 'null', None)
735        con = self._connect()
736        try:
737            cur = con.cursor()
738            cur.execute("select %s as test_record", [value])
739            self.assertEqual(cur.description[0].name, 'test_record')
740            self.assertEqual(cur.description[0].type_code, 'record')
741            row = cur.fetchone()[0]
742        finally:
743            con.close()
744        # Note that the element types get lost since we created an
745        # untyped record (an anonymous composite type). For the same
746        # reason this is also a normal tuple, not a named tuple.
747        text_row = tuple(None if v is None else str(v) for v in value)
748        self.assertEqual(row, text_row)
749
750    def test_custom_type(self):
751        values = [3, 5, 65]
752        values = list(map(PgBitString, values))
753        table = self.table_prefix + 'booze'
754        con = self._connect()
755        try:
756            cur = con.cursor()
757            params = enumerate(values)  # params have __pg_repr__ method
758            cur.execute(
759                'create table "%s" (n smallint, b bit varying(7))' % table)
760            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
761            cur.execute("select * from %s" % table)
762            rows = cur.fetchall()
763        finally:
764            con.close()
765        self.assertEqual(len(rows), len(values))
766        con = self._connect()
767        try:
768            cur = con.cursor()
769            params = (1, object())  # an object that cannot be handled
770            self.assertRaises(pgdb.InterfaceError, cur.execute,
771                "insert into %s values (%%s,%%s)" % table, params)
772        finally:
773            con.close()
774
775    def test_set_decimal_type(self):
776        decimal_type = pgdb.decimal_type()
777        self.assertTrue(decimal_type is not None and callable(decimal_type))
778        con = self._connect()
779        try:
780            cur = con.cursor()
781            # change decimal type globally to int
782            int_type = lambda v: int(float(v))
783            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
784            cur.execute('select 4.25')
785            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
786            value = cur.fetchone()[0]
787            self.assertTrue(isinstance(value, int))
788            self.assertEqual(value, 4)
789            # change decimal type again to float
790            self.assertTrue(pgdb.decimal_type(float) is float)
791            cur.execute('select 4.25')
792            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
793            value = cur.fetchone()[0]
794            # the connection still uses the old setting
795            self.assertTrue(isinstance(value, int))
796            # bust the cache for type functions for the connection
797            con.type_cache.reset_typecast()
798            cur.execute('select 4.25')
799            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
800            value = cur.fetchone()[0]
801            # now the connection uses the new setting
802            self.assertTrue(isinstance(value, float))
803            self.assertEqual(value, 4.25)
804        finally:
805            con.close()
806            pgdb.decimal_type(decimal_type)
807        self.assertTrue(pgdb.decimal_type() is decimal_type)
808
809    def test_global_typecast(self):
810        try:
811            query = 'select 2::int2, 4::int4, 8::int8'
812            self.assertIs(pgdb.get_typecast('int4'), int)
813            cast_int = lambda v: 'int(%s)' % v
814            pgdb.set_typecast('int4', cast_int)
815            con = self._connect()
816            try:
817                i2, i4, i8 = con.cursor().execute(query).fetchone()
818            finally:
819                con.close()
820            self.assertEqual(i2, 2)
821            self.assertEqual(i4, 'int(4)')
822            self.assertEqual(i8, 8)
823            pgdb.set_typecast(['int2', 'int8'], cast_int)
824            con = self._connect()
825            try:
826                i2, i4, i8 = con.cursor().execute(query).fetchone()
827            finally:
828                con.close()
829            self.assertEqual(i2, 'int(2)')
830            self.assertEqual(i4, 'int(4)')
831            self.assertEqual(i8, 'int(8)')
832            pgdb.reset_typecast('int4')
833            con = self._connect()
834            try:
835                i2, i4, i8 = con.cursor().execute(query).fetchone()
836            finally:
837                con.close()
838            self.assertEqual(i2, 'int(2)')
839            self.assertEqual(i4, 4)
840            self.assertEqual(i8, 'int(8)')
841            pgdb.reset_typecast(['int2', 'int8'])
842            con = self._connect()
843            try:
844                i2, i4, i8 = con.cursor().execute(query).fetchone()
845            finally:
846                con.close()
847            self.assertEqual(i2, 2)
848            self.assertEqual(i4, 4)
849            self.assertEqual(i8, 8)
850            pgdb.set_typecast(['int2', 'int8'], cast_int)
851            con = self._connect()
852            try:
853                i2, i4, i8 = con.cursor().execute(query).fetchone()
854            finally:
855                con.close()
856            self.assertEqual(i2, 'int(2)')
857            self.assertEqual(i4, 4)
858            self.assertEqual(i8, 'int(8)')
859        finally:
860            pgdb.reset_typecast()
861        con = self._connect()
862        try:
863            i2, i4, i8 = con.cursor().execute(query).fetchone()
864        finally:
865            con.close()
866        self.assertEqual(i2, 2)
867        self.assertEqual(i4, 4)
868        self.assertEqual(i8, 8)
869
870    def test_unicode_with_utf8(self):
871        table = self.table_prefix + 'booze'
872        input = u"He wes Leovenaðes sone — liðe him be Drihten"
873        con = self._connect()
874        try:
875            cur = con.cursor()
876            cur.execute("create table %s (t text)" % table)
877            try:
878                cur.execute("set client_encoding=utf8")
879                cur.execute(u"select '%s'" % input)
880            except Exception:
881                self.skipTest("database does not support utf8")
882            output1 = cur.fetchone()[0]
883            cur.execute("insert into %s values (%%s)" % table, (input,))
884            cur.execute("select * from %s" % table)
885            output2 = cur.fetchone()[0]
886            cur.execute("select t = '%s' from %s" % (input, table))
887            output3 = cur.fetchone()[0]
888            cur.execute("select t = %%s from %s" % table, (input,))
889            output4 = cur.fetchone()[0]
890        finally:
891            con.close()
892        if str is bytes:  # Python < 3.0
893            input = input.encode('utf8')
894        self.assertIsInstance(output1, str)
895        self.assertEqual(output1, input)
896        self.assertIsInstance(output2, str)
897        self.assertEqual(output2, input)
898        self.assertIsInstance(output3, bool)
899        self.assertTrue(output3)
900        self.assertIsInstance(output4, bool)
901        self.assertTrue(output4)
902
903    def test_unicode_with_latin1(self):
904        table = self.table_prefix + 'booze'
905        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
906        con = self._connect()
907        try:
908            cur = con.cursor()
909            cur.execute("create table %s (t text)" % table)
910            try:
911                cur.execute("set client_encoding=latin1")
912                cur.execute(u"select '%s'" % input)
913            except Exception:
914                self.skipTest("database does not support latin1")
915            output1 = cur.fetchone()[0]
916            cur.execute("insert into %s values (%%s)" % table, (input,))
917            cur.execute("select * from %s" % table)
918            output2 = cur.fetchone()[0]
919            cur.execute("select t = '%s' from %s" % (input, table))
920            output3 = cur.fetchone()[0]
921            cur.execute("select t = %%s from %s" % table, (input,))
922            output4 = cur.fetchone()[0]
923        finally:
924            con.close()
925        if str is bytes:  # Python < 3.0
926            input = input.encode('latin1')
927        self.assertIsInstance(output1, str)
928        self.assertEqual(output1, input)
929        self.assertIsInstance(output2, str)
930        self.assertEqual(output2, input)
931        self.assertIsInstance(output3, bool)
932        self.assertTrue(output3)
933        self.assertIsInstance(output4, bool)
934        self.assertTrue(output4)
935
936    def test_bool(self):
937        values = [False, True, None, 't', 'f', 'true', 'false']
938        table = self.table_prefix + 'booze'
939        con = self._connect()
940        try:
941            cur = con.cursor()
942            cur.execute(
943                "create table %s (n smallint, booltest bool)" % table)
944            params = enumerate(values)
945            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
946            cur.execute("select booltest from %s order by n" % table)
947            rows = cur.fetchall()
948            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
949        finally:
950            con.close()
951        rows = [row[0] for row in rows]
952        values[3] = values[5] = True
953        values[4] = values[6] = False
954        self.assertEqual(rows, values)
955
956    def test_literal(self):
957        con = self._connect()
958        try:
959            cur = con.cursor()
960            value = "lower('Hello')"
961            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
962            row = cur.fetchone()
963        finally:
964            con.close()
965        self.assertEqual(row, (value, 'hello'))
966
967
968    def test_json(self):
969        inval = {"employees":
970            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
971        table = self.table_prefix + 'booze'
972        con = self._connect()
973        try:
974            cur = con.cursor()
975            try:
976                cur.execute("create table %s (jsontest json)" % table)
977            except pgdb.ProgrammingError:
978                self.skipTest('database does not support json')
979            params = (pgdb.Json(inval),)
980            cur.execute("insert into %s values (%%s)" % table, params)
981            cur.execute("select jsontest from %s" % table)
982            outval = cur.fetchone()[0]
983            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
984        finally:
985            con.close()
986        self.assertEqual(inval, outval)
987
988    def test_jsonb(self):
989        inval = {"employees":
990            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
991        table = self.table_prefix + 'booze'
992        con = self._connect()
993        try:
994            cur = con.cursor()
995            try:
996                cur.execute("create table %s (jsonbtest jsonb)" % table)
997            except pgdb.ProgrammingError:
998                self.skipTest('database does not support jsonb')
999            params = (pgdb.Json(inval),)
1000            cur.execute("insert into %s values (%%s)" % table, params)
1001            cur.execute("select jsonbtest from %s" % table)
1002            outval = cur.fetchone()[0]
1003            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1004        finally:
1005            con.close()
1006        self.assertEqual(inval, outval)
1007
1008    def test_execute_edge_cases(self):
1009        con = self._connect()
1010        try:
1011            cur = con.cursor()
1012            sql = 'invalid'  # should be ignored with empty parameter list
1013            cur.executemany(sql, [])
1014            sql = 'select %d + 1'
1015            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1016            self.assertEqual(cur.fetchone()[0], 3)
1017            sql = 'select 1/0'  # cannot be executed
1018            self.assertRaises(pgdb.DataError, cur.execute, sql)
1019            cur.close()
1020            con.rollback()
1021            if pgdb.shortcutmethods:
1022                res = con.execute('select %d', (1,)).fetchone()
1023                self.assertEqual(res, (1,))
1024                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1025                self.assertEqual(res, (2,))
1026        finally:
1027            con.close()
1028        sql = 'select 1'  # cannot be executed after connection is closed
1029        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1030
1031    def test_fetchmany_with_keep(self):
1032        con = self._connect()
1033        try:
1034            cur = con.cursor()
1035            self.assertEqual(cur.arraysize, 1)
1036            cur.execute('select * from generate_series(1, 25)')
1037            self.assertEqual(len(cur.fetchmany()), 1)
1038            self.assertEqual(len(cur.fetchmany()), 1)
1039            self.assertEqual(cur.arraysize, 1)
1040            cur.arraysize = 3
1041            self.assertEqual(len(cur.fetchmany()), 3)
1042            self.assertEqual(len(cur.fetchmany()), 3)
1043            self.assertEqual(cur.arraysize, 3)
1044            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1045            self.assertEqual(cur.arraysize, 3)
1046            self.assertEqual(len(cur.fetchmany()), 3)
1047            self.assertEqual(len(cur.fetchmany()), 3)
1048            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1049            self.assertEqual(cur.arraysize, 2)
1050            self.assertEqual(len(cur.fetchmany()), 2)
1051            self.assertEqual(len(cur.fetchmany()), 2)
1052            self.assertEqual(len(cur.fetchmany(25)), 3)
1053        finally:
1054            con.close()
1055
1056    def test_nextset(self):
1057        con = self._connect()
1058        cur = con.cursor()
1059        self.assertRaises(con.NotSupportedError, cur.nextset)
1060
1061    def test_setoutputsize(self):
1062        pass  # not supported
1063
1064    def test_connection_errors(self):
1065        con = self._connect()
1066        self.assertEqual(con.Error, pgdb.Error)
1067        self.assertEqual(con.Warning, pgdb.Warning)
1068        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1069        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1070        self.assertEqual(con.InternalError, pgdb.InternalError)
1071        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1072        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1073        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1074        self.assertEqual(con.DataError, pgdb.DataError)
1075        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1076
1077    def test_connection_as_contextmanager(self):
1078        table = self.table_prefix + 'booze'
1079        con = self._connect()
1080        try:
1081            cur = con.cursor()
1082            cur.execute("create table %s (n smallint check(n!=4))" % table)
1083            with con:
1084                cur.execute("insert into %s values (1)" % table)
1085                cur.execute("insert into %s values (2)" % table)
1086            try:
1087                with con:
1088                    cur.execute("insert into %s values (3)" % table)
1089                    cur.execute("insert into %s values (4)" % table)
1090            except con.IntegrityError as error:
1091                self.assertTrue('check' in str(error).lower())
1092            with con:
1093                cur.execute("insert into %s values (5)" % table)
1094                cur.execute("insert into %s values (6)" % table)
1095            try:
1096                with con:
1097                    cur.execute("insert into %s values (7)" % table)
1098                    cur.execute("insert into %s values (8)" % table)
1099                    raise ValueError('transaction should rollback')
1100            except ValueError as error:
1101                self.assertEqual(str(error), 'transaction should rollback')
1102            with con:
1103                cur.execute("insert into %s values (9)" % table)
1104            cur.execute("select * from %s order by 1" % table)
1105            rows = cur.fetchall()
1106            rows = [row[0] for row in rows]
1107        finally:
1108            con.close()
1109        self.assertEqual(rows, [1, 2, 5, 6, 9])
1110
1111    def test_cursor_connection(self):
1112        con = self._connect()
1113        cur = con.cursor()
1114        self.assertEqual(cur.connection, con)
1115        cur.close()
1116
1117    def test_cursor_as_contextmanager(self):
1118        con = self._connect()
1119        with con.cursor() as cur:
1120            self.assertEqual(cur.connection, con)
1121
1122    def test_pgdb_type(self):
1123        self.assertEqual(pgdb.STRING, pgdb.STRING)
1124        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1125        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1126        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1127        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1128        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1129        self.assertEqual('char', pgdb.STRING)
1130        self.assertEqual('varchar', pgdb.STRING)
1131        self.assertEqual('text', pgdb.STRING)
1132        self.assertNotEqual('numeric', pgdb.STRING)
1133        self.assertEqual('numeric', pgdb.NUMERIC)
1134        self.assertEqual('numeric', pgdb.NUMBER)
1135        self.assertEqual('int4', pgdb.NUMBER)
1136        self.assertNotEqual('int4', pgdb.NUMERIC)
1137        self.assertEqual('int2', pgdb.SMALLINT)
1138        self.assertNotEqual('int4', pgdb.SMALLINT)
1139        self.assertEqual('int2', pgdb.INTEGER)
1140        self.assertEqual('int4', pgdb.INTEGER)
1141        self.assertEqual('int8', pgdb.INTEGER)
1142        self.assertNotEqual('int4', pgdb.LONG)
1143        self.assertEqual('int8', pgdb.LONG)
1144        self.assertTrue('char' in pgdb.STRING)
1145        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1146        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1147        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1148        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1149        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1150        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1151        self.assertEqual('_char', pgdb.ARRAY)
1152        self.assertNotEqual('char', pgdb.ARRAY)
1153        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1154        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1155        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1156        self.assertEqual('record', pgdb.RECORD)
1157        self.assertNotEqual('_record', pgdb.RECORD)
1158
1159
# Run all tests of this module when it is executed as a script.
if '__main__' == __name__:
    unittest.main()
Note: See TracBrowser for help on using the repository browser.