source: trunk/tests/test_dbapi20.py @ 841

Last change on this file since 841 was 841, checked in by cito, 4 years ago

Add type helper for UUIDs

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 45.6 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 841 2016-02-08 20:05:37Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import date, time, datetime, timedelta
32from uuid import UUID as Uuid
33
34try:
35    from datetime import timezone
36except ImportError:  # Python < 3.2
37    timezone = None
38
39try:
40    long
41except NameError:  # Python >= 3.0
42    long = int
43
44try:
45    from collections import OrderedDict
46except ImportError:  # Python 2.6 or 3.0
47    OrderedDict = None
48
49
class PgBitString:
    """Wrap an integer that has a PostgreSQL bit string representation."""

    def __init__(self, value):
        # the wrapped integer; __pg_repr__() renders it in binary
        self.value = value

    def __pg_repr__(self):
        """Return the wrapped value formatted as a B'...' literal."""
        template = "B'{0:b}'"
        return template.format(self.value)
58
59
60class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
61
62    driver = pgdb
63    connect_args = ()
64    connect_kw_args = {'database': dbname,
65        'host': '%s:%d' % (dbhost or '', dbport or -1)}
66
67    lower_func = 'lower'  # For stored procedure test
68
69    def setUp(self):
70        # Call superclass setUp in case this does something in the future
71        dbapi20.DatabaseAPI20Test.setUp(self)
72        try:
73            con = self._connect()
74            con.close()
75        except pgdb.Error:  # try to create a missing database
76            import pg
77            try:  # first try to log in as superuser
78                db = pg.DB('postgres', dbhost or None, dbport or -1,
79                    user='postgres')
80            except Exception:  # then try to log in as current user
81                db = pg.DB('postgres', dbhost or None, dbport or -1)
82            db.query('create database ' + dbname)
83
84    def tearDown(self):
85        dbapi20.DatabaseAPI20Test.tearDown(self)
86
87    def testVersion(self):
88        v = pgdb.version
89        self.assertIsInstance(v, str)
90        self.assertIn('.', v)
91        self.assertEqual(pgdb.__version__, v)
92
93    def test_callproc_no_params(self):
94        con = self._connect()
95        cur = con.cursor()
96        # note that now() does not change within a transaction
97        cur.execute('select now()')
98        now = cur.fetchone()[0]
99        res = cur.callproc('now')
100        self.assertIsNone(res)
101        res = cur.fetchone()[0]
102        self.assertEqual(res, now)
103
104    def test_callproc_bad_params(self):
105        con = self._connect()
106        cur = con.cursor()
107        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
108        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
109
110    def test_callproc_one_param(self):
111        con = self._connect()
112        cur = con.cursor()
113        params = (42.4382,)
114        res = cur.callproc("round", params)
115        self.assertIs(res, params)
116        res = cur.fetchone()[0]
117        self.assertEqual(res, 42)
118
119    def test_callproc_two_params(self):
120        con = self._connect()
121        cur = con.cursor()
122        params = (9, 4)
123        res = cur.callproc("div", params)
124        self.assertIs(res, params)
125        res = cur.fetchone()[0]
126        self.assertEqual(res, 2)
127
128    def test_cursor_type(self):
129
130        class TestCursor(pgdb.Cursor):
131            pass
132
133        con = self._connect()
134        self.assertIs(con.cursor_type, pgdb.Cursor)
135        cur = con.cursor()
136        self.assertIsInstance(cur, pgdb.Cursor)
137        self.assertNotIsInstance(cur, TestCursor)
138        con.cursor_type = TestCursor
139        cur = con.cursor()
140        self.assertIsInstance(cur, TestCursor)
141        cur = con.cursor()
142        self.assertIsInstance(cur, TestCursor)
143        con = self._connect()
144        self.assertIs(con.cursor_type, pgdb.Cursor)
145        cur = con.cursor()
146        self.assertIsInstance(cur, pgdb.Cursor)
147        self.assertNotIsInstance(cur, TestCursor)
148
149    def test_row_factory(self):
150
151        class TestCursor(pgdb.Cursor):
152
153            def row_factory(self, row):
154                return dict(('column %s' % desc[0], value)
155                    for desc, value in zip(self.description, row))
156
157        con = self._connect()
158        con.cursor_type = TestCursor
159        cur = con.cursor()
160        self.assertIsInstance(cur, TestCursor)
161        res = cur.execute("select 1 as a, 2 as b")
162        self.assertIs(res, cur, 'execute() should return cursor')
163        res = cur.fetchone()
164        self.assertIsInstance(res, dict)
165        self.assertEqual(res, {'column a': 1, 'column b': 2})
166        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
167        res = cur.fetchall()
168        self.assertIsInstance(res, list)
169        self.assertEqual(len(res), 2)
170        self.assertIsInstance(res[0], dict)
171        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
172        self.assertIsInstance(res[1], dict)
173        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
174
175    def test_build_row_factory(self):
176
177        class TestCursor(pgdb.Cursor):
178
179            def build_row_factory(self):
180                keys = [desc[0] for desc in self.description]
181                return lambda row: dict((key, value)
182                    for key, value in zip(keys, row))
183
184        con = self._connect()
185        con.cursor_type = TestCursor
186        cur = con.cursor()
187        self.assertIsInstance(cur, TestCursor)
188        cur.execute("select 1 as a, 2 as b")
189        res = cur.fetchone()
190        self.assertIsInstance(res, dict)
191        self.assertEqual(res, {'a': 1, 'b': 2})
192        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
193        res = cur.fetchall()
194        self.assertIsInstance(res, list)
195        self.assertEqual(len(res), 2)
196        self.assertIsInstance(res[0], dict)
197        self.assertEqual(res[0], {'a': 1, 'b': 2})
198        self.assertIsInstance(res[1], dict)
199        self.assertEqual(res[1], {'a': 3, 'b': 4})
200
201    def test_cursor_with_named_columns(self):
202        con = self._connect()
203        cur = con.cursor()
204        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
205        self.assertIs(res, cur, 'execute() should return cursor')
206        res = cur.fetchone()
207        self.assertIsInstance(res, tuple)
208        self.assertEqual(res, (1, 2, 3))
209        self.assertEqual(res._fields, ('abc', 'de', 'f'))
210        self.assertEqual(res.abc, 1)
211        self.assertEqual(res.de, 2)
212        self.assertEqual(res.f, 3)
213        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
214        res = cur.fetchall()
215        self.assertIsInstance(res, list)
216        self.assertEqual(len(res), 2)
217        self.assertIsInstance(res[0], tuple)
218        self.assertEqual(res[0], (1, 2))
219        self.assertEqual(res[0]._fields, ('one', 'two'))
220        self.assertIsInstance(res[1], tuple)
221        self.assertEqual(res[1], (3, 4))
222        self.assertEqual(res[1]._fields, ('one', 'two'))
223
224    def test_cursor_with_unnamed_columns(self):
225        con = self._connect()
226        cur = con.cursor()
227        cur.execute("select 1, 2, 3")
228        res = cur.fetchone()
229        self.assertIsInstance(res, tuple)
230        self.assertEqual(res, (1, 2, 3))
231        old_py = OrderedDict is None  # Python 2.6 or 3.0
232        # old Python versions cannot rename tuple fields with underscore
233        if old_py:
234            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
235        else:
236            self.assertEqual(res._fields, ('_0', '_1', '_2'))
237        cur.execute("select 1 as one, 2, 3 as three")
238        res = cur.fetchone()
239        self.assertIsInstance(res, tuple)
240        self.assertEqual(res, (1, 2, 3))
241        if old_py:  # cannot auto rename with underscore
242            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
243        else:
244            self.assertEqual(res._fields, ('one', '_1', 'three'))
245        cur.execute("select 1 as abc, 2 as def")
246        res = cur.fetchone()
247        self.assertIsInstance(res, tuple)
248        self.assertEqual(res, (1, 2))
249        if old_py:
250            self.assertEqual(res._fields, ('column_0', 'column_1'))
251        else:
252            self.assertEqual(res._fields, ('abc', '_1'))
253
254    def test_colnames(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1, 2, 3")
258        names = cur.colnames
259        self.assertIsInstance(names, list)
260        self.assertEqual(names, ['?column?', '?column?', '?column?'])
261        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
262        names = cur.colnames
263        self.assertIsInstance(names, list)
264        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
265
266    def test_coltypes(self):
267        con = self._connect()
268        cur = con.cursor()
269        cur.execute("select 1::int2, 2::int4, 3::int8")
270        types = cur.coltypes
271        self.assertIsInstance(types, list)
272        self.assertEqual(types, ['int2', 'int4', 'int8'])
273
274    def test_description_fields(self):
275        con = self._connect()
276        cur = con.cursor()
277        cur.execute("select 123456789::int8 col0,"
278            " 123456.789::numeric(41, 13) as col1,"
279            " 'foobar'::char(39) as col2")
280        desc = cur.description
281        self.assertIsInstance(desc, list)
282        self.assertEqual(len(desc), 3)
283        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
284        for i in range(3):
285            c, d = cols[i], desc[i]
286            self.assertIsInstance(d, tuple)
287            self.assertEqual(len(d), 7)
288            self.assertIsInstance(d.name, str)
289            self.assertEqual(d.name, 'col%d' % i)
290            self.assertIsInstance(d.type_code, str)
291            self.assertEqual(d.type_code, c[0])
292            self.assertIsNone(d.display_size)
293            self.assertIsInstance(d.internal_size, int)
294            self.assertEqual(d.internal_size, c[1])
295            if c[2] is not None:
296                self.assertIsInstance(d.precision, int)
297                self.assertEqual(d.precision, c[1])
298                self.assertIsInstance(d.scale, int)
299                self.assertEqual(d.scale, c[2])
300            else:
301                self.assertIsNone(d.precision)
302                self.assertIsNone(d.scale)
303            self.assertIsNone(d.null_ok)
304
305    def test_type_cache_info(self):
306        con = self._connect()
307        try:
308            cur = con.cursor()
309            type_cache = con.type_cache
310            self.assertNotIn('numeric', type_cache)
311            type_info = type_cache['numeric']
312            self.assertIn('numeric', type_cache)
313            self.assertEqual(type_info, 'numeric')
314            self.assertEqual(type_info.oid, 1700)
315            self.assertEqual(type_info.len, -1)
316            self.assertEqual(type_info.type, 'b')  # base
317            self.assertEqual(type_info.category, 'N')  # numeric
318            self.assertEqual(type_info.delim, ',')
319            self.assertEqual(type_info.relid, 0)
320            self.assertIs(con.type_cache[1700], type_info)
321            self.assertNotIn('pg_type', type_cache)
322            type_info = type_cache['pg_type']
323            self.assertIn('pg_type', type_cache)
324            self.assertEqual(type_info.type, 'c')  # composite
325            self.assertEqual(type_info.category, 'C')  # composite
326            cols = type_cache.get_fields('pg_type')
327            self.assertEqual(cols[0].name, 'typname')
328            typname = type_cache[cols[0].type]
329            self.assertEqual(typname, 'name')
330            self.assertEqual(typname.type, 'b')  # base
331            self.assertEqual(typname.category, 'S')  # string
332            self.assertEqual(cols[3].name, 'typlen')
333            typlen = type_cache[cols[3].type]
334            self.assertEqual(typlen, 'int2')
335            self.assertEqual(typlen.type, 'b')  # base
336            self.assertEqual(typlen.category, 'N')  # numeric
337            cur.close()
338            cur = con.cursor()
339            type_cache = con.type_cache
340            self.assertIn('numeric', type_cache)
341            cur.close()
342        finally:
343            con.close()
344        con = self._connect()
345        try:
346            cur = con.cursor()
347            type_cache = con.type_cache
348            self.assertNotIn('pg_type', type_cache)
349            self.assertEqual(type_cache.get('pg_type'), type_info)
350            self.assertIn('pg_type', type_cache)
351            self.assertIsNone(type_cache.get(
352                self.table_prefix + '_surely_does_not_exist'))
353            cur.close()
354        finally:
355            con.close()
356
357    def test_type_cache_typecast(self):
358        con = self._connect()
359        try:
360            cur = con.cursor()
361            type_cache = con.type_cache
362            self.assertIs(type_cache.get_typecast('int4'), int)
363            cast_int = lambda v: 'int(%s)' % v
364            type_cache.set_typecast('int4', cast_int)
365            query = 'select 2::int2, 4::int4, 8::int8'
366            cur.execute(query)
367            i2, i4, i8 = cur.fetchone()
368            self.assertEqual(i2, 2)
369            self.assertEqual(i4, 'int(4)')
370            self.assertEqual(i8, 8)
371            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
372            type_cache.set_typecast(['int2', 'int8'], cast_int)
373            cur.execute(query)
374            i2, i4, i8 = cur.fetchone()
375            self.assertEqual(i2, 'int(2)')
376            self.assertEqual(i4, 'int(4)')
377            self.assertEqual(i8, 'int(8)')
378            type_cache.reset_typecast('int4')
379            cur.execute(query)
380            i2, i4, i8 = cur.fetchone()
381            self.assertEqual(i2, 'int(2)')
382            self.assertEqual(i4, 4)
383            self.assertEqual(i8, 'int(8)')
384            type_cache.reset_typecast(['int2', 'int8'])
385            cur.execute(query)
386            i2, i4, i8 = cur.fetchone()
387            self.assertEqual(i2, 2)
388            self.assertEqual(i4, 4)
389            self.assertEqual(i8, 8)
390            type_cache.set_typecast(['int2', 'int8'], cast_int)
391            cur.execute(query)
392            i2, i4, i8 = cur.fetchone()
393            self.assertEqual(i2, 'int(2)')
394            self.assertEqual(i4, 4)
395            self.assertEqual(i8, 'int(8)')
396            type_cache.reset_typecast()
397            cur.execute(query)
398            i2, i4, i8 = cur.fetchone()
399            self.assertEqual(i2, 2)
400            self.assertEqual(i4, 4)
401            self.assertEqual(i8, 8)
402            cur.close()
403        finally:
404            con.close()
405
406    def test_cursor_iteration(self):
407        con = self._connect()
408        cur = con.cursor()
409        cur.execute("select 1 union select 2 union select 3")
410        self.assertEqual([r[0] for r in cur], [1, 2, 3])
411
412    def test_fetch_2_rows(self):
413        Decimal = pgdb.decimal_type()
414        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
415            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
416            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
417            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
418            pgdb.Interval(15, 31, 5), 7897234)
419        table = self.table_prefix + 'booze'
420        con = self._connect()
421        try:
422            cur = con.cursor()
423            cur.execute("set datestyle to iso")
424            cur.execute("create table %s ("
425                "stringtest varchar,"
426                "binarytest bytea,"
427                "booltest bool,"
428                "integertest int4,"
429                "longtest int8,"
430                "floattest float8,"
431                "numerictest numeric,"
432                "moneytest money,"
433                "datetest date,"
434                "timetest time,"
435                "datetimetest timestamp,"
436                "intervaltest interval,"
437                "rowidtest oid)" % table)
438            cur.execute("set standard_conforming_strings to on")
439            for s in ('numeric', 'monetary', 'time'):
440                cur.execute("set lc_%s to 'C'" % s)
441            for _i in range(2):
442                cur.execute("insert into %s values ("
443                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
444                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
445            cur.execute("select * from %s" % table)
446            rows = cur.fetchall()
447            self.assertEqual(len(rows), 2)
448            row0 = rows[0]
449            self.assertEqual(row0, values)
450            self.assertEqual(row0, rows[1])
451            self.assertIsInstance(row0[0], str)
452            self.assertIsInstance(row0[1], bytes)
453            self.assertIsInstance(row0[2], bool)
454            self.assertIsInstance(row0[3], int)
455            self.assertIsInstance(row0[4], long)
456            self.assertIsInstance(row0[5], float)
457            self.assertIsInstance(row0[6], Decimal)
458            self.assertIsInstance(row0[7], Decimal)
459            self.assertIsInstance(row0[8], date)
460            self.assertIsInstance(row0[9], time)
461            self.assertIsInstance(row0[10], datetime)
462            self.assertIsInstance(row0[11], timedelta)
463        finally:
464            con.close()
465
466    def test_integrity_error(self):
467        table = self.table_prefix + 'booze'
468        con = self._connect()
469        try:
470            cur = con.cursor()
471            cur.execute("set client_min_messages = warning")
472            cur.execute("create table %s (i int primary key)" % table)
473            cur.execute("insert into %s values (1)" % table)
474            cur.execute("insert into %s values (2)" % table)
475            self.assertRaises(pgdb.IntegrityError, cur.execute,
476                "insert into %s values (1)" % table)
477        finally:
478            con.close()
479
480    def test_sqlstate(self):
481        con = self._connect()
482        cur = con.cursor()
483        try:
484            cur.execute("select 1/0")
485        except pgdb.DatabaseError as error:
486            self.assertTrue(isinstance(error, pgdb.DataError))
487            # the SQLSTATE error code for division by zero is 22012
488            self.assertEqual(error.sqlstate, '22012')
489
490    def test_float(self):
491        nan, inf = float('nan'), float('inf')
492        from math import isnan, isinf
493        self.assertTrue(isnan(nan) and not isinf(nan))
494        self.assertTrue(isinf(inf) and not isnan(inf))
495        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
496            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
497        table = self.table_prefix + 'booze'
498        con = self._connect()
499        try:
500            cur = con.cursor()
501            cur.execute(
502                "create table %s (n smallint, floattest float)" % table)
503            params = enumerate(values)
504            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
505            cur.execute("select floattest from %s order by n" % table)
506            rows = cur.fetchall()
507            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
508            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
509            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
510        finally:
511            con.close()
512        self.assertEqual(len(rows), len(values))
513        rows = [row[0] for row in rows]
514        for inval, outval in zip(values, rows):
515            if inval in ('inf', 'Infinity'):
516                inval = inf
517            elif inval in ('-inf', '-Infinity'):
518                inval = -inf
519            elif inval in ('nan', 'NaN'):
520                inval = nan
521            if isinf(inval):
522                self.assertTrue(isinf(outval))
523                if inval < 0:
524                    self.assertTrue(outval < 0)
525                else:
526                    self.assertTrue(outval > 0)
527            elif isnan(inval):
528                self.assertTrue(isnan(outval))
529            else:
530                self.assertEqual(inval, outval)
531
532    def test_datetime(self):
533        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
534        table = self.table_prefix + 'booze'
535        con = self._connect()
536        try:
537            cur = con.cursor()
538            cur.execute("create table %s ("
539                "d date, t time,  ts timestamp,"
540                "tz timetz, tsz timestamptz)" % table)
541            for n in range(3):
542                values = [dt.date(), dt.time(), dt,
543                    dt.time(), dt]
544                if timezone:
545                    values[3] = values[3].replace(tzinfo=timezone.utc)
546                    values[4] = values[4].replace(tzinfo=timezone.utc)
547                if n == 0:  # input as objects
548                    params = values
549                if n == 1:  # input as text
550                    params = [v.isoformat() for v in values]  # as text
551                elif n == 2:  # input using type helpers
552                    d = (dt.year, dt.month, dt.day)
553                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
554                    params = [pgdb.Date(*d), pgdb.Time(*t),
555                            pgdb.Timestamp(*(d + t)), pgdb.Time(*t),
556                            pgdb.Timestamp(*(d + t))]
557                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
558                        'sql, mdy', 'sql, dmy', 'german'):
559                    cur.execute("set datestyle to %s" % datestyle)
560                    if n != 1:
561                        cur.execute("select %s,%s,%s,%s,%s", params)
562                        row = cur.fetchone()
563                        self.assertEqual(row, tuple(values))
564                    cur.execute("insert into %s"
565                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
566                    cur.execute("select * from %s" % table)
567                    d = cur.description
568                    for i in range(5):
569                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
570                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
571                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
572                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
573                    self.assertEqual(d[0].type_code, pgdb.DATE)
574                    self.assertEqual(d[1].type_code, pgdb.TIME)
575                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
576                    self.assertEqual(d[3].type_code, pgdb.TIME)
577                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
578                    row = cur.fetchone()
579                    self.assertEqual(row, tuple(values))
580                    cur.execute("delete from %s" % table)
581        finally:
582            con.close()
583
584    def test_interval(self):
585        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
586        table = self.table_prefix + 'booze'
587        con = self._connect()
588        try:
589            cur = con.cursor()
590            cur.execute("create table %s (i interval)" % table)
591            for n in range(3):
592                if n == 0:  # input as objects
593                    param = td
594                if n == 1:  # input as text
595                    param = '%d days %d seconds %d microseconds ' % (
596                        td.days, td.seconds, td.microseconds)
597                elif n == 2:  # input using type helpers
598                    param = pgdb.Interval(
599                        td.days, 0, 0, td.seconds, td.microseconds)
600                for intervalstyle in ('sql_standard ', 'postgres',
601                        'postgres_verbose', 'iso_8601'):
602                    cur.execute("set intervalstyle to %s" % intervalstyle)
603                    cur.execute("insert into %s"
604                        " values (%%s)" % table, [param])
605                    cur.execute("select * from %s" % table)
606                    tc = cur.description[0].type_code
607                    self.assertEqual(tc, pgdb.DATETIME)
608                    self.assertNotEqual(tc, pgdb.STRING)
609                    self.assertNotEqual(tc, pgdb.ARRAY)
610                    self.assertNotEqual(tc, pgdb.RECORD)
611                    self.assertEqual(tc, pgdb.INTERVAL)
612                    row = cur.fetchone()
613                    self.assertEqual(row, (td,))
614                    cur.execute("delete from %s" % table)
615        finally:
616            con.close()
617
618    def test_hstore(self):
619        con = self._connect()
620        try:
621            cur = con.cursor()
622            cur.execute("select 'k=>v'::hstore")
623        except pgdb.DatabaseError:
624            try:
625                cur.execute("create extension hstore")
626            except pgdb.DatabaseError:
627                self.skipTest("hstore extension not enabled")
628        finally:
629            con.close()
630        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
631            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
632            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
633            'None': None, 'NULL': 'NULL', 'empty': ''}
634        con = self._connect()
635        try:
636            cur = con.cursor()
637            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
638            result = cur.fetchone()[0]
639        finally:
640            con.close()
641        self.assertIsInstance(result, dict)
642        self.assertEqual(result, d)
643
644    def test_uuid(self):
645        self.assertIs(Uuid, pgdb.Uuid)
646        d = Uuid('{12345678-1234-5678-1234-567812345678}')
647        con = self._connect()
648        try:
649            cur = con.cursor()
650            cur.execute("select %s::uuid", (d,))
651            result = cur.fetchone()[0]
652        finally:
653            con.close()
654        self.assertIsInstance(result, Uuid)
655        self.assertEqual(result, d)
656
657    def test_insert_array(self):
658        values = [(None, None), ([], []), ([None], [[None], ['null']]),
659            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
660            ([20000, 25000, 25000, 30000],
661            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
662            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
663        table = self.table_prefix + 'booze'
664        con = self._connect()
665        try:
666            cur = con.cursor()
667            cur.execute("create table %s"
668                " (n smallint, i int[], t text[][])" % table)
669            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
670            # Note that we must explicit casts because we are inserting
671            # empty arrays.  Otherwise this is not necessary.
672            cur.executemany("insert into %s values"
673                " (%%d,%%s::int[],%%s::text[][])" % table, params)
674            cur.execute("select i, t from %s order by n" % table)
675            d = cur.description
676            self.assertEqual(d[0].type_code, pgdb.ARRAY)
677            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
678            self.assertEqual(d[0].type_code, pgdb.NUMBER)
679            self.assertEqual(d[0].type_code, pgdb.INTEGER)
680            self.assertEqual(d[1].type_code, pgdb.ARRAY)
681            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
682            self.assertEqual(d[1].type_code, pgdb.STRING)
683            rows = cur.fetchall()
684        finally:
685            con.close()
686        self.assertEqual(rows, values)
687
688    def test_select_array(self):
689        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
690        con = self._connect()
691        try:
692            cur = con.cursor()
693            cur.execute("select %s::int[], %s::text[]", values)
694            row = cur.fetchone()
695        finally:
696            con.close()
697        self.assertEqual(row, values)
698
699    def test_insert_record(self):
700        values = [('John', 61), ('Jane', 63),
701                  ('Fred', None), ('Wilma', None),
702                  (None, 42), (None, None)]
703        table = self.table_prefix + 'booze'
704        record = self.table_prefix + 'munch'
705        con = self._connect()
706        try:
707            cur = con.cursor()
708            cur.execute("create type %s as (name varchar, age int)" % record)
709            cur.execute("create table %s (n smallint, r %s)" % (table, record))
710            params = enumerate(values)
711            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
712            cur.execute("select r from %s order by n" % table)
713            type_code = cur.description[0].type_code
714            self.assertEqual(type_code, record)
715            self.assertEqual(type_code, pgdb.RECORD)
716            self.assertNotEqual(type_code, pgdb.ARRAY)
717            columns = con.type_cache.get_fields(type_code)
718            self.assertEqual(columns[0].name, 'name')
719            self.assertEqual(columns[1].name, 'age')
720            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
721            self.assertEqual(con.type_cache[columns[1].type], 'int4')
722            rows = cur.fetchall()
723        finally:
724            cur.execute('drop table %s' % table)
725            cur.execute('drop type %s' % record)
726            con.close()
727        self.assertEqual(len(rows), len(values))
728        rows = [row[0] for row in rows]
729        self.assertEqual(rows, values)
730        self.assertEqual(rows[0].name, 'John')
731        self.assertEqual(rows[0].age, 61)
732
733    def test_select_record(self):
734        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
735            '(test)', '(x,y)', ' x y ', 'null', None)
736        con = self._connect()
737        try:
738            cur = con.cursor()
739            cur.execute("select %s as test_record", [value])
740            self.assertEqual(cur.description[0].name, 'test_record')
741            self.assertEqual(cur.description[0].type_code, 'record')
742            row = cur.fetchone()[0]
743        finally:
744            con.close()
745        # Note that the element types get lost since we created an
746        # untyped record (an anonymous composite type). For the same
747        # reason this is also a normal tuple, not a named tuple.
748        text_row = tuple(None if v is None else str(v) for v in value)
749        self.assertEqual(row, text_row)
750
751    def test_custom_type(self):
752        values = [3, 5, 65]
753        values = list(map(PgBitString, values))
754        table = self.table_prefix + 'booze'
755        con = self._connect()
756        try:
757            cur = con.cursor()
758            params = enumerate(values)  # params have __pg_repr__ method
759            cur.execute(
760                'create table "%s" (n smallint, b bit varying(7))' % table)
761            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
762            cur.execute("select * from %s" % table)
763            rows = cur.fetchall()
764        finally:
765            con.close()
766        self.assertEqual(len(rows), len(values))
767        con = self._connect()
768        try:
769            cur = con.cursor()
770            params = (1, object())  # an object that cannot be handled
771            self.assertRaises(pgdb.InterfaceError, cur.execute,
772                "insert into %s values (%%s,%%s)" % table, params)
773        finally:
774            con.close()
775
776    def test_set_decimal_type(self):
777        decimal_type = pgdb.decimal_type()
778        self.assertTrue(decimal_type is not None and callable(decimal_type))
779        con = self._connect()
780        try:
781            cur = con.cursor()
782            # change decimal type globally to int
783            int_type = lambda v: int(float(v))
784            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
785            cur.execute('select 4.25')
786            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
787            value = cur.fetchone()[0]
788            self.assertTrue(isinstance(value, int))
789            self.assertEqual(value, 4)
790            # change decimal type again to float
791            self.assertTrue(pgdb.decimal_type(float) is float)
792            cur.execute('select 4.25')
793            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
794            value = cur.fetchone()[0]
795            # the connection still uses the old setting
796            self.assertTrue(isinstance(value, int))
797            # bust the cache for type functions for the connection
798            con.type_cache.reset_typecast()
799            cur.execute('select 4.25')
800            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
801            value = cur.fetchone()[0]
802            # now the connection uses the new setting
803            self.assertTrue(isinstance(value, float))
804            self.assertEqual(value, 4.25)
805        finally:
806            con.close()
807            pgdb.decimal_type(decimal_type)
808        self.assertTrue(pgdb.decimal_type() is decimal_type)
809
810    def test_global_typecast(self):
811        try:
812            query = 'select 2::int2, 4::int4, 8::int8'
813            self.assertIs(pgdb.get_typecast('int4'), int)
814            cast_int = lambda v: 'int(%s)' % v
815            pgdb.set_typecast('int4', cast_int)
816            con = self._connect()
817            try:
818                i2, i4, i8 = con.cursor().execute(query).fetchone()
819            finally:
820                con.close()
821            self.assertEqual(i2, 2)
822            self.assertEqual(i4, 'int(4)')
823            self.assertEqual(i8, 8)
824            pgdb.set_typecast(['int2', 'int8'], cast_int)
825            con = self._connect()
826            try:
827                i2, i4, i8 = con.cursor().execute(query).fetchone()
828            finally:
829                con.close()
830            self.assertEqual(i2, 'int(2)')
831            self.assertEqual(i4, 'int(4)')
832            self.assertEqual(i8, 'int(8)')
833            pgdb.reset_typecast('int4')
834            con = self._connect()
835            try:
836                i2, i4, i8 = con.cursor().execute(query).fetchone()
837            finally:
838                con.close()
839            self.assertEqual(i2, 'int(2)')
840            self.assertEqual(i4, 4)
841            self.assertEqual(i8, 'int(8)')
842            pgdb.reset_typecast(['int2', 'int8'])
843            con = self._connect()
844            try:
845                i2, i4, i8 = con.cursor().execute(query).fetchone()
846            finally:
847                con.close()
848            self.assertEqual(i2, 2)
849            self.assertEqual(i4, 4)
850            self.assertEqual(i8, 8)
851            pgdb.set_typecast(['int2', 'int8'], cast_int)
852            con = self._connect()
853            try:
854                i2, i4, i8 = con.cursor().execute(query).fetchone()
855            finally:
856                con.close()
857            self.assertEqual(i2, 'int(2)')
858            self.assertEqual(i4, 4)
859            self.assertEqual(i8, 'int(8)')
860        finally:
861            pgdb.reset_typecast()
862        con = self._connect()
863        try:
864            i2, i4, i8 = con.cursor().execute(query).fetchone()
865        finally:
866            con.close()
867        self.assertEqual(i2, 2)
868        self.assertEqual(i4, 4)
869        self.assertEqual(i8, 8)
870
871    def test_unicode_with_utf8(self):
872        table = self.table_prefix + 'booze'
873        input = u"He wes Leovenaðes sone — liðe him be Drihten"
874        con = self._connect()
875        try:
876            cur = con.cursor()
877            cur.execute("create table %s (t text)" % table)
878            try:
879                cur.execute("set client_encoding=utf8")
880                cur.execute(u"select '%s'" % input)
881            except Exception:
882                self.skipTest("database does not support utf8")
883            output1 = cur.fetchone()[0]
884            cur.execute("insert into %s values (%%s)" % table, (input,))
885            cur.execute("select * from %s" % table)
886            output2 = cur.fetchone()[0]
887            cur.execute("select t = '%s' from %s" % (input, table))
888            output3 = cur.fetchone()[0]
889            cur.execute("select t = %%s from %s" % table, (input,))
890            output4 = cur.fetchone()[0]
891        finally:
892            con.close()
893        if str is bytes:  # Python < 3.0
894            input = input.encode('utf8')
895        self.assertIsInstance(output1, str)
896        self.assertEqual(output1, input)
897        self.assertIsInstance(output2, str)
898        self.assertEqual(output2, input)
899        self.assertIsInstance(output3, bool)
900        self.assertTrue(output3)
901        self.assertIsInstance(output4, bool)
902        self.assertTrue(output4)
903
904    def test_unicode_with_latin1(self):
905        table = self.table_prefix + 'booze'
906        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
907        con = self._connect()
908        try:
909            cur = con.cursor()
910            cur.execute("create table %s (t text)" % table)
911            try:
912                cur.execute("set client_encoding=latin1")
913                cur.execute(u"select '%s'" % input)
914            except Exception:
915                self.skipTest("database does not support latin1")
916            output1 = cur.fetchone()[0]
917            cur.execute("insert into %s values (%%s)" % table, (input,))
918            cur.execute("select * from %s" % table)
919            output2 = cur.fetchone()[0]
920            cur.execute("select t = '%s' from %s" % (input, table))
921            output3 = cur.fetchone()[0]
922            cur.execute("select t = %%s from %s" % table, (input,))
923            output4 = cur.fetchone()[0]
924        finally:
925            con.close()
926        if str is bytes:  # Python < 3.0
927            input = input.encode('latin1')
928        self.assertIsInstance(output1, str)
929        self.assertEqual(output1, input)
930        self.assertIsInstance(output2, str)
931        self.assertEqual(output2, input)
932        self.assertIsInstance(output3, bool)
933        self.assertTrue(output3)
934        self.assertIsInstance(output4, bool)
935        self.assertTrue(output4)
936
937    def test_bool(self):
938        values = [False, True, None, 't', 'f', 'true', 'false']
939        table = self.table_prefix + 'booze'
940        con = self._connect()
941        try:
942            cur = con.cursor()
943            cur.execute(
944                "create table %s (n smallint, booltest bool)" % table)
945            params = enumerate(values)
946            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
947            cur.execute("select booltest from %s order by n" % table)
948            rows = cur.fetchall()
949            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
950        finally:
951            con.close()
952        rows = [row[0] for row in rows]
953        values[3] = values[5] = True
954        values[4] = values[6] = False
955        self.assertEqual(rows, values)
956
957    def test_literal(self):
958        con = self._connect()
959        try:
960            cur = con.cursor()
961            value = "lower('Hello')"
962            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
963            row = cur.fetchone()
964        finally:
965            con.close()
966        self.assertEqual(row, (value, 'hello'))
967
968
969    def test_json(self):
970        inval = {"employees":
971            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
972        table = self.table_prefix + 'booze'
973        con = self._connect()
974        try:
975            cur = con.cursor()
976            try:
977                cur.execute("create table %s (jsontest json)" % table)
978            except pgdb.ProgrammingError:
979                self.skipTest('database does not support json')
980            params = (pgdb.Json(inval),)
981            cur.execute("insert into %s values (%%s)" % table, params)
982            cur.execute("select jsontest from %s" % table)
983            outval = cur.fetchone()[0]
984            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
985        finally:
986            con.close()
987        self.assertEqual(inval, outval)
988
989    def test_jsonb(self):
990        inval = {"employees":
991            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
992        table = self.table_prefix + 'booze'
993        con = self._connect()
994        try:
995            cur = con.cursor()
996            try:
997                cur.execute("create table %s (jsonbtest jsonb)" % table)
998            except pgdb.ProgrammingError:
999                self.skipTest('database does not support jsonb')
1000            params = (pgdb.Json(inval),)
1001            cur.execute("insert into %s values (%%s)" % table, params)
1002            cur.execute("select jsonbtest from %s" % table)
1003            outval = cur.fetchone()[0]
1004            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1005        finally:
1006            con.close()
1007        self.assertEqual(inval, outval)
1008
1009    def test_execute_edge_cases(self):
1010        con = self._connect()
1011        try:
1012            cur = con.cursor()
1013            sql = 'invalid'  # should be ignored with empty parameter list
1014            cur.executemany(sql, [])
1015            sql = 'select %d + 1'
1016            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1017            self.assertEqual(cur.fetchone()[0], 3)
1018            sql = 'select 1/0'  # cannot be executed
1019            self.assertRaises(pgdb.DataError, cur.execute, sql)
1020            cur.close()
1021            con.rollback()
1022            if pgdb.shortcutmethods:
1023                res = con.execute('select %d', (1,)).fetchone()
1024                self.assertEqual(res, (1,))
1025                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1026                self.assertEqual(res, (2,))
1027        finally:
1028            con.close()
1029        sql = 'select 1'  # cannot be executed after connection is closed
1030        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1031
1032    def test_fetchmany_with_keep(self):
1033        con = self._connect()
1034        try:
1035            cur = con.cursor()
1036            self.assertEqual(cur.arraysize, 1)
1037            cur.execute('select * from generate_series(1, 25)')
1038            self.assertEqual(len(cur.fetchmany()), 1)
1039            self.assertEqual(len(cur.fetchmany()), 1)
1040            self.assertEqual(cur.arraysize, 1)
1041            cur.arraysize = 3
1042            self.assertEqual(len(cur.fetchmany()), 3)
1043            self.assertEqual(len(cur.fetchmany()), 3)
1044            self.assertEqual(cur.arraysize, 3)
1045            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1046            self.assertEqual(cur.arraysize, 3)
1047            self.assertEqual(len(cur.fetchmany()), 3)
1048            self.assertEqual(len(cur.fetchmany()), 3)
1049            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1050            self.assertEqual(cur.arraysize, 2)
1051            self.assertEqual(len(cur.fetchmany()), 2)
1052            self.assertEqual(len(cur.fetchmany()), 2)
1053            self.assertEqual(len(cur.fetchmany(25)), 3)
1054        finally:
1055            con.close()
1056
1057    def test_nextset(self):
1058        con = self._connect()
1059        cur = con.cursor()
1060        self.assertRaises(con.NotSupportedError, cur.nextset)
1061
1062    def test_setoutputsize(self):
1063        pass  # not supported
1064
1065    def test_connection_errors(self):
1066        con = self._connect()
1067        self.assertEqual(con.Error, pgdb.Error)
1068        self.assertEqual(con.Warning, pgdb.Warning)
1069        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1070        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1071        self.assertEqual(con.InternalError, pgdb.InternalError)
1072        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1073        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1074        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1075        self.assertEqual(con.DataError, pgdb.DataError)
1076        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1077
1078    def test_connection_as_contextmanager(self):
1079        table = self.table_prefix + 'booze'
1080        con = self._connect()
1081        try:
1082            cur = con.cursor()
1083            cur.execute("create table %s (n smallint check(n!=4))" % table)
1084            with con:
1085                cur.execute("insert into %s values (1)" % table)
1086                cur.execute("insert into %s values (2)" % table)
1087            try:
1088                with con:
1089                    cur.execute("insert into %s values (3)" % table)
1090                    cur.execute("insert into %s values (4)" % table)
1091            except con.IntegrityError as error:
1092                self.assertTrue('check' in str(error).lower())
1093            with con:
1094                cur.execute("insert into %s values (5)" % table)
1095                cur.execute("insert into %s values (6)" % table)
1096            try:
1097                with con:
1098                    cur.execute("insert into %s values (7)" % table)
1099                    cur.execute("insert into %s values (8)" % table)
1100                    raise ValueError('transaction should rollback')
1101            except ValueError as error:
1102                self.assertEqual(str(error), 'transaction should rollback')
1103            with con:
1104                cur.execute("insert into %s values (9)" % table)
1105            cur.execute("select * from %s order by 1" % table)
1106            rows = cur.fetchall()
1107            rows = [row[0] for row in rows]
1108        finally:
1109            con.close()
1110        self.assertEqual(rows, [1, 2, 5, 6, 9])
1111
1112    def test_cursor_connection(self):
1113        con = self._connect()
1114        cur = con.cursor()
1115        self.assertEqual(cur.connection, con)
1116        cur.close()
1117
1118    def test_cursor_as_contextmanager(self):
1119        con = self._connect()
1120        with con.cursor() as cur:
1121            self.assertEqual(cur.connection, con)
1122
1123    def test_pgdb_type(self):
1124        self.assertEqual(pgdb.STRING, pgdb.STRING)
1125        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1126        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1127        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1128        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1129        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1130        self.assertEqual('char', pgdb.STRING)
1131        self.assertEqual('varchar', pgdb.STRING)
1132        self.assertEqual('text', pgdb.STRING)
1133        self.assertNotEqual('numeric', pgdb.STRING)
1134        self.assertEqual('numeric', pgdb.NUMERIC)
1135        self.assertEqual('numeric', pgdb.NUMBER)
1136        self.assertEqual('int4', pgdb.NUMBER)
1137        self.assertNotEqual('int4', pgdb.NUMERIC)
1138        self.assertEqual('int2', pgdb.SMALLINT)
1139        self.assertNotEqual('int4', pgdb.SMALLINT)
1140        self.assertEqual('int2', pgdb.INTEGER)
1141        self.assertEqual('int4', pgdb.INTEGER)
1142        self.assertEqual('int8', pgdb.INTEGER)
1143        self.assertNotEqual('int4', pgdb.LONG)
1144        self.assertEqual('int8', pgdb.LONG)
1145        self.assertTrue('char' in pgdb.STRING)
1146        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1147        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1148        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1149        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1150        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1151        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1152        self.assertEqual('_char', pgdb.ARRAY)
1153        self.assertNotEqual('char', pgdb.ARRAY)
1154        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1155        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1156        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1157        self.assertEqual('record', pgdb.RECORD)
1158        self.assertNotEqual('_record', pgdb.RECORD)
1159
1160
# Run all tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
Note: See TracBrowser for help on using the repository browser.