source: trunk/tests/test_dbapi20.py @ 821

Last change on this file since 821 was 821, checked in by cito, 4 years ago

Support the uuid data type

This is often useful and also supported by SQLAlchemy

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 44.6 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 821 2016-02-05 16:13:54Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import date, time, datetime, timedelta
32from uuid import UUID
33
34try:
35    from datetime import timezone
36except ImportError:  # Python < 3.2
37    timezone = None
38
39try:
40    long
41except NameError:  # Python >= 3.0
42    long = int
43
44try:
45    from collections import OrderedDict
46except ImportError:  # Python 2.6 or 3.0
47    OrderedDict = None
48
49
50class PgBitString:
51    """Test object with a PostgreSQL representation as Bit String."""
52
53    def __init__(self, value):
54        self.value = value
55
56    def __pg_repr__(self):
57         return "B'{0:b}'".format(self.value)
58
59
60class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
61
62    driver = pgdb
63    connect_args = ()
64    connect_kw_args = {'database': dbname,
65        'host': '%s:%d' % (dbhost or '', dbport or -1)}
66
67    lower_func = 'lower'  # For stored procedure test
68
69    def setUp(self):
70        # Call superclass setUp in case this does something in the future
71        dbapi20.DatabaseAPI20Test.setUp(self)
72        try:
73            con = self._connect()
74            con.close()
75        except pgdb.Error:  # try to create a missing database
76            import pg
77            try:  # first try to log in as superuser
78                db = pg.DB('postgres', dbhost or None, dbport or -1,
79                    user='postgres')
80            except Exception:  # then try to log in as current user
81                db = pg.DB('postgres', dbhost or None, dbport or -1)
82            db.query('create database ' + dbname)
83
84    def tearDown(self):
85        dbapi20.DatabaseAPI20Test.tearDown(self)
86
87    def test_callproc_no_params(self):
88        con = self._connect()
89        cur = con.cursor()
90        # note that now() does not change within a transaction
91        cur.execute('select now()')
92        now = cur.fetchone()[0]
93        res = cur.callproc('now')
94        self.assertIsNone(res)
95        res = cur.fetchone()[0]
96        self.assertEqual(res, now)
97
98    def test_callproc_bad_params(self):
99        con = self._connect()
100        cur = con.cursor()
101        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
102        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
103
104    def test_callproc_one_param(self):
105        con = self._connect()
106        cur = con.cursor()
107        params = (42.4382,)
108        res = cur.callproc("round", params)
109        self.assertIs(res, params)
110        res = cur.fetchone()[0]
111        self.assertEqual(res, 42)
112
113    def test_callproc_two_params(self):
114        con = self._connect()
115        cur = con.cursor()
116        params = (9, 4)
117        res = cur.callproc("div", params)
118        self.assertIs(res, params)
119        res = cur.fetchone()[0]
120        self.assertEqual(res, 2)
121
122    def test_cursor_type(self):
123
124        class TestCursor(pgdb.Cursor):
125            pass
126
127        con = self._connect()
128        self.assertIs(con.cursor_type, pgdb.Cursor)
129        cur = con.cursor()
130        self.assertIsInstance(cur, pgdb.Cursor)
131        self.assertNotIsInstance(cur, TestCursor)
132        con.cursor_type = TestCursor
133        cur = con.cursor()
134        self.assertIsInstance(cur, TestCursor)
135        cur = con.cursor()
136        self.assertIsInstance(cur, TestCursor)
137        con = self._connect()
138        self.assertIs(con.cursor_type, pgdb.Cursor)
139        cur = con.cursor()
140        self.assertIsInstance(cur, pgdb.Cursor)
141        self.assertNotIsInstance(cur, TestCursor)
142
143    def test_row_factory(self):
144
145        class TestCursor(pgdb.Cursor):
146
147            def row_factory(self, row):
148                return dict(('column %s' % desc[0], value)
149                    for desc, value in zip(self.description, row))
150
151        con = self._connect()
152        con.cursor_type = TestCursor
153        cur = con.cursor()
154        self.assertIsInstance(cur, TestCursor)
155        res = cur.execute("select 1 as a, 2 as b")
156        self.assertIs(res, cur, 'execute() should return cursor')
157        res = cur.fetchone()
158        self.assertIsInstance(res, dict)
159        self.assertEqual(res, {'column a': 1, 'column b': 2})
160        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
161        res = cur.fetchall()
162        self.assertIsInstance(res, list)
163        self.assertEqual(len(res), 2)
164        self.assertIsInstance(res[0], dict)
165        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
166        self.assertIsInstance(res[1], dict)
167        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
168
169    def test_build_row_factory(self):
170
171        class TestCursor(pgdb.Cursor):
172
173            def build_row_factory(self):
174                keys = [desc[0] for desc in self.description]
175                return lambda row: dict((key, value)
176                    for key, value in zip(keys, row))
177
178        con = self._connect()
179        con.cursor_type = TestCursor
180        cur = con.cursor()
181        self.assertIsInstance(cur, TestCursor)
182        cur.execute("select 1 as a, 2 as b")
183        res = cur.fetchone()
184        self.assertIsInstance(res, dict)
185        self.assertEqual(res, {'a': 1, 'b': 2})
186        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
187        res = cur.fetchall()
188        self.assertIsInstance(res, list)
189        self.assertEqual(len(res), 2)
190        self.assertIsInstance(res[0], dict)
191        self.assertEqual(res[0], {'a': 1, 'b': 2})
192        self.assertIsInstance(res[1], dict)
193        self.assertEqual(res[1], {'a': 3, 'b': 4})
194
195    def test_cursor_with_named_columns(self):
196        con = self._connect()
197        cur = con.cursor()
198        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
199        self.assertIs(res, cur, 'execute() should return cursor')
200        res = cur.fetchone()
201        self.assertIsInstance(res, tuple)
202        self.assertEqual(res, (1, 2, 3))
203        self.assertEqual(res._fields, ('abc', 'de', 'f'))
204        self.assertEqual(res.abc, 1)
205        self.assertEqual(res.de, 2)
206        self.assertEqual(res.f, 3)
207        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
208        res = cur.fetchall()
209        self.assertIsInstance(res, list)
210        self.assertEqual(len(res), 2)
211        self.assertIsInstance(res[0], tuple)
212        self.assertEqual(res[0], (1, 2))
213        self.assertEqual(res[0]._fields, ('one', 'two'))
214        self.assertIsInstance(res[1], tuple)
215        self.assertEqual(res[1], (3, 4))
216        self.assertEqual(res[1]._fields, ('one', 'two'))
217
218    def test_cursor_with_unnamed_columns(self):
219        con = self._connect()
220        cur = con.cursor()
221        cur.execute("select 1, 2, 3")
222        res = cur.fetchone()
223        self.assertIsInstance(res, tuple)
224        self.assertEqual(res, (1, 2, 3))
225        old_py = OrderedDict is None  # Python 2.6 or 3.0
226        # old Python versions cannot rename tuple fields with underscore
227        if old_py:
228            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
229        else:
230            self.assertEqual(res._fields, ('_0', '_1', '_2'))
231        cur.execute("select 1 as one, 2, 3 as three")
232        res = cur.fetchone()
233        self.assertIsInstance(res, tuple)
234        self.assertEqual(res, (1, 2, 3))
235        if old_py:  # cannot auto rename with underscore
236            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
237        else:
238            self.assertEqual(res._fields, ('one', '_1', 'three'))
239        cur.execute("select 1 as abc, 2 as def")
240        res = cur.fetchone()
241        self.assertIsInstance(res, tuple)
242        self.assertEqual(res, (1, 2))
243        if old_py:
244            self.assertEqual(res._fields, ('column_0', 'column_1'))
245        else:
246            self.assertEqual(res._fields, ('abc', '_1'))
247
248    def test_colnames(self):
249        con = self._connect()
250        cur = con.cursor()
251        cur.execute("select 1, 2, 3")
252        names = cur.colnames
253        self.assertIsInstance(names, list)
254        self.assertEqual(names, ['?column?', '?column?', '?column?'])
255        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
256        names = cur.colnames
257        self.assertIsInstance(names, list)
258        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
259
260    def test_coltypes(self):
261        con = self._connect()
262        cur = con.cursor()
263        cur.execute("select 1::int2, 2::int4, 3::int8")
264        types = cur.coltypes
265        self.assertIsInstance(types, list)
266        self.assertEqual(types, ['int2', 'int4', 'int8'])
267
268    def test_description_fields(self):
269        con = self._connect()
270        cur = con.cursor()
271        cur.execute("select 123456789::int8 col0,"
272            " 123456.789::numeric(41, 13) as col1,"
273            " 'foobar'::char(39) as col2")
274        desc = cur.description
275        self.assertIsInstance(desc, list)
276        self.assertEqual(len(desc), 3)
277        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
278        for i in range(3):
279            c, d = cols[i], desc[i]
280            self.assertIsInstance(d, tuple)
281            self.assertEqual(len(d), 7)
282            self.assertIsInstance(d.name, str)
283            self.assertEqual(d.name, 'col%d' % i)
284            self.assertIsInstance(d.type_code, str)
285            self.assertEqual(d.type_code, c[0])
286            self.assertIsNone(d.display_size)
287            self.assertIsInstance(d.internal_size, int)
288            self.assertEqual(d.internal_size, c[1])
289            if c[2] is not None:
290                self.assertIsInstance(d.precision, int)
291                self.assertEqual(d.precision, c[1])
292                self.assertIsInstance(d.scale, int)
293                self.assertEqual(d.scale, c[2])
294            else:
295                self.assertIsNone(d.precision)
296                self.assertIsNone(d.scale)
297            self.assertIsNone(d.null_ok)
298
299    def test_type_cache_info(self):
300        con = self._connect()
301        try:
302            cur = con.cursor()
303            type_cache = con.type_cache
304            self.assertNotIn('numeric', type_cache)
305            type_info = type_cache['numeric']
306            self.assertIn('numeric', type_cache)
307            self.assertEqual(type_info, 'numeric')
308            self.assertEqual(type_info.oid, 1700)
309            self.assertEqual(type_info.len, -1)
310            self.assertEqual(type_info.type, 'b')  # base
311            self.assertEqual(type_info.category, 'N')  # numeric
312            self.assertEqual(type_info.delim, ',')
313            self.assertEqual(type_info.relid, 0)
314            self.assertIs(con.type_cache[1700], type_info)
315            self.assertNotIn('pg_type', type_cache)
316            type_info = type_cache['pg_type']
317            self.assertIn('pg_type', type_cache)
318            self.assertEqual(type_info.type, 'c')  # composite
319            self.assertEqual(type_info.category, 'C')  # composite
320            cols = type_cache.get_fields('pg_type')
321            self.assertEqual(cols[0].name, 'typname')
322            typname = type_cache[cols[0].type]
323            self.assertEqual(typname, 'name')
324            self.assertEqual(typname.type, 'b')  # base
325            self.assertEqual(typname.category, 'S')  # string
326            self.assertEqual(cols[3].name, 'typlen')
327            typlen = type_cache[cols[3].type]
328            self.assertEqual(typlen, 'int2')
329            self.assertEqual(typlen.type, 'b')  # base
330            self.assertEqual(typlen.category, 'N')  # numeric
331            cur.close()
332            cur = con.cursor()
333            type_cache = con.type_cache
334            self.assertIn('numeric', type_cache)
335            cur.close()
336        finally:
337            con.close()
338        con = self._connect()
339        try:
340            cur = con.cursor()
341            type_cache = con.type_cache
342            self.assertNotIn('pg_type', type_cache)
343            self.assertEqual(type_cache.get('pg_type'), type_info)
344            self.assertIn('pg_type', type_cache)
345            self.assertIsNone(type_cache.get(
346                self.table_prefix + '_surely_does_not_exist'))
347            cur.close()
348        finally:
349            con.close()
350
351    def test_type_cache_typecast(self):
352        con = self._connect()
353        try:
354            cur = con.cursor()
355            type_cache = con.type_cache
356            self.assertIs(type_cache.get_typecast('int4'), int)
357            cast_int = lambda v: 'int(%s)' % v
358            type_cache.set_typecast('int4', cast_int)
359            query = 'select 2::int2, 4::int4, 8::int8'
360            cur.execute(query)
361            i2, i4, i8 = cur.fetchone()
362            self.assertEqual(i2, 2)
363            self.assertEqual(i4, 'int(4)')
364            self.assertEqual(i8, 8)
365            self.assertEqual(type_cache.typecast(42, 'int4'), 'int(42)')
366            type_cache.set_typecast(['int2', 'int8'], cast_int)
367            cur.execute(query)
368            i2, i4, i8 = cur.fetchone()
369            self.assertEqual(i2, 'int(2)')
370            self.assertEqual(i4, 'int(4)')
371            self.assertEqual(i8, 'int(8)')
372            type_cache.reset_typecast('int4')
373            cur.execute(query)
374            i2, i4, i8 = cur.fetchone()
375            self.assertEqual(i2, 'int(2)')
376            self.assertEqual(i4, 4)
377            self.assertEqual(i8, 'int(8)')
378            type_cache.reset_typecast(['int2', 'int8'])
379            cur.execute(query)
380            i2, i4, i8 = cur.fetchone()
381            self.assertEqual(i2, 2)
382            self.assertEqual(i4, 4)
383            self.assertEqual(i8, 8)
384            type_cache.set_typecast(['int2', 'int8'], cast_int)
385            cur.execute(query)
386            i2, i4, i8 = cur.fetchone()
387            self.assertEqual(i2, 'int(2)')
388            self.assertEqual(i4, 4)
389            self.assertEqual(i8, 'int(8)')
390            type_cache.reset_typecast()
391            cur.execute(query)
392            i2, i4, i8 = cur.fetchone()
393            self.assertEqual(i2, 2)
394            self.assertEqual(i4, 4)
395            self.assertEqual(i8, 8)
396            cur.close()
397        finally:
398            con.close()
399
400    def test_cursor_iteration(self):
401        con = self._connect()
402        cur = con.cursor()
403        cur.execute("select 1 union select 2 union select 3")
404        self.assertEqual([r[0] for r in cur], [1, 2, 3])
405
406    def test_fetch_2_rows(self):
407        Decimal = pgdb.decimal_type()
408        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
409            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
410            pgdb.Date(2011, 7, 17), pgdb.Time(15, 47, 42),
411            pgdb.Timestamp(2008, 10, 20, 15, 25, 35),
412            pgdb.Interval(15, 31, 5), 7897234)
413        table = self.table_prefix + 'booze'
414        con = self._connect()
415        try:
416            cur = con.cursor()
417            cur.execute("set datestyle to iso")
418            cur.execute("create table %s ("
419                "stringtest varchar,"
420                "binarytest bytea,"
421                "booltest bool,"
422                "integertest int4,"
423                "longtest int8,"
424                "floattest float8,"
425                "numerictest numeric,"
426                "moneytest money,"
427                "datetest date,"
428                "timetest time,"
429                "datetimetest timestamp,"
430                "intervaltest interval,"
431                "rowidtest oid)" % table)
432            cur.execute("set standard_conforming_strings to on")
433            for s in ('numeric', 'monetary', 'time'):
434                cur.execute("set lc_%s to 'C'" % s)
435            for _i in range(2):
436                cur.execute("insert into %s values ("
437                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
438                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
439            cur.execute("select * from %s" % table)
440            rows = cur.fetchall()
441            self.assertEqual(len(rows), 2)
442            row0 = rows[0]
443            self.assertEqual(row0, values)
444            self.assertEqual(row0, rows[1])
445            self.assertIsInstance(row0[0], str)
446            self.assertIsInstance(row0[1], bytes)
447            self.assertIsInstance(row0[2], bool)
448            self.assertIsInstance(row0[3], int)
449            self.assertIsInstance(row0[4], long)
450            self.assertIsInstance(row0[5], float)
451            self.assertIsInstance(row0[6], Decimal)
452            self.assertIsInstance(row0[7], Decimal)
453            self.assertIsInstance(row0[8], date)
454            self.assertIsInstance(row0[9], time)
455            self.assertIsInstance(row0[10], datetime)
456            self.assertIsInstance(row0[11], timedelta)
457        finally:
458            con.close()
459
460    def test_sqlstate(self):
461        con = self._connect()
462        cur = con.cursor()
463        try:
464            cur.execute("select 1/0")
465        except pgdb.DatabaseError as error:
466            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
467            # the SQLSTATE error code for division by zero is 22012
468            self.assertEqual(error.sqlstate, '22012')
469
470    def test_float(self):
471        nan, inf = float('nan'), float('inf')
472        from math import isnan, isinf
473        self.assertTrue(isnan(nan) and not isinf(nan))
474        self.assertTrue(isinf(inf) and not isnan(inf))
475        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
476            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
477        table = self.table_prefix + 'booze'
478        con = self._connect()
479        try:
480            cur = con.cursor()
481            cur.execute(
482                "create table %s (n smallint, floattest float)" % table)
483            params = enumerate(values)
484            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
485            cur.execute("select floattest from %s order by n" % table)
486            rows = cur.fetchall()
487            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
488            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
489            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
490        finally:
491            con.close()
492        self.assertEqual(len(rows), len(values))
493        rows = [row[0] for row in rows]
494        for inval, outval in zip(values, rows):
495            if inval in ('inf', 'Infinity'):
496                inval = inf
497            elif inval in ('-inf', '-Infinity'):
498                inval = -inf
499            elif inval in ('nan', 'NaN'):
500                inval = nan
501            if isinf(inval):
502                self.assertTrue(isinf(outval))
503                if inval < 0:
504                    self.assertTrue(outval < 0)
505                else:
506                    self.assertTrue(outval > 0)
507            elif isnan(inval):
508                self.assertTrue(isnan(outval))
509            else:
510                self.assertEqual(inval, outval)
511
512    def test_datetime(self):
513        dt = datetime(2011, 7, 17, 15, 47, 42, 317509)
514        table = self.table_prefix + 'booze'
515        con = self._connect()
516        try:
517            cur = con.cursor()
518            cur.execute("create table %s ("
519                "d date, t time,  ts timestamp,"
520                "tz timetz, tsz timestamptz)" % table)
521            for n in range(3):
522                values = [dt.date(), dt.time(), dt,
523                    dt.time(), dt]
524                if timezone:
525                    values[3] = values[3].replace(tzinfo=timezone.utc)
526                    values[4] = values[4].replace(tzinfo=timezone.utc)
527                if n == 0:  # input as objects
528                    params = values
529                if n == 1:  # input as text
530                    params = [v.isoformat() for v in values]  # as text
531                elif n == 2:  # input using type helpers
532                    d = (dt.year, dt.month, dt.day)
533                    t = (dt.hour, dt.minute, dt.second, dt.microsecond)
534                    params = [pgdb.Date(*d), pgdb.Time(*t),
535                            pgdb.Timestamp(*(d + t)), pgdb.Time(*t),
536                            pgdb.Timestamp(*(d + t))]
537                for datestyle in ('iso', 'postgres, mdy', 'postgres, dmy',
538                        'sql, mdy', 'sql, dmy', 'german'):
539                    cur.execute("set datestyle to %s" % datestyle)
540                    cur.execute("insert into %s"
541                        " values (%%s,%%s,%%s,%%s,%%s)" % table, params)
542                    cur.execute("select * from %s" % table)
543                    d = cur.description
544                    for i in range(5):
545                        self.assertEqual(d[i].type_code, pgdb.DATETIME)
546                        self.assertNotEqual(d[i].type_code, pgdb.STRING)
547                        self.assertNotEqual(d[i].type_code, pgdb.ARRAY)
548                        self.assertNotEqual(d[i].type_code, pgdb.RECORD)
549                    self.assertEqual(d[0].type_code, pgdb.DATE)
550                    self.assertEqual(d[1].type_code, pgdb.TIME)
551                    self.assertEqual(d[2].type_code, pgdb.TIMESTAMP)
552                    self.assertEqual(d[3].type_code, pgdb.TIME)
553                    self.assertEqual(d[4].type_code, pgdb.TIMESTAMP)
554                    row = cur.fetchone()
555                    self.assertEqual(row, tuple(values))
556                    cur.execute("delete from %s" % table)
557        finally:
558            con.close()
559
560    def test_interval(self):
561        td = datetime(2011, 7, 17, 15, 47, 42, 317509) - datetime(1970, 1, 1)
562        table = self.table_prefix + 'booze'
563        con = self._connect()
564        try:
565            cur = con.cursor()
566            cur.execute("create table %s (i interval)" % table)
567            for n in range(3):
568                if n == 0:  # input as objects
569                    param = td
570                if n == 1:  # input as text
571                    param = '%d days %d seconds %d microseconds ' % (
572                        td.days, td.seconds, td.microseconds)
573                elif n == 2:  # input using type helpers
574                    param = pgdb.Interval(
575                        td.days, 0, 0, td.seconds, td.microseconds)
576                for intervalstyle in ('sql_standard ', 'postgres',
577                        'postgres_verbose', 'iso_8601'):
578                    cur.execute("set intervalstyle to %s" % intervalstyle)
579                    cur.execute("insert into %s"
580                        " values (%%s)" % table, [param])
581                    cur.execute("select * from %s" % table)
582                    tc = cur.description[0].type_code
583                    self.assertEqual(tc, pgdb.DATETIME)
584                    self.assertNotEqual(tc, pgdb.STRING)
585                    self.assertNotEqual(tc, pgdb.ARRAY)
586                    self.assertNotEqual(tc, pgdb.RECORD)
587                    self.assertEqual(tc, pgdb.INTERVAL)
588                    row = cur.fetchone()
589                    self.assertEqual(row, (td,))
590                    cur.execute("delete from %s" % table)
591        finally:
592            con.close()
593
594    def test_hstore(self):
595        con = self._connect()
596        try:
597            cur = con.cursor()
598            cur.execute("select 'k=>v'::hstore")
599        except pgdb.ProgrammingError:
600            try:
601                cur.execute("create extension hstore")
602            except pgdb.ProgrammingError:
603                self.skipTest("hstore extension not enabled")
604        finally:
605            con.close()
606        d = {'k': 'v', 'foo': 'bar', 'baz': 'whatever',
607            '1a': 'anything at all', '2=b': 'value = 2', '3>c': 'value > 3',
608            '4"c': 'value " 4', "5'c": "value ' 5", 'hello, world': '"hi!"',
609            'None': None, 'NULL': 'NULL', 'empty': ''}
610        con = self._connect()
611        try:
612            cur = con.cursor()
613            cur.execute("select %s::hstore", (pgdb.Hstore(d),))
614            result = cur.fetchone()[0]
615        finally:
616            con.close()
617        self.assertIsInstance(result, dict)
618        self.assertEqual(result, d)
619
620    def test_uuid(self):
621        d = UUID('{12345678-1234-5678-1234-567812345678}')
622        con = self._connect()
623        try:
624            cur = con.cursor()
625            cur.execute("select %s::uuid", (d,))
626            result = cur.fetchone()[0]
627        finally:
628            con.close()
629        self.assertIsInstance(result, UUID)
630        self.assertEqual(result, d)
631
632    def test_insert_array(self):
633        values = [(None, None), ([], []), ([None], [[None], ['null']]),
634            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
635            ([20000, 25000, 25000, 30000],
636            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
637            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
638        table = self.table_prefix + 'booze'
639        con = self._connect()
640        try:
641            cur = con.cursor()
642            cur.execute("create table %s"
643                " (n smallint, i int[], t text[][])" % table)
644            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
645            # Note that we must explicit casts because we are inserting
646            # empty arrays.  Otherwise this is not necessary.
647            cur.executemany("insert into %s values"
648                " (%%d,%%s::int[],%%s::text[][])" % table, params)
649            cur.execute("select i, t from %s order by n" % table)
650            d = cur.description
651            self.assertEqual(d[0].type_code, pgdb.ARRAY)
652            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
653            self.assertEqual(d[0].type_code, pgdb.NUMBER)
654            self.assertEqual(d[0].type_code, pgdb.INTEGER)
655            self.assertEqual(d[1].type_code, pgdb.ARRAY)
656            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
657            self.assertEqual(d[1].type_code, pgdb.STRING)
658            rows = cur.fetchall()
659        finally:
660            con.close()
661        self.assertEqual(rows, values)
662
663    def test_select_array(self):
664        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
665        con = self._connect()
666        try:
667            cur = con.cursor()
668            cur.execute("select %s::int[], %s::text[]", values)
669            row = cur.fetchone()
670        finally:
671            con.close()
672        self.assertEqual(row, values)
673
674    def test_insert_record(self):
675        values = [('John', 61), ('Jane', 63),
676                  ('Fred', None), ('Wilma', None),
677                  (None, 42), (None, None)]
678        table = self.table_prefix + 'booze'
679        record = self.table_prefix + 'munch'
680        con = self._connect()
681        try:
682            cur = con.cursor()
683            cur.execute("create type %s as (name varchar, age int)" % record)
684            cur.execute("create table %s (n smallint, r %s)" % (table, record))
685            params = enumerate(values)
686            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
687            cur.execute("select r from %s order by n" % table)
688            type_code = cur.description[0].type_code
689            self.assertEqual(type_code, record)
690            self.assertEqual(type_code, pgdb.RECORD)
691            self.assertNotEqual(type_code, pgdb.ARRAY)
692            columns = con.type_cache.get_fields(type_code)
693            self.assertEqual(columns[0].name, 'name')
694            self.assertEqual(columns[1].name, 'age')
695            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
696            self.assertEqual(con.type_cache[columns[1].type], 'int4')
697            rows = cur.fetchall()
698        finally:
699            cur.execute('drop table %s' % table)
700            cur.execute('drop type %s' % record)
701            con.close()
702        self.assertEqual(len(rows), len(values))
703        rows = [row[0] for row in rows]
704        self.assertEqual(rows, values)
705        self.assertEqual(rows[0].name, 'John')
706        self.assertEqual(rows[0].age, 61)
707
708    def test_select_record(self):
709        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
710            '(test)', '(x,y)', ' x y ', 'null', None)
711        con = self._connect()
712        try:
713            cur = con.cursor()
714            cur.execute("select %s as test_record", [value])
715            self.assertEqual(cur.description[0].name, 'test_record')
716            self.assertEqual(cur.description[0].type_code, 'record')
717            row = cur.fetchone()[0]
718        finally:
719            con.close()
720        # Note that the element types get lost since we created an
721        # untyped record (an anonymous composite type). For the same
722        # reason this is also a normal tuple, not a named tuple.
723        text_row = tuple(None if v is None else str(v) for v in value)
724        self.assertEqual(row, text_row)
725
726    def test_custom_type(self):
727        values = [3, 5, 65]
728        values = list(map(PgBitString, values))
729        table = self.table_prefix + 'booze'
730        con = self._connect()
731        try:
732            cur = con.cursor()
733            params = enumerate(values)  # params have __pg_repr__ method
734            cur.execute(
735                'create table "%s" (n smallint, b bit varying(7))' % table)
736            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
737            cur.execute("select * from %s" % table)
738            rows = cur.fetchall()
739        finally:
740            con.close()
741        self.assertEqual(len(rows), len(values))
742        con = self._connect()
743        try:
744            cur = con.cursor()
745            params = (1, object())  # an object that cannot be handled
746            self.assertRaises(pgdb.InterfaceError, cur.execute,
747                "insert into %s values (%%s,%%s)" % table, params)
748        finally:
749            con.close()
750
751    def test_set_decimal_type(self):
752        decimal_type = pgdb.decimal_type()
753        self.assertTrue(decimal_type is not None and callable(decimal_type))
754        con = self._connect()
755        try:
756            cur = con.cursor()
757            # change decimal type globally to int
758            int_type = lambda v: int(float(v))
759            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
760            cur.execute('select 4.25')
761            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
762            value = cur.fetchone()[0]
763            self.assertTrue(isinstance(value, int))
764            self.assertEqual(value, 4)
765            # change decimal type again to float
766            self.assertTrue(pgdb.decimal_type(float) is float)
767            cur.execute('select 4.25')
768            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
769            value = cur.fetchone()[0]
770            # the connection still uses the old setting
771            self.assertTrue(isinstance(value, int))
772            # bust the cache for type functions for the connection
773            con.type_cache.reset_typecast()
774            cur.execute('select 4.25')
775            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
776            value = cur.fetchone()[0]
777            # now the connection uses the new setting
778            self.assertTrue(isinstance(value, float))
779            self.assertEqual(value, 4.25)
780        finally:
781            con.close()
782            pgdb.decimal_type(decimal_type)
783        self.assertTrue(pgdb.decimal_type() is decimal_type)
784
785    def test_global_typecast(self):
786        try:
787            query = 'select 2::int2, 4::int4, 8::int8'
788            self.assertIs(pgdb.get_typecast('int4'), int)
789            cast_int = lambda v: 'int(%s)' % v
790            pgdb.set_typecast('int4', cast_int)
791            con = self._connect()
792            try:
793                i2, i4, i8 = con.cursor().execute(query).fetchone()
794            finally:
795                con.close()
796            self.assertEqual(i2, 2)
797            self.assertEqual(i4, 'int(4)')
798            self.assertEqual(i8, 8)
799            pgdb.set_typecast(['int2', 'int8'], cast_int)
800            con = self._connect()
801            try:
802                i2, i4, i8 = con.cursor().execute(query).fetchone()
803            finally:
804                con.close()
805            self.assertEqual(i2, 'int(2)')
806            self.assertEqual(i4, 'int(4)')
807            self.assertEqual(i8, 'int(8)')
808            pgdb.reset_typecast('int4')
809            con = self._connect()
810            try:
811                i2, i4, i8 = con.cursor().execute(query).fetchone()
812            finally:
813                con.close()
814            self.assertEqual(i2, 'int(2)')
815            self.assertEqual(i4, 4)
816            self.assertEqual(i8, 'int(8)')
817            pgdb.reset_typecast(['int2', 'int8'])
818            con = self._connect()
819            try:
820                i2, i4, i8 = con.cursor().execute(query).fetchone()
821            finally:
822                con.close()
823            self.assertEqual(i2, 2)
824            self.assertEqual(i4, 4)
825            self.assertEqual(i8, 8)
826            pgdb.set_typecast(['int2', 'int8'], cast_int)
827            con = self._connect()
828            try:
829                i2, i4, i8 = con.cursor().execute(query).fetchone()
830            finally:
831                con.close()
832            self.assertEqual(i2, 'int(2)')
833            self.assertEqual(i4, 4)
834            self.assertEqual(i8, 'int(8)')
835        finally:
836            pgdb.reset_typecast()
837        con = self._connect()
838        try:
839            i2, i4, i8 = con.cursor().execute(query).fetchone()
840        finally:
841            con.close()
842        self.assertEqual(i2, 2)
843        self.assertEqual(i4, 4)
844        self.assertEqual(i8, 8)
845
846    def test_unicode_with_utf8(self):
847        table = self.table_prefix + 'booze'
848        input = u"He wes Leovenaðes sone — liðe him be Drihten"
849        con = self._connect()
850        try:
851            cur = con.cursor()
852            cur.execute("create table %s (t text)" % table)
853            try:
854                cur.execute("set client_encoding=utf8")
855                cur.execute(u"select '%s'" % input)
856            except Exception:
857                self.skipTest("database does not support utf8")
858            output1 = cur.fetchone()[0]
859            cur.execute("insert into %s values (%%s)" % table, (input,))
860            cur.execute("select * from %s" % table)
861            output2 = cur.fetchone()[0]
862            cur.execute("select t = '%s' from %s" % (input, table))
863            output3 = cur.fetchone()[0]
864            cur.execute("select t = %%s from %s" % table, (input,))
865            output4 = cur.fetchone()[0]
866        finally:
867            con.close()
868        if str is bytes:  # Python < 3.0
869            input = input.encode('utf8')
870        self.assertIsInstance(output1, str)
871        self.assertEqual(output1, input)
872        self.assertIsInstance(output2, str)
873        self.assertEqual(output2, input)
874        self.assertIsInstance(output3, bool)
875        self.assertTrue(output3)
876        self.assertIsInstance(output4, bool)
877        self.assertTrue(output4)
878
879    def test_unicode_with_latin1(self):
880        table = self.table_prefix + 'booze'
881        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
882        con = self._connect()
883        try:
884            cur = con.cursor()
885            cur.execute("create table %s (t text)" % table)
886            try:
887                cur.execute("set client_encoding=latin1")
888                cur.execute(u"select '%s'" % input)
889            except Exception:
890                self.skipTest("database does not support latin1")
891            output1 = cur.fetchone()[0]
892            cur.execute("insert into %s values (%%s)" % table, (input,))
893            cur.execute("select * from %s" % table)
894            output2 = cur.fetchone()[0]
895            cur.execute("select t = '%s' from %s" % (input, table))
896            output3 = cur.fetchone()[0]
897            cur.execute("select t = %%s from %s" % table, (input,))
898            output4 = cur.fetchone()[0]
899        finally:
900            con.close()
901        if str is bytes:  # Python < 3.0
902            input = input.encode('latin1')
903        self.assertIsInstance(output1, str)
904        self.assertEqual(output1, input)
905        self.assertIsInstance(output2, str)
906        self.assertEqual(output2, input)
907        self.assertIsInstance(output3, bool)
908        self.assertTrue(output3)
909        self.assertIsInstance(output4, bool)
910        self.assertTrue(output4)
911
912    def test_bool(self):
913        values = [False, True, None, 't', 'f', 'true', 'false']
914        table = self.table_prefix + 'booze'
915        con = self._connect()
916        try:
917            cur = con.cursor()
918            cur.execute(
919                "create table %s (n smallint, booltest bool)" % table)
920            params = enumerate(values)
921            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
922            cur.execute("select booltest from %s order by n" % table)
923            rows = cur.fetchall()
924            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
925        finally:
926            con.close()
927        rows = [row[0] for row in rows]
928        values[3] = values[5] = True
929        values[4] = values[6] = False
930        self.assertEqual(rows, values)
931
932    def test_literal(self):
933        con = self._connect()
934        try:
935            cur = con.cursor()
936            value = "lower('Hello')"
937            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
938            row = cur.fetchone()
939        finally:
940            con.close()
941        self.assertEqual(row, (value, 'hello'))
942
943
944    def test_json(self):
945        inval = {"employees":
946            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
947        table = self.table_prefix + 'booze'
948        con = self._connect()
949        try:
950            cur = con.cursor()
951            try:
952                cur.execute("create table %s (jsontest json)" % table)
953            except pgdb.ProgrammingError:
954                self.skipTest('database does not support json')
955            params = (pgdb.Json(inval),)
956            cur.execute("insert into %s values (%%s)" % table, params)
957            cur.execute("select jsontest from %s" % table)
958            outval = cur.fetchone()[0]
959            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
960        finally:
961            con.close()
962        self.assertEqual(inval, outval)
963
964    def test_jsonb(self):
965        inval = {"employees":
966            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
967        table = self.table_prefix + 'booze'
968        con = self._connect()
969        try:
970            cur = con.cursor()
971            try:
972                cur.execute("create table %s (jsonbtest jsonb)" % table)
973            except pgdb.ProgrammingError:
974                self.skipTest('database does not support jsonb')
975            params = (pgdb.Json(inval),)
976            cur.execute("insert into %s values (%%s)" % table, params)
977            cur.execute("select jsonbtest from %s" % table)
978            outval = cur.fetchone()[0]
979            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
980        finally:
981            con.close()
982        self.assertEqual(inval, outval)
983
984    def test_execute_edge_cases(self):
985        con = self._connect()
986        try:
987            cur = con.cursor()
988            sql = 'invalid'  # should be ignored with empty parameter list
989            cur.executemany(sql, [])
990            sql = 'select %d + 1'
991            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
992            self.assertEqual(cur.fetchone()[0], 3)
993            sql = 'select 1/0'  # cannot be executed
994            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
995            cur.close()
996            con.rollback()
997            if pgdb.shortcutmethods:
998                res = con.execute('select %d', (1,)).fetchone()
999                self.assertEqual(res, (1,))
1000                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
1001                self.assertEqual(res, (2,))
1002        finally:
1003            con.close()
1004        sql = 'select 1'  # cannot be executed after connection is closed
1005        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
1006
1007    def test_fetchmany_with_keep(self):
1008        con = self._connect()
1009        try:
1010            cur = con.cursor()
1011            self.assertEqual(cur.arraysize, 1)
1012            cur.execute('select * from generate_series(1, 25)')
1013            self.assertEqual(len(cur.fetchmany()), 1)
1014            self.assertEqual(len(cur.fetchmany()), 1)
1015            self.assertEqual(cur.arraysize, 1)
1016            cur.arraysize = 3
1017            self.assertEqual(len(cur.fetchmany()), 3)
1018            self.assertEqual(len(cur.fetchmany()), 3)
1019            self.assertEqual(cur.arraysize, 3)
1020            self.assertEqual(len(cur.fetchmany(size=2)), 2)
1021            self.assertEqual(cur.arraysize, 3)
1022            self.assertEqual(len(cur.fetchmany()), 3)
1023            self.assertEqual(len(cur.fetchmany()), 3)
1024            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
1025            self.assertEqual(cur.arraysize, 2)
1026            self.assertEqual(len(cur.fetchmany()), 2)
1027            self.assertEqual(len(cur.fetchmany()), 2)
1028            self.assertEqual(len(cur.fetchmany(25)), 3)
1029        finally:
1030            con.close()
1031
1032    def test_nextset(self):
1033        con = self._connect()
1034        cur = con.cursor()
1035        self.assertRaises(con.NotSupportedError, cur.nextset)
1036
1037    def test_setoutputsize(self):
1038        pass  # not supported
1039
1040    def test_connection_errors(self):
1041        con = self._connect()
1042        self.assertEqual(con.Error, pgdb.Error)
1043        self.assertEqual(con.Warning, pgdb.Warning)
1044        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
1045        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
1046        self.assertEqual(con.InternalError, pgdb.InternalError)
1047        self.assertEqual(con.OperationalError, pgdb.OperationalError)
1048        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
1049        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
1050        self.assertEqual(con.DataError, pgdb.DataError)
1051        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
1052
1053    def test_connection_as_contextmanager(self):
1054        table = self.table_prefix + 'booze'
1055        con = self._connect()
1056        try:
1057            cur = con.cursor()
1058            cur.execute("create table %s (n smallint check(n!=4))" % table)
1059            with con:
1060                cur.execute("insert into %s values (1)" % table)
1061                cur.execute("insert into %s values (2)" % table)
1062            try:
1063                with con:
1064                    cur.execute("insert into %s values (3)" % table)
1065                    cur.execute("insert into %s values (4)" % table)
1066            except con.ProgrammingError as error:
1067                self.assertTrue('check' in str(error).lower())
1068            with con:
1069                cur.execute("insert into %s values (5)" % table)
1070                cur.execute("insert into %s values (6)" % table)
1071            try:
1072                with con:
1073                    cur.execute("insert into %s values (7)" % table)
1074                    cur.execute("insert into %s values (8)" % table)
1075                    raise ValueError('transaction should rollback')
1076            except ValueError as error:
1077                self.assertEqual(str(error), 'transaction should rollback')
1078            with con:
1079                cur.execute("insert into %s values (9)" % table)
1080            cur.execute("select * from %s order by 1" % table)
1081            rows = cur.fetchall()
1082            rows = [row[0] for row in rows]
1083        finally:
1084            con.close()
1085        self.assertEqual(rows, [1, 2, 5, 6, 9])
1086
1087    def test_cursor_connection(self):
1088        con = self._connect()
1089        cur = con.cursor()
1090        self.assertEqual(cur.connection, con)
1091        cur.close()
1092
1093    def test_cursor_as_contextmanager(self):
1094        con = self._connect()
1095        with con.cursor() as cur:
1096            self.assertEqual(cur.connection, con)
1097
1098    def test_pgdb_type(self):
1099        self.assertEqual(pgdb.STRING, pgdb.STRING)
1100        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
1101        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
1102        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
1103        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
1104        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
1105        self.assertEqual('char', pgdb.STRING)
1106        self.assertEqual('varchar', pgdb.STRING)
1107        self.assertEqual('text', pgdb.STRING)
1108        self.assertNotEqual('numeric', pgdb.STRING)
1109        self.assertEqual('numeric', pgdb.NUMERIC)
1110        self.assertEqual('numeric', pgdb.NUMBER)
1111        self.assertEqual('int4', pgdb.NUMBER)
1112        self.assertNotEqual('int4', pgdb.NUMERIC)
1113        self.assertEqual('int2', pgdb.SMALLINT)
1114        self.assertNotEqual('int4', pgdb.SMALLINT)
1115        self.assertEqual('int2', pgdb.INTEGER)
1116        self.assertEqual('int4', pgdb.INTEGER)
1117        self.assertEqual('int8', pgdb.INTEGER)
1118        self.assertNotEqual('int4', pgdb.LONG)
1119        self.assertEqual('int8', pgdb.LONG)
1120        self.assertTrue('char' in pgdb.STRING)
1121        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1122        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1123        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1124        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1125        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1126        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1127        self.assertEqual('_char', pgdb.ARRAY)
1128        self.assertNotEqual('char', pgdb.ARRAY)
1129        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1130        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1131        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1132        self.assertEqual('record', pgdb.RECORD)
1133        self.assertNotEqual('_record', pgdb.RECORD)
1134
1135
1136if __name__ == '__main__':
1137    unittest.main()
Note: See TracBrowser for help on using the repository browser.