source: trunk/tests/test_dbapi20.py @ 842

Last change on this file since 842 was 842, checked in by cito, 4 years ago

Treat percent signs in SQL strings always the same

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 45.9 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 842 2016-02-08 21:02:10Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import date, time, datetime, timedelta
32from uuid import UUID as Uuid
33
34try:
35    from datetime import timezone
36except ImportError:  # Python < 3.2
37    timezone = None
38
39try:
40    long
41except NameError:  # Python >= 3.0
42    long = int
43
44try:
45    from collections import OrderedDict
46except ImportError:  # Python 2.6 or 3.0
47    OrderedDict = None
48
49
class PgBitString:
    """Test helper that renders its value as a PostgreSQL bit string."""

    def __init__(self, value):
        """Store the value that is formatted in binary notation later."""
        self.value = value

    def __pg_repr__(self):
        """Return the value as a bit string literal such as B'101'."""
        bits = format(self.value, 'b')
        return "B'" + bits + "'"
58
59
60class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
61
62    driver = pgdb
63    connect_args = ()
64    connect_kw_args = {'database': dbname,
65        'host': '%s:%d' % (dbhost or '', dbport or -1)}
66
67    lower_func = 'lower'  # For stored procedure test
68
69    def setUp(self):
70        # Call superclass setUp in case this does something in the future
71        dbapi20.DatabaseAPI20Test.setUp(self)
72        try:
73            con = self._connect()
74            con.close()
75        except pgdb.Error:  # try to create a missing database
76            import pg
77            try:  # first try to log in as superuser
78                db = pg.DB('postgres', dbhost or None, dbport or -1,
79                    user='postgres')
80            except Exception:  # then try to log in as current user
81                db = pg.DB('postgres', dbhost or None, dbport or -1)
82            db.query('create database ' + dbname)
83
    def tearDown(self):
        """Run the tearDown of the generic DB-API 2.0 base test class."""
        dbapi20.DatabaseAPI20Test.tearDown(self)
86
87    def testVersion(self):
88        v = pgdb.version
89        self.assertIsInstance(v, str)
90        self.assertIn('.', v)
91        self.assertEqual(pgdb.__version__, v)
92
93    def test_percent_sign(self):
94        con = self._connect()
95        cur = con.cursor()
96        cur.execute("select %s, 'a %% sign'", ('a % sign',))
97        self.assertEqual(cur.fetchone(), ('a % sign', 'a % sign'))
98        cur.execute("select 'a %% sign'")
99        self.assertEqual(cur.fetchone(), ('a % sign',))
100
101    def test_callproc_no_params(self):
102        con = self._connect()
103        cur = con.cursor()
104        # note that now() does not change within a transaction
105        cur.execute('select now()')
106        now = cur.fetchone()[0]
107        res = cur.callproc('now')
108        self.assertIsNone(res)
109        res = cur.fetchone()[0]
110        self.assertEqual(res, now)
111
112    def test_callproc_bad_params(self):
113        con = self._connect()
114        cur = con.cursor()
115        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
116        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
117
118    def test_callproc_one_param(self):
119        con = self._connect()
120        cur = con.cursor()
121        params = (42.4382,)
122        res = cur.callproc("round", params)
123        self.assertIs(res, params)
124        res = cur.fetchone()[0]
125        self.assertEqual(res, 42)
126
127    def test_callproc_two_params(self):
128        con = self._connect()
129        cur = con.cursor()
130        params = (9, 4)
131        res = cur.callproc("div", params)
132        self.assertIs(res, params)
133        res = cur.fetchone()[0]
134        self.assertEqual(res, 2)
135
136    def test_cursor_type(self):
137
138        class TestCursor(pgdb.Cursor):
139            pass
140
141        con = self._connect()
142        self.assertIs(con.cursor_type, pgdb.Cursor)
143        cur = con.cursor()
144        self.assertIsInstance(cur, pgdb.Cursor)
145        self.assertNotIsInstance(cur, TestCursor)
146        con.cursor_type = TestCursor
147        cur = con.cursor()
148        self.assertIsInstance(cur, TestCursor)
149        cur = con.cursor()
150        self.assertIsInstance(cur, TestCursor)
151        con = self._connect()
152        self.assertIs(con.cursor_type, pgdb.Cursor)
153        cur = con.cursor()
154        self.assertIsInstance(cur, pgdb.Cursor)
155        self.assertNotIsInstance(cur, TestCursor)
156
157    def test_row_factory(self):
158
159        class TestCursor(pgdb.Cursor):
160
161            def row_factory(self, row):
162                return dict(('column %s' % desc[0], value)
163                    for desc, value in zip(self.description, row))
164
165        con = self._connect()
166        con.cursor_type = TestCursor
167        cur = con.cursor()
168        self.assertIsInstance(cur, TestCursor)
169        res = cur.execute("select 1 as a, 2 as b")
170        self.assertIs(res, cur, 'execute() should return cursor')
171        res = cur.fetchone()
172        self.assertIsInstance(res, dict)
173        self.assertEqual(res, {'column a': 1, 'column b': 2})
174        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
175        res = cur.fetchall()
176        self.assertIsInstance(res, list)
177        self.assertEqual(len(res), 2)
178        self.assertIsInstance(res[0], dict)
179        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
180        self.assertIsInstance(res[1], dict)
181        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
182
183    def test_build_row_factory(self):
184
185        class TestCursor(pgdb.Cursor):
186
187            def build_row_factory(self):
188                keys = [desc[0] for desc in self.description]
189                return lambda row: dict((key, value)
190                    for key, value in zip(keys, row))
191
192        con = self._connect()
193        con.cursor_type = TestCursor
194        cur = con.cursor()
195        self.assertIsInstance(cur, TestCursor)
196        cur.execute("select 1 as a, 2 as b")
197        res = cur.fetchone()
198        self.assertIsInstance(res, dict)
199        self.assertEqual(res, {'a': 1, 'b': 2})
200        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
201        res = cur.fetchall()
202        self.assertIsInstance(res, list)
203        self.assertEqual(len(res), 2)
204        self.assertIsInstance(res[0], dict)
205        self.assertEqual(res[0], {'a': 1, 'b': 2})
206        self.assertIsInstance(res[1], dict)
207        self.assertEqual(res[1], {'a': 3, 'b': 4})
208
209    def test_cursor_with_named_columns(self):
210        con = self._connect()
211        cur = con.cursor()
212        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
213        self.assertIs(res, cur, 'execute() should return cursor')
214        res = cur.fetchone()
215        self.assertIsInstance(res, tuple)
216        self.assertEqual(res, (1, 2, 3))
217        self.assertEqual(res._fields, ('abc', 'de', 'f'))
218        self.assertEqual(res.abc, 1)
219        self.assertEqual(res.de, 2)
220        self.assertEqual(res.f, 3)
221        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
222        res = cur.fetchall()
223        self.assertIsInstance(res, list)
224        self.assertEqual(len(res), 2)
225        self.assertIsInstance(res[0], tuple)
226        self.assertEqual(res[0], (1, 2))
227        self.assertEqual(res[0]._fields, ('one', 'two'))
228        self.assertIsInstance(res[1], tuple)
229        self.assertEqual(res[1], (3, 4))
230        self.assertEqual(res[1]._fields, ('one', 'two'))
231
232    def test_cursor_with_unnamed_columns(self):
233        con = self._connect()
234        cur = con.cursor()
235        cur.execute("select 1, 2, 3")
236        res = cur.fetchone()
237        self.assertIsInstance(res, tuple)
238        self.assertEqual(res, (1, 2, 3))
239        old_py = OrderedDict is None  # Python 2.6 or 3.0
240        # old Python versions cannot rename tuple fields with underscore
241        if old_py:
242            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
243        else:
244            self.assertEqual(res._fields, ('_0', '_1', '_2'))
245        cur.execute("select 1 as one, 2, 3 as three")
246        res = cur.fetchone()
247        self.assertIsInstance(res, tuple)
248        self.assertEqual(res, (1, 2, 3))
249        if old_py:  # cannot auto rename with underscore
250            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
251        else:
252            self.assertEqual(res._fields, ('one', '_1', 'three'))
253        cur.execute("select 1 as abc, 2 as def")
254        res = cur.fetchone()
255        self.assertIsInstance(res, tuple)
256        self.assertEqual(res, (1, 2))
257        if old_py:
258            self.assertEqual(res._fields, ('column_0', 'column_1'))
259        else:
260            self.assertEqual(res._fields, ('abc', '_1'))
261
262    def test_colnames(self):
263        con = self._connect()
264        cur = con.cursor()
265        cur.execute("select 1, 2, 3")
266        names = cur.colnames
267        self.assertIsInstance(names, list)
268        self.assertEqual(names, ['?column?', '?column?', '?column?'])
269        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
270        names = cur.colnames
271        self.assertIsInstance(names, list)
272        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
273
274    def test_coltypes(self):
275        con = self._connect()
276        cur = con.cursor()
277        cur.execute("select 1::int2, 2::int4, 3::int8")
278        types = cur.coltypes
279        self.assertIsInstance(types, list)
280        self.assertEqual(types, ['int2', 'int4', 'int8'])
281
282    def test_description_fields(self):
283        con = self._connect()
284        cur = con.cursor()
285        cur.execute("select 123456789::int8 col0,"
286            " 123456.789::numeric(41, 13) as col1,"
287            " 'foobar'::char(39) as col2")
288        desc = cur.description
289        self.assertIsInstance(desc, list)
290        self.assertEqual(len(desc), 3)
291        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
292        for i in range(3):
293            c, d = cols[i], desc[i]
294            self.assertIsInstance(d, tuple)
295            self.assertEqual(len(d), 7)
296            self.assertIsInstance(d.name, str)
297            self.assertEqual(d.name, 'col%d' % i)
298            self.assertIsInstance(d.type_code, str)
299            self.assertEqual(d.type_code, c[0])
300            self.assertIsNone(d.display_size)
301            self.assertIsInstance(d.internal_size, int)
302            self.assertEqual(d.internal_size, c[1])
303            if c[2] is not None:
304                self.assertIsInstance(d.precision, int)
305                self.assertEqual(d.precision, c[1])
306                self.assertIsInstance(d.scale, int)
307                self.assertEqual(d.scale, c[2])
308            else:
309                self.assertIsNone(d.precision)
310                self.assertIsNone(d.scale)
311            self.assertIsNone(d.null_ok)
312
313    def test_type_cache_info(self):
314        con = self._connect()
315        try:
316            cur = con.cursor()
317            type_cache = con.type_cache
318            self.assertNotIn('numeric', type_cache)
319            type_info = type_cache['numeric']
320            self.assertIn('numeric', type_cache)
321            self.assertEqual(type_info, 'numeric')
322            self.assertEqual(type_info.oid, 1700)
323            self.assertEqual(type_info.len, -1)
324            self.assertEqual(type_info.type, 'b')  # base
325            self.assertEqual(type_info.category, 'N')  # numeric
326            self.assertEqual(type_info.delim, ',')
327            self.assertEqual(type_info.relid, 0)
328            self.assertIs(con.type_cache[1700], type_info)
329            self.assertNotIn('pg_type', type_cache)
330            type_info = type_cache['pg_type']
331            self.assertIn('pg_type', type_cache)
332            self.assertEqual(type_info.type, 'c')  # composite
333            self.assertEqual(type_info.category, 'C')  # composite
334            cols = type_cache.get_fields('pg_type')
335            self.assertEqual(cols[0].name, 'typname')
336            typname = type_cache[cols[0].type]
337            self.assertEqual(typname, 'name')
338            self.assertEqual(typname.type, 'b')  # base
339            self.assertEqual(typname.category, 'S')  # string
340            self.assertEqual(cols[3].name, 'typlen')
341            typlen = type_cache[cols[3].type]
342            self.assertEqual(typlen, 'int2')
343            self.assertEqual(typlen.type, 'b')  # base
344            self.assertEqual(typlen.category, 'N')  # numeric
345            cur.close()
346            cur = con.cursor()
347            type_cache = con.type_cache
348            self.assertIn('numeric', type_cache)
349            cur.close()
350        finally:
351            con.close()
352        con = self._connect()
353        try:
354            cur = con.cursor()
355            type_cache = con.type_cache
356            self.assertNotIn('pg_type', type_cache)
357            self.assertEqual(type_cache.get('pg_type'), type_info)
358            self.assertIn('pg_type', type_cache)
359            self.assertIsNone(type_cache.get(
360                self.table_prefix + '_surely_does_not_exist'))
361            cur.close()
362        finally:
363            con.close()
364
365    def test_type_cache_typecast(self):
366        con = self._connect()
367        try:
368            cur = con.cursor()
369            type_cache = con.type_cache
370            self.assertIs(type_cache.get_typecast('int4'), int)
371            cast_int = lambda v: 'int(%s)' % v
372            type_cache.set_typecast('int4', cast_int)
373            query = 'select 2::int2, 4::int4, 8::int8'
374            cur.execute(query)
375            i2, i4, i8 = cur.fetchone()
376            self.assertEqual(i2, 2)
377            self.assertEqual(i4, 'int(4)')
378            self.assertEqual(i8, 8)
379            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
380            type_cache.set_typecast(['int2', 'int8'], cast_int)
381            cur.execute(query)
382            i2, i4, i8 = cur.fetchone()
383            self.assertEqual(i2, 'int(2)')
384            self.assertEqual(i4, 'int(4)')
385            self.assertEqual(i8, 'int(8)')
386            type_cache.reset_typecast('int4')
387            cur.execute(query)
388            i2, i4, i8 = cur.fetchone()
389            self.assertEqual(i2, 'int(2)')
390            self.assertEqual(i4, 4)
391            self.assertEqual(i8, 'int(8)')
392            type_cache.reset_typecast(['int2', 'int8'])
393            cur.execute(query)
394            i2, i4, i8 = cur.fetchone()
395            self.assertEqual(i2, 2)
396            self.assertEqual(i4, 4)
397            self.assertEqual(i8, 8)
398            type_cache.set_typecast(['int2', 'int8'], cast_int)
399            cur.execute(query)
400            i2, i4, i8 = cur.fetchone()
401            self.assertEqual(i2, 'int(2)')
402            self.assertEqual(i4, 4)
403            self.assertEqual(i8, 'int(8)')
404            type_cache.reset_typecast()
405            cur.execute(query)
406            i2, i4, i8 = cur.fetchone()
407            self.assertEqual(i2, 2)
408            self.assertEqual(i4, 4)
409            self.assertEqual(i8, 8)
410            cur.close()
411        finally:
412            con.close()
413
414    def test_cursor_iteration(self):
415        con = self._connect()
416        cur = con.cursor()
417        cur.execute("select 1 union select 2 union select 3")
418        self.assertEqual([r[0] for r in cur], [1, 2, 3])
419
420    def test_fetch_2_rows(self):
421        Decimal = pgdb.decimal_type()
422        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
423            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
424            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
425            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
426            pgdb.Interval(15, 31, 5), 7897234)
427        table = self.table_prefix + 'booze'
428        con = self._connect()
429        try:
430            cur = con.cursor()
431            cur.execute("set datestyle to iso")
432            cur.execute("create table %s ("
433                "stringtest varchar,"
434                "binarytest bytea,"
435                "booltest bool,"
436                "integertest int4,"
437                "longtest int8,"
438                "floattest float8,"
439                "numerictest numeric,"
440                "moneytest money,"
441                "datetest date,"
442                "timetest time,"
443                "datetimetest timestamp,"
444                "intervaltest interval,"
445                "rowidtest oid)" % table)
446            cur.execute("set standard_conforming_strings to on")
447            for s in ('numeric', 'monetary', 'time'):
448                cur.execute("set lc_%s to 'C'" % s)
449            for _i in range(2):
450                cur.execute("insert into %s values ("
451                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
452                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
453            cur.execute("select * from %s" % table)
454            rows = cur.fetchall()
455            self.assertEqual(len(rows), 2)
456            row0 = rows[0]
457            self.assertEqual(row0, values)
458            self.assertEqual(row0, rows[1])
459            self.assertIsInstance(row0[0], str)
460            self.assertIsInstance(row0[1], bytes)
461            self.assertIsInstance(row0[2], bool)
462            self.assertIsInstance(row0[3], int)
463            self.assertIsInstance(row0[4], long)
464            self.assertIsInstance(row0[5], float)
465            self.assertIsInstance(row0[6], Decimal)
466            self.assertIsInstance(row0[7], Decimal)
467            self.assertIsInstance(row0[8], date)
468            self.assertIsInstance(row0[9], time)
469            self.assertIsInstance(row0[10], datetime)
470            self.assertIsInstance(row0[11], timedelta)
471        finally:
472            con.close()
473
474    def test_integrity_error(self):
475        table = self.table_prefix + 'booze'
476        con = self._connect()
477        try:
478            cur = con.cursor()
479            cur.execute("set client_min_messages = warning")
480            cur.execute("create table %s (i int primary key)" % table)
481            cur.execute("insert into %s values (1)" % table)
482            cur.execute("insert into %s values (2)" % table)
483            self.assertRaises(pgdb.IntegrityError, cur.execute,
484                "insert into %s values (1)" % table)
485        finally:
486            con.close()
487
488    def test_sqlstate(self):
489        con = self._connect()
490        cur = con.cursor()
491        try:
492            cur.execute("select 1/0")
493        except pgdb.DatabaseError as error:
494            self.assertTrue(isinstance(error, pgdb.DataError))
495            # the SQLSTATE error code for division by zero is 22012
496            self.assertEqual(error.sqlstate, '22012')
497
498    def test_float(self):
499        nan, inf = float('nan'), float('inf')
500        from math import isnan, isinf
501        self.assertTrue(isnan(nan) and not isinf(nan))
502        self.assertTrue(isinf(inf) and not isnan(inf))
503        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
504            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
505        table = self.table_prefix + 'booze'
506        con = self._connect()
507        try:
508            cur = con.cursor()
509            cur.execute(
510                "create table %s (n smallint, floattest float)" % table)
511            params = enumerate(values)
512            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
513            cur.execute("select floattest from %s order by n" % table)
514            rows = cur.fetchall()
515            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
516            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
517            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
518        finally:
519            con.close()
520        self.assertEqual(len(rows), len(values))
521        rows = [row[0] for row in rows]
522        for inval, outval in zip(values, rows):
523            if inval in ('inf', 'Infinity'):
524                inval = inf
525            elif inval in ('-inf', '-Infinity'):
526                inval = -inf
527            elif inval in ('nan', 'NaN'):
528                inval = nan
529            if isinf(inval):
530                self.assertTrue(isinf(outval))
531                if inval < 0:
532                    self.assertTrue(outval < 0)
533                else:
534                    self.assertTrue(outval > 0)
535            elif isnan(inval):
536                self.assertTrue(isnan(outval))
537            else:
538                self.assertEqual(inval, outval)
539
540    def test_datetime(self):
541        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
542        table = self.table_prefix + 'booze'
543        con = self._connect()
544        try:
545            cur = con.cursor()
546            cur.execute("create table %s ("
547                "d date, t time,  ts timestamp,"
548                "tz timetz, tsz timestamptz)" % table)
549            for n in range(3):
550                values = [dt.date(), dt.time(), dt,
551                    dt.time(), dt]
552                if timezone:
553                    values[3] = values[3].replace(tzinfo=timezone.utc)
554                    values[4] = values[4].replace(tzinfo=timezone.utc)
555                if n == 0:  # input as objects
556                    params = values
557                if n == 1:  # input as text
558                    params = [v.isoformat() for v in values]  # as text
559                elif n == 2:  # input using type helpers
560                    d = (dt.year, dt.month, dt.day)
561                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
562                    params = [pgdb.Date(*d), pgdb.Time(*t),
563                            pgdb.Timestamp(*(d + t)), pgdb.Time(*t),
564                            pgdb.Timestamp(*(d + t))]
565                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
566                        'sql, mdy', 'sql, dmy', 'german'):
567                    cur.execute("set datestyle to %s" % datestyle)
568                    if n != 1:
569                        cur.execute("select %s,%s,%s,%s,%s", params)
570                        row = cur.fetchone()
571                        self.assertEqual(row, tuple(values))
572                    cur.execute("insert into %s"
573                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
574                    cur.execute("select * from %s" % table)
575                    d = cur.description
576                    for i in range(5):
577                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
578                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
579                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
580                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
581                    self.assertEqual(d[0].type_code, pgdb.DATE)
582                    self.assertEqual(d[1].type_code, pgdb.TIME)
583                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
584                    self.assertEqual(d[3].type_code, pgdb.TIME)
585                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
586                    row = cur.fetchone()
587                    self.assertEqual(row, tuple(values))
588                    cur.execute("delete from %s" % table)
589        finally:
590            con.close()
591
592    def test_interval(self):
593        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
594        table = self.table_prefix + 'booze'
595        con = self._connect()
596        try:
597            cur = con.cursor()
598            cur.execute("create table %s (i interval)" % table)
599            for n in range(3):
600                if n == 0:  # input as objects
601                    param = td
602                if n == 1:  # input as text
603                    param = '%d days %d seconds %d microseconds ' % (
604                        td.days, td.seconds, td.microseconds)
605                elif n == 2:  # input using type helpers
606                    param = pgdb.Interval(
607                        td.days, 0, 0, td.seconds, td.microseconds)
608                for intervalstyle in ('sql_standard ', 'postgres',
609                        'postgres_verbose', 'iso_8601'):
610                    cur.execute("set intervalstyle to %s" % intervalstyle)
611                    cur.execute("insert into %s"
612                        " values (%%s)" % table, [param])
613                    cur.execute("select * from %s" % table)
614                    tc = cur.description[0].type_code
615                    self.assertEqual(tc, pgdb.DATETIME)
616                    self.assertNotEqual(tc, pgdb.STRING)
617                    self.assertNotEqual(tc, pgdb.ARRAY)
618                    self.assertNotEqual(tc, pgdb.RECORD)
619                    self.assertEqual(tc, pgdb.INTERVAL)
620                    row = cur.fetchone()
621                    self.assertEqual(row, (td,))
622                    cur.execute("delete from %s" % table)
623        finally:
624            con.close()
625
626    def test_hstore(self):
627        con = self._connect()
628        try:
629            cur = con.cursor()
630            cur.execute("select 'k=>v'::hstore")
631        except pgdb.DatabaseError:
632            try:
633                cur.execute("create extension hstore")
634            except pgdb.DatabaseError:
635                self.skipTest("hstore extension not enabled")
636        finally:
637            con.close()
638        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever', 'back\\': '\\slash',
639            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
640            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
641            'None': None, 'NULL': 'NULL', 'empty': ''}
642        con = self._connect()
643        try:
644            cur = con.cursor()
645            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
646            result = cur.fetchone()[0]
647        finally:
648            con.close()
649        self.assertIsInstance(result, dict)
650        self.assertEqual(result, d)
651
652    def test_uuid(self):
653        self.assertIs(Uuid, pgdb.Uuid)
654        d = Uuid('{12345678-1234-5678-1234-567812345678}')
655        con = self._connect()
656        try:
657            cur = con.cursor()
658            cur.execute("select %s::uuid", (d,))
659            result = cur.fetchone()[0]
660        finally:
661            con.close()
662        self.assertIsInstance(result, Uuid)
663        self.assertEqual(result, d)
664
665    def test_insert_array(self):
666        values = [(None, None), ([], []), ([None], [[None], ['null']]),
667            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
668            ([20000, 25000, 25000, 30000],
669            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
670            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
671        table = self.table_prefix + 'booze'
672        con = self._connect()
673        try:
674            cur = con.cursor()
675            cur.execute("create table %s"
676                " (n smallint, i int[], t text[][])" % table)
677            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
678            # Note that we must explicit casts because we are inserting
679            # empty arrays.  Otherwise this is not necessary.
680            cur.executemany("insert into %s values"
681                " (%%d,%%s::int[],%%s::text[][])" % table, params)
682            cur.execute("select i, t from %s order by n" % table)
683            d = cur.description
684            self.assertEqual(d[0].type_code, pgdb.ARRAY)
685            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
686            self.assertEqual(d[0].type_code, pgdb.NUMBER)
687            self.assertEqual(d[0].type_code, pgdb.INTEGER)
688            self.assertEqual(d[1].type_code, pgdb.ARRAY)
689            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
690            self.assertEqual(d[1].type_code, pgdb.STRING)
691            rows = cur.fetchall()
692        finally:
693            con.close()
694        self.assertEqual(rows, values)
695
696    def test_select_array(self):
697        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
698        con = self._connect()
699        try:
700            cur = con.cursor()
701            cur.execute("select %s::int[], %s::text[]", values)
702            row = cur.fetchone()
703        finally:
704            con.close()
705        self.assertEqual(row, values)
706
707    def test_insert_record(self):
708        values = [('John', 61), ('Jane', 63),
709                  ('Fred', None), ('Wilma', None),
710                  (None, 42), (None, None)]
711        table = self.table_prefix + 'booze'
712        record = self.table_prefix + 'munch'
713        con = self._connect()
714        try:
715            cur = con.cursor()
716            cur.execute("create type %s as (name varchar, age int)" % record)
717            cur.execute("create table %s (n smallint, r %s)" % (table, record))
718            params = enumerate(values)
719            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
720            cur.execute("select r from %s order by n" % table)
721            type_code = cur.description[0].type_code
722            self.assertEqual(type_code, record)
723            self.assertEqual(type_code, pgdb.RECORD)
724            self.assertNotEqual(type_code, pgdb.ARRAY)
725            columns = con.type_cache.get_fields(type_code)
726            self.assertEqual(columns[0].name, 'name')
727            self.assertEqual(columns[1].name, 'age')
728            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
729            self.assertEqual(con.type_cache[columns[1].type], 'int4')
730            rows = cur.fetchall()
731        finally:
732            cur.execute('drop table %s' % table)
733            cur.execute('drop type %s' % record)
734            con.close()
735        self.assertEqual(len(rows), len(values))
736        rows = [row[0] for row in rows]
737        self.assertEqual(rows, values)
738        self.assertEqual(rows[0].name, 'John')
739        self.assertEqual(rows[0].age, 61)
740
741    def test_select_record(self):
742        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
743            '(test)', '(x,y)', ' x y ', 'null', None)
744        con = self._connect()
745        try:
746            cur = con.cursor()
747            cur.execute("select %s as test_record", [value])
748            self.assertEqual(cur.description[0].name, 'test_record')
749            self.assertEqual(cur.description[0].type_code, 'record')
750            row = cur.fetchone()[0]
751        finally:
752            con.close()
753        # Note that the element types get lost since we created an
754        # untyped record (an anonymous composite type). For the same
755        # reason this is also a normal tuple, not a named tuple.
756        text_row = tuple(None if v is None else str(v) for v in value)
757        self.assertEqual(row, text_row)
758
759    def test_custom_type(self):
760        values = [3, 5, 65]
761        values = list(map(PgBitString, values))
762        table = self.table_prefix + 'booze'
763        con = self._connect()
764        try:
765            cur = con.cursor()
766            params = enumerate(values)  # params have __pg_repr__ method
767            cur.execute(
768                'create table "%s" (n smallint, b bit varying(7))' % table)
769            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
770            cur.execute("select * from %s" % table)
771            rows = cur.fetchall()
772        finally:
773            con.close()
774        self.assertEqual(len(rows), len(values))
775        con = self._connect()
776        try:
777            cur = con.cursor()
778            params = (1, object())  # an object that cannot be handled
779            self.assertRaises(pgdb.InterfaceError, cur.execute,
780                "insert into %s values (%%s,%%s)" % table, params)
781        finally:
782            con.close()
783
784    def test_set_decimal_type(self):
785        decimal_type = pgdb.decimal_type()
786        self.assertTrue(decimal_type is not None and callable(decimal_type))
787        con = self._connect()
788        try:
789            cur = con.cursor()
790            # change decimal type globally to int
791            int_type = lambda v: int(float(v))
792            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
793            cur.execute('select 4.25')
794            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
795            value = cur.fetchone()[0]
796            self.assertTrue(isinstance(value, int))
797            self.assertEqual(value, 4)
798            # change decimal type again to float
799            self.assertTrue(pgdb.decimal_type(float) is float)
800            cur.execute('select 4.25')
801            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
802            value = cur.fetchone()[0]
803            # the connection still uses the old setting
804            self.assertTrue(isinstance(value, int))
805            # bust the cache for type functions for the connection
806            con.type_cache.reset_typecast()
807            cur.execute('select 4.25')
808            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
809            value = cur.fetchone()[0]
810            # now the connection uses the new setting
811            self.assertTrue(isinstance(value, float))
812            self.assertEqual(value, 4.25)
813        finally:
814            con.close()
815            pgdb.decimal_type(decimal_type)
816        self.assertTrue(pgdb.decimal_type() is decimal_type)
817
818    def test_global_typecast(self):
819        try:
820            query = 'select 2::int2, 4::int4, 8::int8'
821            self.assertIs(pgdb.get_typecast('int4'), int)
822            cast_int = lambda v: 'int(%s)' % v
823            pgdb.set_typecast('int4', cast_int)
824            con = self._connect()
825            try:
826                i2, i4, i8 = con.cursor().execute(query).fetchone()
827            finally:
828                con.close()
829            self.assertEqual(i2, 2)
830            self.assertEqual(i4, 'int(4)')
831            self.assertEqual(i8, 8)
832            pgdb.set_typecast(['int2', 'int8'], cast_int)
833            con = self._connect()
834            try:
835                i2, i4, i8 = con.cursor().execute(query).fetchone()
836            finally:
837                con.close()
838            self.assertEqual(i2, 'int(2)')
839            self.assertEqual(i4, 'int(4)')
840            self.assertEqual(i8, 'int(8)')
841            pgdb.reset_typecast('int4')
842            con = self._connect()
843            try:
844                i2, i4, i8 = con.cursor().execute(query).fetchone()
845            finally:
846                con.close()
847            self.assertEqual(i2, 'int(2)')
848            self.assertEqual(i4, 4)
849            self.assertEqual(i8, 'int(8)')
850            pgdb.reset_typecast(['int2', 'int8'])
851            con = self._connect()
852            try:
853                i2, i4, i8 = con.cursor().execute(query).fetchone()
854            finally:
855                con.close()
856            self.assertEqual(i2, 2)
857            self.assertEqual(i4, 4)
858            self.assertEqual(i8, 8)
859            pgdb.set_typecast(['int2', 'int8'], cast_int)
860            con = self._connect()
861            try:
862                i2, i4, i8 = con.cursor().execute(query).fetchone()
863            finally:
864                con.close()
865            self.assertEqual(i2, 'int(2)')
866            self.assertEqual(i4, 4)
867            self.assertEqual(i8, 'int(8)')
868        finally:
869            pgdb.reset_typecast()
870        con = self._connect()
871        try:
872            i2, i4, i8 = con.cursor().execute(query).fetchone()
873        finally:
874            con.close()
875        self.assertEqual(i2, 2)
876        self.assertEqual(i4, 4)
877        self.assertEqual(i8, 8)
878
879    def test_unicode_with_utf8(self):
880        table = self.table_prefix + 'booze'
881        input = u"He wes Leovenaðes sone — liðe him be Drihten"
882        con = self._connect()
883        try:
884            cur = con.cursor()
885            cur.execute("create table %s (t text)" % table)
886            try:
887                cur.execute("set client_encoding=utf8")
888                cur.execute(u"select '%s'" % input)
889            except Exception:
890                self.skipTest("database does not support utf8")
891            output1 = cur.fetchone()[0]
892            cur.execute("insert into %s values (%%s)" % table, (input,))
893            cur.execute("select * from %s" % table)
894            output2 = cur.fetchone()[0]
895            cur.execute("select t = '%s' from %s" % (input, table))
896            output3 = cur.fetchone()[0]
897            cur.execute("select t = %%s from %s" % table, (input,))
898            output4 = cur.fetchone()[0]
899        finally:
900            con.close()
901        if str is bytes:  # Python < 3.0
902            input = input.encode('utf8')
903        self.assertIsInstance(output1, str)
904        self.assertEqual(output1, input)
905        self.assertIsInstance(output2, str)
906        self.assertEqual(output2, input)
907        self.assertIsInstance(output3, bool)
908        self.assertTrue(output3)
909        self.assertIsInstance(output4, bool)
910        self.assertTrue(output4)
911
912    def test_unicode_with_latin1(self):
913        table = self.table_prefix + 'booze'
914        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
915        con = self._connect()
916        try:
917            cur = con.cursor()
918            cur.execute("create table %s (t text)" % table)
919            try:
920                cur.execute("set client_encoding=latin1")
921                cur.execute(u"select '%s'" % input)
922            except Exception:
923                self.skipTest("database does not support latin1")
924            output1 = cur.fetchone()[0]
925            cur.execute("insert into %s values (%%s)" % table, (input,))
926            cur.execute("select * from %s" % table)
927            output2 = cur.fetchone()[0]
928            cur.execute("select t = '%s' from %s" % (input, table))
929            output3 = cur.fetchone()[0]
930            cur.execute("select t = %%s from %s" % table, (input,))
931            output4 = cur.fetchone()[0]
932        finally:
933            con.close()
934        if str is bytes:  # Python < 3.0
935            input = input.encode('latin1')
936        self.assertIsInstance(output1, str)
937        self.assertEqual(output1, input)
938        self.assertIsInstance(output2, str)
939        self.assertEqual(output2, input)
940        self.assertIsInstance(output3, bool)
941        self.assertTrue(output3)
942        self.assertIsInstance(output4, bool)
943        self.assertTrue(output4)
944
945    def test_bool(self):
946        values = [False, True, None, 't', 'f', 'true', 'false']
947        table = self.table_prefix + 'booze'
948        con = self._connect()
949        try:
950            cur = con.cursor()
951            cur.execute(
952                "create table %s (n smallint, booltest bool)" % table)
953            params = enumerate(values)
954            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
955            cur.execute("select booltest from %s order by n" % table)
956            rows = cur.fetchall()
957            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
958        finally:
959            con.close()
960        rows = [row[0] for row in rows]
961        values[3] = values[5] = True
962        values[4] = values[6] = False
963        self.assertEqual(rows, values)
964
965    def test_literal(self):
966        con = self._connect()
967        try:
968            cur = con.cursor()
969            value = "lower('Hello')"
970            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
971            row = cur.fetchone()
972        finally:
973            con.close()
974        self.assertEqual(row, (value, 'hello'))
975
976
977    def test_json(self):
978        inval = {"employees":
979            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
980        table = self.table_prefix + 'booze'
981        con = self._connect()
982        try:
983            cur = con.cursor()
984            try:
985                cur.execute("create table %s (jsontest json)" % table)
986            except pgdb.ProgrammingError:
987                self.skipTest('database does not support json')
988            params = (pgdb.Json(inval),)
989            cur.execute("insert into %s values (%%s)" % table, params)
990            cur.execute("select jsontest from %s" % table)
991            outval = cur.fetchone()[0]
992            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
993        finally:
994            con.close()
995        self.assertEqual(inval, outval)
996
997    def test_jsonb(self):
998        inval = {"employees":
999            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
1000        table = self.table_prefix + 'booze'
1001        con = self._connect()
1002        try:
1003            cur = con.cursor()
1004            try:
1005                cur.execute("create table %s (jsonbtest jsonb)" % table)
1006            except pgdb.ProgrammingError:
1007                self.skipTest('database does not support jsonb')
1008            params = (pgdb.Json(inval),)
1009            cur.execute("insert into %s values (%%s)" % table, params)
1010            cur.execute("select jsonbtest from %s" % table)
1011            outval = cur.fetchone()[0]
1012            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
1013        finally:
1014            con.close()
1015        self.assertEqual(inval, outval)
1016
1017    def test_execute_edge_cases(self):
1018        con = self._connect()
1019        try:
1020            cur = con.cursor()
1021            sql = 'invalid'  # should be ignored with empty parameter list
1022            cur.executemany(sql, [])
1023            sql = 'select %d + 1'
1024            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
1025            self.assertEqual(cur.fetchone()[0], 3)
1026            sql = 'select 1/0'  # cannot be executed
1027            self.assertRaises(pgdb.DataError, cur.execute, sql)
1028            cur.close()
1029            con.rollback()
1030            if pgdb.shortcutmethods:
1031                res = con.execute('select %d', (1,)).fetchone()
1032                self.assertEqual(res, (1,))
1033                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1034                self.assertEqual(res, (2,))
1035        finally:
1036            con.close()
1037        sql = 'select 1'  # cannot be executed after connection is closed
1038        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1039
1040    def test_fetchmany_with_keep(self):
1041        con = self._connect()
1042        try:
1043            cur = con.cursor()
1044            self.assertEqual(cur.arraysize, 1)
1045            cur.execute('select * from generate_series(1, 25)')
1046            self.assertEqual(len(cur.fetchmany()), 1)
1047            self.assertEqual(len(cur.fetchmany()), 1)
1048            self.assertEqual(cur.arraysize, 1)
1049            cur.arraysize = 3
1050            self.assertEqual(len(cur.fetchmany()), 3)
1051            self.assertEqual(len(cur.fetchmany()), 3)
1052            self.assertEqual(cur.arraysize, 3)
1053            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1054            self.assertEqual(cur.arraysize, 3)
1055            self.assertEqual(len(cur.fetchmany()), 3)
1056            self.assertEqual(len(cur.fetchmany()), 3)
1057            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1058            self.assertEqual(cur.arraysize, 2)
1059            self.assertEqual(len(cur.fetchmany()), 2)
1060            self.assertEqual(len(cur.fetchmany()), 2)
1061            self.assertEqual(len(cur.fetchmany(25)), 3)
1062        finally:
1063            con.close()
1064
1065    def test_nextset(self):
1066        con = self._connect()
1067        cur = con.cursor()
1068        self.assertRaises(con.NotSupportedError, cur.nextset)
1069
1070    def test_setoutputsize(self):
1071        pass  # not supported
1072
1073    def test_connection_errors(self):
1074        con = self._connect()
1075        self.assertEqual(con.Error, pgdb.Error)
1076        self.assertEqual(con.Warning, pgdb.Warning)
1077        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1078        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1079        self.assertEqual(con.InternalError, pgdb.InternalError)
1080        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1081        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1082        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1083        self.assertEqual(con.DataError, pgdb.DataError)
1084        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1085
1086    def test_connection_as_contextmanager(self):
1087        table = self.table_prefix + 'booze'
1088        con = self._connect()
1089        try:
1090            cur = con.cursor()
1091            cur.execute("create table %s (n smallint check(n!=4))" % table)
1092            with con:
1093                cur.execute("insert into %s values (1)" % table)
1094                cur.execute("insert into %s values (2)" % table)
1095            try:
1096                with con:
1097                    cur.execute("insert into %s values (3)" % table)
1098                    cur.execute("insert into %s values (4)" % table)
1099            except con.IntegrityError as error:
1100                self.assertTrue('check' in str(error).lower())
1101            with con:
1102                cur.execute("insert into %s values (5)" % table)
1103                cur.execute("insert into %s values (6)" % table)
1104            try:
1105                with con:
1106                    cur.execute("insert into %s values (7)" % table)
1107                    cur.execute("insert into %s values (8)" % table)
1108                    raise ValueError('transaction should rollback')
1109            except ValueError as error:
1110                self.assertEqual(str(error), 'transaction should rollback')
1111            with con:
1112                cur.execute("insert into %s values (9)" % table)
1113            cur.execute("select * from %s order by 1" % table)
1114            rows = cur.fetchall()
1115            rows = [row[0] for row in rows]
1116        finally:
1117            con.close()
1118        self.assertEqual(rows, [1, 2, 5, 6, 9])
1119
1120    def test_cursor_connection(self):
1121        con = self._connect()
1122        cur = con.cursor()
1123        self.assertEqual(cur.connection, con)
1124        cur.close()
1125
1126    def test_cursor_as_contextmanager(self):
1127        con = self._connect()
1128        with con.cursor() as cur:
1129            self.assertEqual(cur.connection, con)
1130
1131    def test_pgdb_type(self):
1132        self.assertEqual(pgdb.STRING, pgdb.STRING)
1133        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1134        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1135        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1136        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1137        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1138        self.assertEqual('char', pgdb.STRING)
1139        self.assertEqual('varchar', pgdb.STRING)
1140        self.assertEqual('text', pgdb.STRING)
1141        self.assertNotEqual('numeric', pgdb.STRING)
1142        self.assertEqual('numeric', pgdb.NUMERIC)
1143        self.assertEqual('numeric', pgdb.NUMBER)
1144        self.assertEqual('int4', pgdb.NUMBER)
1145        self.assertNotEqual('int4', pgdb.NUMERIC)
1146        self.assertEqual('int2', pgdb.SMALLINT)
1147        self.assertNotEqual('int4', pgdb.SMALLINT)
1148        self.assertEqual('int2', pgdb.INTEGER)
1149        self.assertEqual('int4', pgdb.INTEGER)
1150        self.assertEqual('int8', pgdb.INTEGER)
1151        self.assertNotEqual('int4', pgdb.LONG)
1152        self.assertEqual('int8', pgdb.LONG)
1153        self.assertTrue('char' in pgdb.STRING)
1154        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1155        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1156        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1157        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1158        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1159        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1160        self.assertEqual('_char', pgdb.ARRAY)
1161        self.assertNotEqual('char', pgdb.ARRAY)
1162        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1163        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1164        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1165        self.assertEqual('record', pgdb.RECORD)
1166        self.assertNotEqual('_record', pgdb.RECORD)
1167
1168
# Run all tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.