source: trunk/tests/test_dbapi20.py @ 822

Last change on this file since 822 was 822, checked in by cito, 4 years ago

Make version available as __version__ in both modules

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 44.8 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 822 2016-02-05 17:05:45Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import date, time, datetime, timedelta
32from uuid import UUID
33
34try:
35    from datetime import timezone
36except ImportError:  # Python < 3.2
37    timezone = None
38
39try:
40    long
41except NameError:  # Python >= 3.0
42    long = int
43
44try:
45    from collections import OrderedDict
46except ImportError:  # Python 2.6 or 3.0
47    OrderedDict = None
48
49
class PgBitString:
    """Integer wrapper that renders itself as a PostgreSQL bit string.

    The wrapped integer is kept in ``value``; ``__pg_repr__`` turns it
    into a bit string literal made of its binary digits.
    """

    def __init__(self, value):
        """Remember the integer whose binary digits form the bit string."""
        self.value = value

    def __pg_repr__(self):
        """Return the value as a bit string literal such as ``B'101'``."""
        number = self.value
        return "B'{0:b}'".format(number)
58
59
60class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
61
62    driver = pgdb
63    connect_args = ()
64    connect_kw_args = {'database': dbname,
65        'host': '%s:%d' % (dbhost or '', dbport or -1)}
66
67    lower_func = 'lower'  # For stored procedure test
68
69    def setUp(self):
70        # Call superclass setUp in case this does something in the future
71        dbapi20.DatabaseAPI20Test.setUp(self)
72        try:
73            con = self._connect()
74            con.close()
75        except pgdb.Error:  # try to create a missing database
76            import pg
77            try:  # first try to log in as superuser
78                db = pg.DB('postgres', dbhost or None, dbport or -1,
79                    user='postgres')
80            except Exception:  # then try to log in as current user
81                db = pg.DB('postgres', dbhost or None, dbport or -1)
82            db.query('create database ' + dbname)
83
84    def tearDown(self):
85        dbapi20.DatabaseAPI20Test.tearDown(self)
86
87    def testVersion(self):
88        v = pgdb.version
89        self.assertIsInstance(v, str)
90        self.assertIn('.', v)
91        self.assertEqual(pgdb.__version__, v)
92
93    def test_callproc_no_params(self):
94        con = self._connect()
95        cur = con.cursor()
96        # note that now() does not change within a transaction
97        cur.execute('select now()')
98        now = cur.fetchone()[0]
99        res = cur.callproc('now')
100        self.assertIsNone(res)
101        res = cur.fetchone()[0]
102        self.assertEqual(res, now)
103
104    def test_callproc_bad_params(self):
105        con = self._connect()
106        cur = con.cursor()
107        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
108        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
109
110    def test_callproc_one_param(self):
111        con = self._connect()
112        cur = con.cursor()
113        params = (42.4382,)
114        res = cur.callproc("round", params)
115        self.assertIs(res, params)
116        res = cur.fetchone()[0]
117        self.assertEqual(res, 42)
118
119    def test_callproc_two_params(self):
120        con = self._connect()
121        cur = con.cursor()
122        params = (9, 4)
123        res = cur.callproc("div", params)
124        self.assertIs(res, params)
125        res = cur.fetchone()[0]
126        self.assertEqual(res, 2)
127
128    def test_cursor_type(self):
129
130        class TestCursor(pgdb.Cursor):
131            pass
132
133        con = self._connect()
134        self.assertIs(con.cursor_type, pgdb.Cursor)
135        cur = con.cursor()
136        self.assertIsInstance(cur, pgdb.Cursor)
137        self.assertNotIsInstance(cur, TestCursor)
138        con.cursor_type = TestCursor
139        cur = con.cursor()
140        self.assertIsInstance(cur, TestCursor)
141        cur = con.cursor()
142        self.assertIsInstance(cur, TestCursor)
143        con = self._connect()
144        self.assertIs(con.cursor_type, pgdb.Cursor)
145        cur = con.cursor()
146        self.assertIsInstance(cur, pgdb.Cursor)
147        self.assertNotIsInstance(cur, TestCursor)
148
149    def test_row_factory(self):
150
151        class TestCursor(pgdb.Cursor):
152
153            def row_factory(self, row):
154                return dict(('column %s' % desc[0], value)
155                    for desc, value in zip(self.description, row))
156
157        con = self._connect()
158        con.cursor_type = TestCursor
159        cur = con.cursor()
160        self.assertIsInstance(cur, TestCursor)
161        res = cur.execute("select 1 as a, 2 as b")
162        self.assertIs(res, cur, 'execute() should return cursor')
163        res = cur.fetchone()
164        self.assertIsInstance(res, dict)
165        self.assertEqual(res, {'column a': 1, 'column b': 2})
166        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
167        res = cur.fetchall()
168        self.assertIsInstance(res, list)
169        self.assertEqual(len(res), 2)
170        self.assertIsInstance(res[0], dict)
171        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
172        self.assertIsInstance(res[1], dict)
173        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
174
175    def test_build_row_factory(self):
176
177        class TestCursor(pgdb.Cursor):
178
179            def build_row_factory(self):
180                keys = [desc[0] for desc in self.description]
181                return lambda row: dict((key, value)
182                    for key, value in zip(keys, row))
183
184        con = self._connect()
185        con.cursor_type = TestCursor
186        cur = con.cursor()
187        self.assertIsInstance(cur, TestCursor)
188        cur.execute("select 1 as a, 2 as b")
189        res = cur.fetchone()
190        self.assertIsInstance(res, dict)
191        self.assertEqual(res, {'a': 1, 'b': 2})
192        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
193        res = cur.fetchall()
194        self.assertIsInstance(res, list)
195        self.assertEqual(len(res), 2)
196        self.assertIsInstance(res[0], dict)
197        self.assertEqual(res[0], {'a': 1, 'b': 2})
198        self.assertIsInstance(res[1], dict)
199        self.assertEqual(res[1], {'a': 3, 'b': 4})
200
201    def test_cursor_with_named_columns(self):
202        con = self._connect()
203        cur = con.cursor()
204        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
205        self.assertIs(res, cur, 'execute() should return cursor')
206        res = cur.fetchone()
207        self.assertIsInstance(res, tuple)
208        self.assertEqual(res, (1, 2, 3))
209        self.assertEqual(res._fields, ('abc', 'de', 'f'))
210        self.assertEqual(res.abc, 1)
211        self.assertEqual(res.de, 2)
212        self.assertEqual(res.f, 3)
213        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
214        res = cur.fetchall()
215        self.assertIsInstance(res, list)
216        self.assertEqual(len(res), 2)
217        self.assertIsInstance(res[0], tuple)
218        self.assertEqual(res[0], (1, 2))
219        self.assertEqual(res[0]._fields, ('one', 'two'))
220        self.assertIsInstance(res[1], tuple)
221        self.assertEqual(res[1], (3, 4))
222        self.assertEqual(res[1]._fields, ('one', 'two'))
223
224    def test_cursor_with_unnamed_columns(self):
225        con = self._connect()
226        cur = con.cursor()
227        cur.execute("select 1, 2, 3")
228        res = cur.fetchone()
229        self.assertIsInstance(res, tuple)
230        self.assertEqual(res, (1, 2, 3))
231        old_py = OrderedDict is None  # Python 2.6 or 3.0
232        # old Python versions cannot rename tuple fields with underscore
233        if old_py:
234            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
235        else:
236            self.assertEqual(res._fields, ('_0', '_1', '_2'))
237        cur.execute("select 1 as one, 2, 3 as three")
238        res = cur.fetchone()
239        self.assertIsInstance(res, tuple)
240        self.assertEqual(res, (1, 2, 3))
241        if old_py:  # cannot auto rename with underscore
242            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
243        else:
244            self.assertEqual(res._fields, ('one', '_1', 'three'))
245        cur.execute("select 1 as abc, 2 as def")
246        res = cur.fetchone()
247        self.assertIsInstance(res, tuple)
248        self.assertEqual(res, (1, 2))
249        if old_py:
250            self.assertEqual(res._fields, ('column_0', 'column_1'))
251        else:
252            self.assertEqual(res._fields, ('abc', '_1'))
253
254    def test_colnames(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1, 2, 3")
258        names = cur.colnames
259        self.assertIsInstance(names, list)
260        self.assertEqual(names, ['?column?', '?column?', '?column?'])
261        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
262        names = cur.colnames
263        self.assertIsInstance(names, list)
264        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
265
266    def test_coltypes(self):
267        con = self._connect()
268        cur = con.cursor()
269        cur.execute("select 1::int2, 2::int4, 3::int8")
270        types = cur.coltypes
271        self.assertIsInstance(types, list)
272        self.assertEqual(types, ['int2', 'int4', 'int8'])
273
274    def test_description_fields(self):
275        con = self._connect()
276        cur = con.cursor()
277        cur.execute("select 123456789::int8 col0,"
278            " 123456.789::numeric(41, 13) as col1,"
279            " 'foobar'::char(39) as col2")
280        desc = cur.description
281        self.assertIsInstance(desc, list)
282        self.assertEqual(len(desc), 3)
283        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
284        for i in range(3):
285            c, d = cols[i], desc[i]
286            self.assertIsInstance(d, tuple)
287            self.assertEqual(len(d), 7)
288            self.assertIsInstance(d.name, str)
289            self.assertEqual(d.name, 'col%d' % i)
290            self.assertIsInstance(d.type_code, str)
291            self.assertEqual(d.type_code, c[0])
292            self.assertIsNone(d.display_size)
293            self.assertIsInstance(d.internal_size, int)
294            self.assertEqual(d.internal_size, c[1])
295            if c[2] is not None:
296                self.assertIsInstance(d.precision, int)
297                self.assertEqual(d.precision, c[1])
298                self.assertIsInstance(d.scale, int)
299                self.assertEqual(d.scale, c[2])
300            else:
301                self.assertIsNone(d.precision)
302                self.assertIsNone(d.scale)
303            self.assertIsNone(d.null_ok)
304
305    def test_type_cache_info(self):
306        con = self._connect()
307        try:
308            cur = con.cursor()
309            type_cache = con.type_cache
310            self.assertNotIn('numeric', type_cache)
311            type_info = type_cache['numeric']
312            self.assertIn('numeric', type_cache)
313            self.assertEqual(type_info, 'numeric')
314            self.assertEqual(type_info.oid, 1700)
315            self.assertEqual(type_info.len, -1)
316            self.assertEqual(type_info.type, 'b')  # base
317            self.assertEqual(type_info.category, 'N')  # numeric
318            self.assertEqual(type_info.delim, ',')
319            self.assertEqual(type_info.relid, 0)
320            self.assertIs(con.type_cache[1700], type_info)
321            self.assertNotIn('pg_type', type_cache)
322            type_info = type_cache['pg_type']
323            self.assertIn('pg_type', type_cache)
324            self.assertEqual(type_info.type, 'c')  # composite
325            self.assertEqual(type_info.category, 'C')  # composite
326            cols = type_cache.get_fields('pg_type')
327            self.assertEqual(cols[0].name, 'typname')
328            typname = type_cache[cols[0].type]
329            self.assertEqual(typname, 'name')
330            self.assertEqual(typname.type, 'b')  # base
331            self.assertEqual(typname.category, 'S')  # string
332            self.assertEqual(cols[3].name, 'typlen')
333            typlen = type_cache[cols[3].type]
334            self.assertEqual(typlen, 'int2')
335            self.assertEqual(typlen.type, 'b')  # base
336            self.assertEqual(typlen.category, 'N')  # numeric
337            cur.close()
338            cur = con.cursor()
339            type_cache = con.type_cache
340            self.assertIn('numeric', type_cache)
341            cur.close()
342        finally:
343            con.close()
344        con = self._connect()
345        try:
346            cur = con.cursor()
347            type_cache = con.type_cache
348            self.assertNotIn('pg_type', type_cache)
349            self.assertEqual(type_cache.get('pg_type'), type_info)
350            self.assertIn('pg_type', type_cache)
351            self.assertIsNone(type_cache.get(
352                self.table_prefix + '_surely_does_not_exist'))
353            cur.close()
354        finally:
355            con.close()
356
357    def test_type_cache_typecast(self):
358        con = self._connect()
359        try:
360            cur = con.cursor()
361            type_cache = con.type_cache
362            self.assertIs(type_cache.get_typecast('int4'), int)
363            cast_int = lambda v: 'int(%s)' % v
364            type_cache.set_typecast('int4', cast_int)
365            query = 'select 2::int2, 4::int4, 8::int8'
366            cur.execute(query)
367            i2, i4, i8 = cur.fetchone()
368            self.assertEqual(i2, 2)
369            self.assertEqual(i4, 'int(4)')
370            self.assertEqual(i8, 8)
371            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
372            type_cache.set_typecast(['int2', 'int8'], cast_int)
373            cur.execute(query)
374            i2, i4, i8 = cur.fetchone()
375            self.assertEqual(i2, 'int(2)')
376            self.assertEqual(i4, 'int(4)')
377            self.assertEqual(i8, 'int(8)')
378            type_cache.reset_typecast('int4')
379            cur.execute(query)
380            i2, i4, i8 = cur.fetchone()
381            self.assertEqual(i2, 'int(2)')
382            self.assertEqual(i4, 4)
383            self.assertEqual(i8, 'int(8)')
384            type_cache.reset_typecast(['int2', 'int8'])
385            cur.execute(query)
386            i2, i4, i8 = cur.fetchone()
387            self.assertEqual(i2, 2)
388            self.assertEqual(i4, 4)
389            self.assertEqual(i8, 8)
390            type_cache.set_typecast(['int2', 'int8'], cast_int)
391            cur.execute(query)
392            i2, i4, i8 = cur.fetchone()
393            self.assertEqual(i2, 'int(2)')
394            self.assertEqual(i4, 4)
395            self.assertEqual(i8, 'int(8)')
396            type_cache.reset_typecast()
397            cur.execute(query)
398            i2, i4, i8 = cur.fetchone()
399            self.assertEqual(i2, 2)
400            self.assertEqual(i4, 4)
401            self.assertEqual(i8, 8)
402            cur.close()
403        finally:
404            con.close()
405
406    def test_cursor_iteration(self):
407        con = self._connect()
408        cur = con.cursor()
409        cur.execute("select 1 union select 2 union select 3")
410        self.assertEqual([r[0] for r in cur], [1, 2, 3])
411
412    def test_fetch_2_rows(self):
413        Decimal = pgdb.decimal_type()
414        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
415            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
416            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
417            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
418            pgdb.Interval(15, 31, 5), 7897234)
419        table = self.table_prefix + 'booze'
420        con = self._connect()
421        try:
422            cur = con.cursor()
423            cur.execute("set datestyle to iso")
424            cur.execute("create table %s ("
425                "stringtest varchar,"
426                "binarytest bytea,"
427                "booltest bool,"
428                "integertest int4,"
429                "longtest int8,"
430                "floattest float8,"
431                "numerictest numeric,"
432                "moneytest money,"
433                "datetest date,"
434                "timetest time,"
435                "datetimetest timestamp,"
436                "intervaltest interval,"
437                "rowidtest oid)" % table)
438            cur.execute("set standard_conforming_strings to on")
439            for s in ('numeric', 'monetary', 'time'):
440                cur.execute("set lc_%s to 'C'" % s)
441            for _i in range(2):
442                cur.execute("insert into %s values ("
443                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
444                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
445            cur.execute("select * from %s" % table)
446            rows = cur.fetchall()
447            self.assertEqual(len(rows), 2)
448            row0 = rows[0]
449            self.assertEqual(row0, values)
450            self.assertEqual(row0, rows[1])
451            self.assertIsInstance(row0[0], str)
452            self.assertIsInstance(row0[1], bytes)
453            self.assertIsInstance(row0[2], bool)
454            self.assertIsInstance(row0[3], int)
455            self.assertIsInstance(row0[4], long)
456            self.assertIsInstance(row0[5], float)
457            self.assertIsInstance(row0[6], Decimal)
458            self.assertIsInstance(row0[7], Decimal)
459            self.assertIsInstance(row0[8], date)
460            self.assertIsInstance(row0[9], time)
461            self.assertIsInstance(row0[10], datetime)
462            self.assertIsInstance(row0[11], timedelta)
463        finally:
464            con.close()
465
466    def test_sqlstate(self):
467        con = self._connect()
468        cur = con.cursor()
469        try:
470            cur.execute("select 1/0")
471        except pgdb.DatabaseError as error:
472            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
473            # the SQLSTATE error code for division by zero is 22012
474            self.assertEqual(error.sqlstate, '22012')
475
476    def test_float(self):
477        nan, inf = float('nan'), float('inf')
478        from math import isnan, isinf
479        self.assertTrue(isnan(nan) and not isinf(nan))
480        self.assertTrue(isinf(inf) and not isnan(inf))
481        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
482            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
483        table = self.table_prefix + 'booze'
484        con = self._connect()
485        try:
486            cur = con.cursor()
487            cur.execute(
488                "create table %s (n smallint, floattest float)" % table)
489            params = enumerate(values)
490            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
491            cur.execute("select floattest from %s order by n" % table)
492            rows = cur.fetchall()
493            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
494            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
495            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
496        finally:
497            con.close()
498        self.assertEqual(len(rows), len(values))
499        rows = [row[0] for row in rows]
500        for inval, outval in zip(values, rows):
501            if inval in ('inf', 'Infinity'):
502                inval = inf
503            elif inval in ('-inf', '-Infinity'):
504                inval = -inf
505            elif inval in ('nan', 'NaN'):
506                inval = nan
507            if isinf(inval):
508                self.assertTrue(isinf(outval))
509                if inval < 0:
510                    self.assertTrue(outval < 0)
511                else:
512                    self.assertTrue(outval > 0)
513            elif isnan(inval):
514                self.assertTrue(isnan(outval))
515            else:
516                self.assertEqual(inval, outval)
517
518    def test_datetime(self):
519        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
520        table = self.table_prefix + 'booze'
521        con = self._connect()
522        try:
523            cur = con.cursor()
524            cur.execute("create table %s ("
525                "d date, t time,  ts timestamp,"
526                "tz timetz, tsz timestamptz)" % table)
527            for n in range(3):
528                values = [dt.date(), dt.time(), dt,
529                    dt.time(), dt]
530                if timezone:
531                    values[3] = values[3].replace(tzinfo=timezone.utc)
532                    values[4] = values[4].replace(tzinfo=timezone.utc)
533                if n == 0:  # input as objects
534                    params = values
535                if n == 1:  # input as text
536                    params = [v.isoformat() for v in values]  # as text
537                elif n == 2:  # input using type helpers
538                    d = (dt.year, dt.month, dt.day)
539                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
540                    params = [pgdb.Date(*d), pgdb.Time(*t),
541                            pgdb.Timestamp(*(d + t)), pgdb.Time(*t),
542                            pgdb.Timestamp(*(d + t))]
543                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
544                        'sql, mdy', 'sql, dmy', 'german'):
545                    cur.execute("set datestyle to %s" % datestyle)
546                    cur.execute("insert into %s"
547                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
548                    cur.execute("select * from %s" % table)
549                    d = cur.description
550                    for i in range(5):
551                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
552                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
553                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
554                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
555                    self.assertEqual(d[0].type_code, pgdb.DATE)
556                    self.assertEqual(d[1].type_code, pgdb.TIME)
557                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
558                    self.assertEqual(d[3].type_code, pgdb.TIME)
559                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
560                    row = cur.fetchone()
561                    self.assertEqual(row, tuple(values))
562                    cur.execute("delete from %s" % table)
563        finally:
564            con.close()
565
566    def test_interval(self):
567        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
568        table = self.table_prefix + 'booze'
569        con = self._connect()
570        try:
571            cur = con.cursor()
572            cur.execute("create table %s (i interval)" % table)
573            for n in range(3):
574                if n == 0:  # input as objects
575                    param = td
576                if n == 1:  # input as text
577                    param = '%d days %d seconds %d microseconds ' % (
578                        td.days, td.seconds, td.microseconds)
579                elif n == 2:  # input using type helpers
580                    param = pgdb.Interval(
581                        td.days, 0, 0, td.seconds, td.microseconds)
582                for intervalstyle in ('sql_standard ', 'postgres',
583                        'postgres_verbose', 'iso_8601'):
584                    cur.execute("set intervalstyle to %s" % intervalstyle)
585                    cur.execute("insert into %s"
586                        " values (%%s)" % table, [param])
587                    cur.execute("select * from %s" % table)
588                    tc = cur.description[0].type_code
589                    self.assertEqual(tc, pgdb.DATETIME)
590                    self.assertNotEqual(tc, pgdb.STRING)
591                    self.assertNotEqual(tc, pgdb.ARRAY)
592                    self.assertNotEqual(tc, pgdb.RECORD)
593                    self.assertEqual(tc, pgdb.INTERVAL)
594                    row = cur.fetchone()
595                    self.assertEqual(row, (td,))
596                    cur.execute("delete from %s" % table)
597        finally:
598            con.close()
599
600    def test_hstore(self):
601        con = self._connect()
602        try:
603            cur = con.cursor()
604            cur.execute("select 'k=>v'::hstore")
605        except pgdb.ProgrammingError:
606            try:
607                cur.execute("create extension hstore")
608            except pgdb.ProgrammingError:
609                self.skipTest("hstore extension not enabled")
610        finally:
611            con.close()
612        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever',
613            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
614            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
615            'None': None, 'NULL': 'NULL', 'empty': ''}
616        con = self._connect()
617        try:
618            cur = con.cursor()
619            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
620            result = cur.fetchone()[0]
621        finally:
622            con.close()
623        self.assertIsInstance(result, dict)
624        self.assertEqual(result, d)
625
626    def test_uuid(self):
627        d = UUID('{12345678-1234-5678-1234-567812345678}')
628        con = self._connect()
629        try:
630            cur = con.cursor()
631            cur.execute("select %s::uuid", (d,))
632            result = cur.fetchone()[0]
633        finally:
634            con.close()
635        self.assertIsInstance(result, UUID)
636        self.assertEqual(result, d)
637
638    def test_insert_array(self):
639        values = [(None, None), ([], []), ([None], [[None], ['null']]),
640            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
641            ([20000, 25000, 25000, 30000],
642            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
643            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
644        table = self.table_prefix + 'booze'
645        con = self._connect()
646        try:
647            cur = con.cursor()
648            cur.execute("create table %s"
649                " (n smallint, i int[], t text[][])" % table)
650            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
651            # Note that we must explicit casts because we are inserting
652            # empty arrays.  Otherwise this is not necessary.
653            cur.executemany("insert into %s values"
654                " (%%d,%%s::int[],%%s::text[][])" % table, params)
655            cur.execute("select i, t from %s order by n" % table)
656            d = cur.description
657            self.assertEqual(d[0].type_code, pgdb.ARRAY)
658            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
659            self.assertEqual(d[0].type_code, pgdb.NUMBER)
660            self.assertEqual(d[0].type_code, pgdb.INTEGER)
661            self.assertEqual(d[1].type_code, pgdb.ARRAY)
662            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
663            self.assertEqual(d[1].type_code, pgdb.STRING)
664            rows = cur.fetchall()
665        finally:
666            con.close()
667        self.assertEqual(rows, values)
668
669    def test_select_array(self):
670        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
671        con = self._connect()
672        try:
673            cur = con.cursor()
674            cur.execute("select %s::int[], %s::text[]", values)
675            row = cur.fetchone()
676        finally:
677            con.close()
678        self.assertEqual(row, values)
679
680    def test_insert_record(self):
681        values = [('John', 61), ('Jane', 63),
682                  ('Fred', None), ('Wilma', None),
683                  (None, 42), (None, None)]
684        table = self.table_prefix + 'booze'
685        record = self.table_prefix + 'munch'
686        con = self._connect()
687        try:
688            cur = con.cursor()
689            cur.execute("create type %s as (name varchar, age int)" % record)
690            cur.execute("create table %s (n smallint, r %s)" % (table, record))
691            params = enumerate(values)
692            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
693            cur.execute("select r from %s order by n" % table)
694            type_code = cur.description[0].type_code
695            self.assertEqual(type_code, record)
696            self.assertEqual(type_code, pgdb.RECORD)
697            self.assertNotEqual(type_code, pgdb.ARRAY)
698            columns = con.type_cache.get_fields(type_code)
699            self.assertEqual(columns[0].name, 'name')
700            self.assertEqual(columns[1].name, 'age')
701            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
702            self.assertEqual(con.type_cache[columns[1].type], 'int4')
703            rows = cur.fetchall()
704        finally:
705            cur.execute('drop table %s' % table)
706            cur.execute('drop type %s' % record)
707            con.close()
708        self.assertEqual(len(rows), len(values))
709        rows = [row[0] for row in rows]
710        self.assertEqual(rows, values)
711        self.assertEqual(rows[0].name, 'John')
712        self.assertEqual(rows[0].age, 61)
713
714    def test_select_record(self):
715        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
716            '(test)', '(x,y)', ' x y ', 'null', None)
717        con = self._connect()
718        try:
719            cur = con.cursor()
720            cur.execute("select %s as test_record", [value])
721            self.assertEqual(cur.description[0].name, 'test_record')
722            self.assertEqual(cur.description[0].type_code, 'record')
723            row = cur.fetchone()[0]
724        finally:
725            con.close()
726        # Note that the element types get lost since we created an
727        # untyped record (an anonymous composite type). For the same
728        # reason this is also a normal tuple, not a named tuple.
729        text_row = tuple(None if v is None else str(v) for v in value)
730        self.assertEqual(row, text_row)
731
732    def test_custom_type(self):
733        values = [3, 5, 65]
734        values = list(map(PgBitString, values))
735        table = self.table_prefix + 'booze'
736        con = self._connect()
737        try:
738            cur = con.cursor()
739            params = enumerate(values)  # params have __pg_repr__ method
740            cur.execute(
741                'create table "%s" (n smallint, b bit varying(7))' % table)
742            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
743            cur.execute("select * from %s" % table)
744            rows = cur.fetchall()
745        finally:
746            con.close()
747        self.assertEqual(len(rows), len(values))
748        con = self._connect()
749        try:
750            cur = con.cursor()
751            params = (1, object())  # an object that cannot be handled
752            self.assertRaises(pgdb.InterfaceError, cur.execute,
753                "insert into %s values (%%s,%%s)" % table, params)
754        finally:
755            con.close()
756
757    def test_set_decimal_type(self):
758        decimal_type = pgdb.decimal_type()
759        self.assertTrue(decimal_type is not None and callable(decimal_type))
760        con = self._connect()
761        try:
762            cur = con.cursor()
763            # change decimal type globally to int
764            int_type = lambda v: int(float(v))
765            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
766            cur.execute('select 4.25')
767            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
768            value = cur.fetchone()[0]
769            self.assertTrue(isinstance(value, int))
770            self.assertEqual(value, 4)
771            # change decimal type again to float
772            self.assertTrue(pgdb.decimal_type(float) is float)
773            cur.execute('select 4.25')
774            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
775            value = cur.fetchone()[0]
776            # the connection still uses the old setting
777            self.assertTrue(isinstance(value, int))
778            # bust the cache for type functions for the connection
779            con.type_cache.reset_typecast()
780            cur.execute('select 4.25')
781            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
782            value = cur.fetchone()[0]
783            # now the connection uses the new setting
784            self.assertTrue(isinstance(value, float))
785            self.assertEqual(value, 4.25)
786        finally:
787            con.close()
788            pgdb.decimal_type(decimal_type)
789        self.assertTrue(pgdb.decimal_type() is decimal_type)
790
791    def test_global_typecast(self):
792        try:
793            query = 'select 2::int2, 4::int4, 8::int8'
794            self.assertIs(pgdb.get_typecast('int4'), int)
795            cast_int = lambda v: 'int(%s)' % v
796            pgdb.set_typecast('int4', cast_int)
797            con = self._connect()
798            try:
799                i2, i4, i8 = con.cursor().execute(query).fetchone()
800            finally:
801                con.close()
802            self.assertEqual(i2, 2)
803            self.assertEqual(i4, 'int(4)')
804            self.assertEqual(i8, 8)
805            pgdb.set_typecast(['int2', 'int8'], cast_int)
806            con = self._connect()
807            try:
808                i2, i4, i8 = con.cursor().execute(query).fetchone()
809            finally:
810                con.close()
811            self.assertEqual(i2, 'int(2)')
812            self.assertEqual(i4, 'int(4)')
813            self.assertEqual(i8, 'int(8)')
814            pgdb.reset_typecast('int4')
815            con = self._connect()
816            try:
817                i2, i4, i8 = con.cursor().execute(query).fetchone()
818            finally:
819                con.close()
820            self.assertEqual(i2, 'int(2)')
821            self.assertEqual(i4, 4)
822            self.assertEqual(i8, 'int(8)')
823            pgdb.reset_typecast(['int2', 'int8'])
824            con = self._connect()
825            try:
826                i2, i4, i8 = con.cursor().execute(query).fetchone()
827            finally:
828                con.close()
829            self.assertEqual(i2, 2)
830            self.assertEqual(i4, 4)
831            self.assertEqual(i8, 8)
832            pgdb.set_typecast(['int2', 'int8'], cast_int)
833            con = self._connect()
834            try:
835                i2, i4, i8 = con.cursor().execute(query).fetchone()
836            finally:
837                con.close()
838            self.assertEqual(i2, 'int(2)')
839            self.assertEqual(i4, 4)
840            self.assertEqual(i8, 'int(8)')
841        finally:
842            pgdb.reset_typecast()
843        con = self._connect()
844        try:
845            i2, i4, i8 = con.cursor().execute(query).fetchone()
846        finally:
847            con.close()
848        self.assertEqual(i2, 2)
849        self.assertEqual(i4, 4)
850        self.assertEqual(i8, 8)
851
852    def test_unicode_with_utf8(self):
853        table = self.table_prefix + 'booze'
854        input = u"He wes Leovenaðes sone — liðe him be Drihten"
855        con = self._connect()
856        try:
857            cur = con.cursor()
858            cur.execute("create table %s (t text)" % table)
859            try:
860                cur.execute("set client_encoding=utf8")
861                cur.execute(u"select '%s'" % input)
862            except Exception:
863                self.skipTest("database does not support utf8")
864            output1 = cur.fetchone()[0]
865            cur.execute("insert into %s values (%%s)" % table, (input,))
866            cur.execute("select * from %s" % table)
867            output2 = cur.fetchone()[0]
868            cur.execute("select t = '%s' from %s" % (input, table))
869            output3 = cur.fetchone()[0]
870            cur.execute("select t = %%s from %s" % table, (input,))
871            output4 = cur.fetchone()[0]
872        finally:
873            con.close()
874        if str is bytes:  # Python < 3.0
875            input = input.encode('utf8')
876        self.assertIsInstance(output1, str)
877        self.assertEqual(output1, input)
878        self.assertIsInstance(output2, str)
879        self.assertEqual(output2, input)
880        self.assertIsInstance(output3, bool)
881        self.assertTrue(output3)
882        self.assertIsInstance(output4, bool)
883        self.assertTrue(output4)
884
885    def test_unicode_with_latin1(self):
886        table = self.table_prefix + 'booze'
887        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
888        con = self._connect()
889        try:
890            cur = con.cursor()
891            cur.execute("create table %s (t text)" % table)
892            try:
893                cur.execute("set client_encoding=latin1")
894                cur.execute(u"select '%s'" % input)
895            except Exception:
896                self.skipTest("database does not support latin1")
897            output1 = cur.fetchone()[0]
898            cur.execute("insert into %s values (%%s)" % table, (input,))
899            cur.execute("select * from %s" % table)
900            output2 = cur.fetchone()[0]
901            cur.execute("select t = '%s' from %s" % (input, table))
902            output3 = cur.fetchone()[0]
903            cur.execute("select t = %%s from %s" % table, (input,))
904            output4 = cur.fetchone()[0]
905        finally:
906            con.close()
907        if str is bytes:  # Python < 3.0
908            input = input.encode('latin1')
909        self.assertIsInstance(output1, str)
910        self.assertEqual(output1, input)
911        self.assertIsInstance(output2, str)
912        self.assertEqual(output2, input)
913        self.assertIsInstance(output3, bool)
914        self.assertTrue(output3)
915        self.assertIsInstance(output4, bool)
916        self.assertTrue(output4)
917
918    def test_bool(self):
919        values = [False, True, None, 't', 'f', 'true', 'false']
920        table = self.table_prefix + 'booze'
921        con = self._connect()
922        try:
923            cur = con.cursor()
924            cur.execute(
925                "create table %s (n smallint, booltest bool)" % table)
926            params = enumerate(values)
927            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
928            cur.execute("select booltest from %s order by n" % table)
929            rows = cur.fetchall()
930            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
931        finally:
932            con.close()
933        rows = [row[0] for row in rows]
934        values[3] = values[5] = True
935        values[4] = values[6] = False
936        self.assertEqual(rows, values)
937
938    def test_literal(self):
939        con = self._connect()
940        try:
941            cur = con.cursor()
942            value = "lower('Hello')"
943            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
944            row = cur.fetchone()
945        finally:
946            con.close()
947        self.assertEqual(row, (value, 'hello'))
948
949
950    def test_json(self):
951        inval = {"employees":
952            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
953        table = self.table_prefix + 'booze'
954        con = self._connect()
955        try:
956            cur = con.cursor()
957            try:
958                cur.execute("create table %s (jsontest json)" % table)
959            except pgdb.ProgrammingError:
960                self.skipTest('database does not support json')
961            params = (pgdb.Json(inval),)
962            cur.execute("insert into %s values (%%s)" % table, params)
963            cur.execute("select jsontest from %s" % table)
964            outval = cur.fetchone()[0]
965            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
966        finally:
967            con.close()
968        self.assertEqual(inval, outval)
969
970    def test_jsonb(self):
971        inval = {"employees":
972            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
973        table = self.table_prefix + 'booze'
974        con = self._connect()
975        try:
976            cur = con.cursor()
977            try:
978                cur.execute("create table %s (jsonbtest jsonb)" % table)
979            except pgdb.ProgrammingError:
980                self.skipTest('database does not support jsonb')
981            params = (pgdb.Json(inval),)
982            cur.execute("insert into %s values (%%s)" % table, params)
983            cur.execute("select jsonbtest from %s" % table)
984            outval = cur.fetchone()[0]
985            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
986        finally:
987            con.close()
988        self.assertEqual(inval, outval)
989
990    def test_execute_edge_cases(self):
991        con = self._connect()
992        try:
993            cur = con.cursor()
994            sql = 'invalid'  # should be ignored with empty parameter list
995            cur.executemany(sql, [])
996            sql = 'select %d + 1'
997            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
998            self.assertEqual(cur.fetchone()[0], 3)
999            sql = 'select 1/0'  # cannot be executed
1000            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
1001            cur.close()
1002            con.rollback()
1003            if pgdb.shortcutmethods:
1004                res = con.execute('select %d', (1,)).fetchone()
1005                self.assertEqual(res, (1,))
1006                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1007                self.assertEqual(res, (2,))
1008        finally:
1009            con.close()
1010        sql = 'select 1'  # cannot be executed after connection is closed
1011        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1012
1013    def test_fetchmany_with_keep(self):
1014        con = self._connect()
1015        try:
1016            cur = con.cursor()
1017            self.assertEqual(cur.arraysize, 1)
1018            cur.execute('select * from generate_series(1, 25)')
1019            self.assertEqual(len(cur.fetchmany()), 1)
1020            self.assertEqual(len(cur.fetchmany()), 1)
1021            self.assertEqual(cur.arraysize, 1)
1022            cur.arraysize = 3
1023            self.assertEqual(len(cur.fetchmany()), 3)
1024            self.assertEqual(len(cur.fetchmany()), 3)
1025            self.assertEqual(cur.arraysize, 3)
1026            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1027            self.assertEqual(cur.arraysize, 3)
1028            self.assertEqual(len(cur.fetchmany()), 3)
1029            self.assertEqual(len(cur.fetchmany()), 3)
1030            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1031            self.assertEqual(cur.arraysize, 2)
1032            self.assertEqual(len(cur.fetchmany()), 2)
1033            self.assertEqual(len(cur.fetchmany()), 2)
1034            self.assertEqual(len(cur.fetchmany(25)), 3)
1035        finally:
1036            con.close()
1037
1038    def test_nextset(self):
1039        con = self._connect()
1040        cur = con.cursor()
1041        self.assertRaises(con.NotSupportedError, cur.nextset)
1042
    def test_setoutputsize(self):
        # Intentionally empty, since this is marked as not supported.
        # NOTE(review): presumably this replaces a generic test inherited
        # from dbapi20.DatabaseAPI20Test -- confirm against that module.
        pass  # not supported
1045
1046    def test_connection_errors(self):
1047        con = self._connect()
1048        self.assertEqual(con.Error, pgdb.Error)
1049        self.assertEqual(con.Warning, pgdb.Warning)
1050        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1051        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1052        self.assertEqual(con.InternalError, pgdb.InternalError)
1053        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1054        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1055        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1056        self.assertEqual(con.DataError, pgdb.DataError)
1057        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1058
1059    def test_connection_as_contextmanager(self):
1060        table = self.table_prefix + 'booze'
1061        con = self._connect()
1062        try:
1063            cur = con.cursor()
1064            cur.execute("create table %s (n smallint check(n!=4))" % table)
1065            with con:
1066                cur.execute("insert into %s values (1)" % table)
1067                cur.execute("insert into %s values (2)" % table)
1068            try:
1069                with con:
1070                    cur.execute("insert into %s values (3)" % table)
1071                    cur.execute("insert into %s values (4)" % table)
1072            except con.ProgrammingError as error:
1073                self.assertTrue('check' in str(error).lower())
1074            with con:
1075                cur.execute("insert into %s values (5)" % table)
1076                cur.execute("insert into %s values (6)" % table)
1077            try:
1078                with con:
1079                    cur.execute("insert into %s values (7)" % table)
1080                    cur.execute("insert into %s values (8)" % table)
1081                    raise ValueError('transaction should rollback')
1082            except ValueError as error:
1083                self.assertEqual(str(error), 'transaction should rollback')
1084            with con:
1085                cur.execute("insert into %s values (9)" % table)
1086            cur.execute("select * from %s order by 1" % table)
1087            rows = cur.fetchall()
1088            rows = [row[0] for row in rows]
1089        finally:
1090            con.close()
1091        self.assertEqual(rows, [1, 2, 5, 6, 9])
1092
1093    def test_cursor_connection(self):
1094        con = self._connect()
1095        cur = con.cursor()
1096        self.assertEqual(cur.connection, con)
1097        cur.close()
1098
1099    def test_cursor_as_contextmanager(self):
1100        con = self._connect()
1101        with con.cursor() as cur:
1102            self.assertEqual(cur.connection, con)
1103
1104    def test_pgdb_type(self):
1105        self.assertEqual(pgdb.STRING, pgdb.STRING)
1106        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1107        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1108        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1109        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1110        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1111        self.assertEqual('char', pgdb.STRING)
1112        self.assertEqual('varchar', pgdb.STRING)
1113        self.assertEqual('text', pgdb.STRING)
1114        self.assertNotEqual('numeric', pgdb.STRING)
1115        self.assertEqual('numeric', pgdb.NUMERIC)
1116        self.assertEqual('numeric', pgdb.NUMBER)
1117        self.assertEqual('int4', pgdb.NUMBER)
1118        self.assertNotEqual('int4', pgdb.NUMERIC)
1119        self.assertEqual('int2', pgdb.SMALLINT)
1120        self.assertNotEqual('int4', pgdb.SMALLINT)
1121        self.assertEqual('int2', pgdb.INTEGER)
1122        self.assertEqual('int4', pgdb.INTEGER)
1123        self.assertEqual('int8', pgdb.INTEGER)
1124        self.assertNotEqual('int4', pgdb.LONG)
1125        self.assertEqual('int8', pgdb.LONG)
1126        self.assertTrue('char' in pgdb.STRING)
1127        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1128        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1129        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1130        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1131        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1132        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1133        self.assertEqual('_char', pgdb.ARRAY)
1134        self.assertNotEqual('char', pgdb.ARRAY)
1135        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1136        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1137        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1138        self.assertEqual('record', pgdb.RECORD)
1139        self.assertNotEqual('_record', pgdb.RECORD)
1140
1141
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.