source: trunk/tests/test_dbapi20.py @ 797

Last change on this file since 797 was 797, checked in by cito, 4 years ago

Cache typecast functions and make them configurable

The typecast functions used by the pgdb module are now cached
using a local and a global Typecasts class. The local cache is
bound to the connection and knows how to cast composite types.

Also added functions that allow registering custom typecast
functions on the global and local level.

Also added a chapter on type adaptation and casting to the docs.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 39.9 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 797 2016-01-29 22:43:22Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import datetime
32
33try:
34    long
35except NameError:  # Python >= 3.0
36    long = int
37
38try:
39    from collections import OrderedDict
40except ImportError:  # Python 2.6 or 3.0
41    OrderedDict = None
42
43
class PgBitString:
    """Value wrapper that renders itself as a PostgreSQL bit string."""

    def __init__(self, value):
        # the wrapped value is formatted as binary digits on demand
        self.value = value

    def __pg_repr__(self):
        """Return the wrapped value as a bit string literal like B'101'."""
        digits = format(self.value, 'b')
        return "B'" + digits + "'"
52
53
54class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
55
56    driver = pgdb
57    connect_args = ()
58    connect_kw_args = {'database': dbname,
59        'host': '%s:%d' % (dbhost or '', dbport or -1)}
60
61    lower_func = 'lower'  # For stored procedure test
62
63    def setUp(self):
64        # Call superclass setUp in case this does something in the future
65        dbapi20.DatabaseAPI20Test.setUp(self)
66        try:
67            con = self._connect()
68            con.close()
69        except pgdb.Error:  # try to create a missing database
70            import pg
71            try:  # first try to log in as superuser
72                db = pg.DB('postgres', dbhost or None, dbport or -1,
73                    user='postgres')
74            except Exception:  # then try to log in as current user
75                db = pg.DB('postgres', dbhost or None, dbport or -1)
76            db.query('create database ' + dbname)
77
78    def tearDown(self):
79        dbapi20.DatabaseAPI20Test.tearDown(self)
80
81    def test_callproc_no_params(self):
82        con = self._connect()
83        cur = con.cursor()
84        # note that now() does not change within a transaction
85        cur.execute('select now()')
86        now = cur.fetchone()[0]
87        res = cur.callproc('now')
88        self.assertIsNone(res)
89        res = cur.fetchone()[0]
90        self.assertEqual(res, now)
91
92    def test_callproc_bad_params(self):
93        con = self._connect()
94        cur = con.cursor()
95        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
96        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
97
98    def test_callproc_one_param(self):
99        con = self._connect()
100        cur = con.cursor()
101        params = (42.4382,)
102        res = cur.callproc("round", params)
103        self.assertIs(res, params)
104        res = cur.fetchone()[0]
105        self.assertEqual(res, 42)
106
107    def test_callproc_two_params(self):
108        con = self._connect()
109        cur = con.cursor()
110        params = (9, 4)
111        res = cur.callproc("div", params)
112        self.assertIs(res, params)
113        res = cur.fetchone()[0]
114        self.assertEqual(res, 2)
115
116    def test_cursor_type(self):
117
118        class TestCursor(pgdb.Cursor):
119            pass
120
121        con = self._connect()
122        self.assertIs(con.cursor_type, pgdb.Cursor)
123        cur = con.cursor()
124        self.assertIsInstance(cur, pgdb.Cursor)
125        self.assertNotIsInstance(cur, TestCursor)
126        con.cursor_type = TestCursor
127        cur = con.cursor()
128        self.assertIsInstance(cur, TestCursor)
129        cur = con.cursor()
130        self.assertIsInstance(cur, TestCursor)
131        con = self._connect()
132        self.assertIs(con.cursor_type, pgdb.Cursor)
133        cur = con.cursor()
134        self.assertIsInstance(cur, pgdb.Cursor)
135        self.assertNotIsInstance(cur, TestCursor)
136
137    def test_row_factory(self):
138
139        class TestCursor(pgdb.Cursor):
140
141            def row_factory(self, row):
142                return dict(('column %s' % desc[0], value)
143                    for desc, value in zip(self.description, row))
144
145        con = self._connect()
146        con.cursor_type = TestCursor
147        cur = con.cursor()
148        self.assertIsInstance(cur, TestCursor)
149        res = cur.execute("select 1 as a, 2 as b")
150        self.assertIs(res, cur, 'execute() should return cursor')
151        res = cur.fetchone()
152        self.assertIsInstance(res, dict)
153        self.assertEqual(res, {'column a': 1, 'column b': 2})
154        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
155        res = cur.fetchall()
156        self.assertIsInstance(res, list)
157        self.assertEqual(len(res), 2)
158        self.assertIsInstance(res[0], dict)
159        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
160        self.assertIsInstance(res[1], dict)
161        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
162
163    def test_build_row_factory(self):
164
165        class TestCursor(pgdb.Cursor):
166
167            def build_row_factory(self):
168                keys = [desc[0] for desc in self.description]
169                return lambda row: dict((key, value)
170                    for key, value in zip(keys, row))
171
172        con = self._connect()
173        con.cursor_type = TestCursor
174        cur = con.cursor()
175        self.assertIsInstance(cur, TestCursor)
176        cur.execute("select 1 as a, 2 as b")
177        res = cur.fetchone()
178        self.assertIsInstance(res, dict)
179        self.assertEqual(res, {'a': 1, 'b': 2})
180        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
181        res = cur.fetchall()
182        self.assertIsInstance(res, list)
183        self.assertEqual(len(res), 2)
184        self.assertIsInstance(res[0], dict)
185        self.assertEqual(res[0], {'a': 1, 'b': 2})
186        self.assertIsInstance(res[1], dict)
187        self.assertEqual(res[1], {'a': 3, 'b': 4})
188
189    def test_cursor_with_named_columns(self):
190        con = self._connect()
191        cur = con.cursor()
192        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
193        self.assertIs(res, cur, 'execute() should return cursor')
194        res = cur.fetchone()
195        self.assertIsInstance(res, tuple)
196        self.assertEqual(res, (1, 2, 3))
197        self.assertEqual(res._fields, ('abc', 'de', 'f'))
198        self.assertEqual(res.abc, 1)
199        self.assertEqual(res.de, 2)
200        self.assertEqual(res.f, 3)
201        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
202        res = cur.fetchall()
203        self.assertIsInstance(res, list)
204        self.assertEqual(len(res), 2)
205        self.assertIsInstance(res[0], tuple)
206        self.assertEqual(res[0], (1, 2))
207        self.assertEqual(res[0]._fields, ('one', 'two'))
208        self.assertIsInstance(res[1], tuple)
209        self.assertEqual(res[1], (3, 4))
210        self.assertEqual(res[1]._fields, ('one', 'two'))
211
212    def test_cursor_with_unnamed_columns(self):
213        con = self._connect()
214        cur = con.cursor()
215        cur.execute("select 1, 2, 3")
216        res = cur.fetchone()
217        self.assertIsInstance(res, tuple)
218        self.assertEqual(res, (1, 2, 3))
219        old_py = OrderedDict is None  # Python 2.6 or 3.0
220        # old Python versions cannot rename tuple fields with underscore
221        if old_py:
222            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
223        else:
224            self.assertEqual(res._fields, ('_0', '_1', '_2'))
225        cur.execute("select 1 as one, 2, 3 as three")
226        res = cur.fetchone()
227        self.assertIsInstance(res, tuple)
228        self.assertEqual(res, (1, 2, 3))
229        if old_py:  # cannot auto rename with underscore
230            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
231        else:
232            self.assertEqual(res._fields, ('one', '_1', 'three'))
233        cur.execute("select 1 as abc, 2 as def")
234        res = cur.fetchone()
235        self.assertIsInstance(res, tuple)
236        self.assertEqual(res, (1, 2))
237        if old_py:
238            self.assertEqual(res._fields, ('column_0', 'column_1'))
239        else:
240            self.assertEqual(res._fields, ('abc', '_1'))
241
242    def test_colnames(self):
243        con = self._connect()
244        cur = con.cursor()
245        cur.execute("select 1, 2, 3")
246        names = cur.colnames
247        self.assertIsInstance(names, list)
248        self.assertEqual(names, ['?column?', '?column?', '?column?'])
249        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
250        names = cur.colnames
251        self.assertIsInstance(names, list)
252        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
253
254    def test_coltypes(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1::int2, 2::int4, 3::int8")
258        types = cur.coltypes
259        self.assertIsInstance(types, list)
260        self.assertEqual(types, ['int2', 'int4', 'int8'])
261
262    def test_description_fields(self):
263        con = self._connect()
264        cur = con.cursor()
265        cur.execute("select 123456789::int8 col0,"
266            " 123456.789::numeric(41, 13) as col1,"
267            " 'foobar'::char(39) as col2")
268        desc = cur.description
269        self.assertIsInstance(desc, list)
270        self.assertEqual(len(desc), 3)
271        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
272        for i in range(3):
273            c, d = cols[i], desc[i]
274            self.assertIsInstance(d, tuple)
275            self.assertEqual(len(d), 7)
276            self.assertIsInstance(d.name, str)
277            self.assertEqual(d.name, 'col%d' % i)
278            self.assertIsInstance(d.type_code, str)
279            self.assertEqual(d.type_code, c[0])
280            self.assertIsNone(d.display_size)
281            self.assertIsInstance(d.internal_size, int)
282            self.assertEqual(d.internal_size, c[1])
283            if c[2] is not None:
284                self.assertIsInstance(d.precision, int)
285                self.assertEqual(d.precision, c[1])
286                self.assertIsInstance(d.scale, int)
287                self.assertEqual(d.scale, c[2])
288            else:
289                self.assertIsNone(d.precision)
290                self.assertIsNone(d.scale)
291            self.assertIsNone(d.null_ok)
292
293    def test_type_cache_info(self):
294        con = self._connect()
295        try:
296            cur = con.cursor()
297            type_cache = con.type_cache
298            self.assertNotIn('numeric', type_cache)
299            type_info = type_cache['numeric']
300            self.assertIn('numeric', type_cache)
301            self.assertEqual(type_info, 'numeric')
302            self.assertEqual(type_info.oid, 1700)
303            self.assertEqual(type_info.len, -1)
304            self.assertEqual(type_info.type, 'b')  # base
305            self.assertEqual(type_info.category, 'N')  # numeric
306            self.assertEqual(type_info.delim, ',')
307            self.assertEqual(type_info.relid, 0)
308            self.assertIs(con.type_cache[1700], type_info)
309            self.assertNotIn('pg_type', type_cache)
310            type_info = type_cache['pg_type']
311            self.assertIn('numeric', type_cache)
312            self.assertEqual(type_info.type, 'c')  # composite
313            self.assertEqual(type_info.category, 'C')  # composite
314            cols = type_cache.get_fields('pg_type')
315            self.assertEqual(cols[0].name, 'typname')
316            typname = type_cache[cols[0].type]
317            self.assertEqual(typname, 'name')
318            self.assertEqual(typname.type, 'b')  # base
319            self.assertEqual(typname.category, 'S')  # string
320            self.assertEqual(cols[3].name, 'typlen')
321            typlen = type_cache[cols[3].type]
322            self.assertEqual(typlen, 'int2')
323            self.assertEqual(typlen.type, 'b')  # base
324            self.assertEqual(typlen.category, 'N')  # numeric
325            cur.close()
326            cur = con.cursor()
327            type_cache = con.type_cache
328            self.assertIn('numeric', type_cache)
329            cur.close()
330        finally:
331            con.close()
332        con = self._connect()
333        try:
334            cur = con.cursor()
335            type_cache = con.type_cache
336            self.assertNotIn('pg_type', type_cache)
337            self.assertEqual(type_cache.get('pg_type'), type_info)
338            self.assertIn('pg_type', type_cache)
339            self.assertIsNone(type_cache.get(
340                self.table_prefix + '_surely_does_not_exist'))
341            cur.close()
342        finally:
343            con.close()
344
345    def test_type_cache_typecast(self):
346        con = self._connect()
347        try:
348            cur = con.cursor()
349            type_cache = con.type_cache
350            self.assertIs(type_cache.get_typecast('int4'), int)
351            cast_int = lambda v: 'int(%s)' % v
352            type_cache.set_typecast('int4', cast_int)
353            query = 'select 2::int2, 4::int4, 8::int8'
354            cur.execute(query)
355            i2, i4, i8 = cur.fetchone()
356            self.assertEqual(i2, 2)
357            self.assertEqual(i4, 'int(4)')
358            self.assertEqual(i8, 8)
359            self.assertEqual(type_cache.typecast('int4', 42), 'int(42)')
360            type_cache.set_typecast(['int2', 'int8'], cast_int)
361            cur.execute(query)
362            i2, i4, i8 = cur.fetchone()
363            self.assertEqual(i2, 'int(2)')
364            self.assertEqual(i4, 'int(4)')
365            self.assertEqual(i8, 'int(8)')
366            type_cache.reset_typecast('int4')
367            cur.execute(query)
368            i2, i4, i8 = cur.fetchone()
369            self.assertEqual(i2, 'int(2)')
370            self.assertEqual(i4, 4)
371            self.assertEqual(i8, 'int(8)')
372            type_cache.reset_typecast(['int2', 'int8'])
373            cur.execute(query)
374            i2, i4, i8 = cur.fetchone()
375            self.assertEqual(i2, 2)
376            self.assertEqual(i4, 4)
377            self.assertEqual(i8, 8)
378            type_cache.set_typecast(['int2', 'int8'], cast_int)
379            cur.execute(query)
380            i2, i4, i8 = cur.fetchone()
381            self.assertEqual(i2, 'int(2)')
382            self.assertEqual(i4, 4)
383            self.assertEqual(i8, 'int(8)')
384            type_cache.reset_typecast()
385            cur.execute(query)
386            i2, i4, i8 = cur.fetchone()
387            self.assertEqual(i2, 2)
388            self.assertEqual(i4, 4)
389            self.assertEqual(i8, 8)
390            cur.close()
391        finally:
392            con.close()
393
394    def test_cursor_iteration(self):
395        con = self._connect()
396        cur = con.cursor()
397        cur.execute("select 1 union select 2 union select 3")
398        self.assertEqual([r[0] for r in cur], [1, 2, 3])
399
400    def test_fetch_2_rows(self):
401        Decimal = pgdb.decimal_type()
402        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
403            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
404            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
405            7897234)
406        table = self.table_prefix + 'booze'
407        con = self._connect()
408        try:
409            cur = con.cursor()
410            cur.execute("set datestyle to 'iso'")
411            cur.execute("create table %s ("
412                "stringtest varchar,"
413                "binarytest bytea,"
414                "booltest bool,"
415                "integertest int4,"
416                "longtest int8,"
417                "floattest float8,"
418                "numerictest numeric,"
419                "moneytest money,"
420                "datetest date,"
421                "timetest time,"
422                "datetimetest timestamp,"
423                "intervaltest interval,"
424                "rowidtest oid)" % table)
425            cur.execute("set standard_conforming_strings to on")
426            for s in ('numeric', 'monetary', 'time'):
427                cur.execute("set lc_%s to 'C'" % s)
428            for _i in range(2):
429                cur.execute("insert into %s values ("
430                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
431                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
432            cur.execute("select * from %s" % table)
433            rows = cur.fetchall()
434            self.assertEqual(len(rows), 2)
435            row0 = rows[0]
436            self.assertEqual(row0, values)
437            self.assertEqual(row0, rows[1])
438            self.assertIsInstance(row0[0], str)
439            self.assertIsInstance(row0[1], bytes)
440            self.assertIsInstance(row0[2], bool)
441            self.assertIsInstance(row0[3], int)
442            self.assertIsInstance(row0[4], long)
443            self.assertIsInstance(row0[5], float)
444            self.assertIsInstance(row0[6], Decimal)
445            self.assertIsInstance(row0[7], Decimal)
446            self.assertIsInstance(row0[8], str)
447            self.assertIsInstance(row0[9], str)
448            self.assertIsInstance(row0[10], str)
449            self.assertIsInstance(row0[11], str)
450        finally:
451            con.close()
452
453    def test_sqlstate(self):
454        con = self._connect()
455        cur = con.cursor()
456        try:
457            cur.execute("select 1/0")
458        except pgdb.DatabaseError as error:
459            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
460            # the SQLSTATE error code for division by zero is 22012
461            self.assertEqual(error.sqlstate, '22012')
462
463    def test_float(self):
464        nan, inf = float('nan'), float('inf')
465        from math import isnan, isinf
466        self.assertTrue(isnan(nan) and not isinf(nan))
467        self.assertTrue(isinf(inf) and not isnan(inf))
468        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
469            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
470        table = self.table_prefix + 'booze'
471        con = self._connect()
472        try:
473            cur = con.cursor()
474            cur.execute(
475                "create table %s (n smallint, floattest float)" % table)
476            params = enumerate(values)
477            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
478            cur.execute("select floattest from %s order by n" % table)
479            rows = cur.fetchall()
480            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
481            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
482            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
483        finally:
484            con.close()
485        self.assertEqual(len(rows), len(values))
486        rows = [row[0] for row in rows]
487        for inval, outval in zip(values, rows):
488            if inval in ('inf', 'Infinity'):
489                inval = inf
490            elif inval in ('-inf', '-Infinity'):
491                inval = -inf
492            elif inval in ('nan', 'NaN'):
493                inval = nan
494            if isinf(inval):
495                self.assertTrue(isinf(outval))
496                if inval < 0:
497                    self.assertTrue(outval < 0)
498                else:
499                    self.assertTrue(outval > 0)
500            elif isnan(inval):
501                self.assertTrue(isnan(outval))
502            else:
503                self.assertEqual(inval, outval)
504
505    def test_datetime(self):
506        values = ['2011-07-17 15:47:42', datetime(2016, 1, 20, 20, 15, 51)]
507        table = self.table_prefix + 'booze'
508        con = self._connect()
509        try:
510            cur = con.cursor()
511            cur.execute("set datestyle to 'iso'")
512            cur.execute(
513                "create table %s (n smallint, ts timestamp)" % table)
514            params = enumerate(values)
515            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
516            cur.execute("select ts from %s order by n" % table)
517            rows = cur.fetchall()
518            self.assertEqual(cur.description[0].type_code, pgdb.DATETIME)
519            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
520            self.assertNotEqual(cur.description[0].type_code, pgdb.RECORD)
521        finally:
522            con.close()
523        self.assertEqual(len(rows), len(values))
524        rows = [row[0] for row in rows]
525        for inval, outval in zip(values, rows):
526            if isinstance(inval, datetime):
527                inval = inval.strftime('%Y-%m-%d %H:%M:%S')
528            self.assertEqual(inval, outval)
529
530    def test_insert_array(self):
531        values = [(None, None), ([], []), ([None], [[None], ['null']]),
532            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
533            ([20000, 25000, 25000, 30000],
534            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
535            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
536        table = self.table_prefix + 'booze'
537        con = self._connect()
538        try:
539            cur = con.cursor()
540            cur.execute("create table %s"
541                " (n smallint, i int[], t text[][])" % table)
542            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
543            # Note that we must explicit casts because we are inserting
544            # empty arrays.  Otherwise this is not necessary.
545            cur.executemany("insert into %s values"
546                " (%%d,%%s::int[],%%s::text[][])" % table, params)
547            cur.execute("select i, t from %s order by n" % table)
548            d = cur.description
549            self.assertEqual(d[0].type_code, pgdb.ARRAY)
550            self.assertNotEqual(d[0].type_code, pgdb.RECORD)
551            self.assertEqual(d[0].type_code, pgdb.NUMBER)
552            self.assertEqual(d[0].type_code, pgdb.INTEGER)
553            self.assertEqual(d[1].type_code, pgdb.ARRAY)
554            self.assertNotEqual(d[1].type_code, pgdb.RECORD)
555            self.assertEqual(d[1].type_code, pgdb.STRING)
556            rows = cur.fetchall()
557        finally:
558            con.close()
559        self.assertEqual(rows, values)
560
561    def test_select_array(self):
562        values = ([1, 2, 3, None], ['a', 'b', 'c', None])
563        con = self._connect()
564        try:
565            cur = con.cursor()
566            cur.execute("select %s::int[], %s::text[]", values)
567            row = cur.fetchone()
568        finally:
569            con.close()
570        self.assertEqual(row, values)
571
572    def test_insert_record(self):
573        values = [('John', 61), ('Jane', 63),
574                  ('Fred', None), ('Wilma', None),
575                  (None, 42), (None, None)]
576        table = self.table_prefix + 'booze'
577        record = self.table_prefix + 'munch'
578        con = self._connect()
579        try:
580            cur = con.cursor()
581            cur.execute("create type %s as (name varchar, age int)" % record)
582            cur.execute("create table %s (n smallint, r %s)" % (table, record))
583            params = enumerate(values)
584            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
585            cur.execute("select r from %s order by n" % table)
586            type_code = cur.description[0].type_code
587            self.assertEqual(type_code, record)
588            self.assertEqual(type_code, pgdb.RECORD)
589            self.assertNotEqual(type_code, pgdb.ARRAY)
590            columns = con.type_cache.get_fields(type_code)
591            self.assertEqual(columns[0].name, 'name')
592            self.assertEqual(columns[1].name, 'age')
593            self.assertEqual(con.type_cache[columns[0].type], 'varchar')
594            self.assertEqual(con.type_cache[columns[1].type], 'int4')
595            rows = cur.fetchall()
596        finally:
597            cur.execute('drop table %s' % table)
598            cur.execute('drop type %s' % record)
599            con.close()
600        self.assertEqual(len(rows), len(values))
601        rows = [row[0] for row in rows]
602        self.assertEqual(rows, values)
603        self.assertEqual(rows[0].name, 'John')
604        self.assertEqual(rows[0].age, 61)
605
606    def test_select_record(self):
607        value = (1, 25000, 2.5, 'hello', 'Hello World!', 'Hello, World!',
608            '(test)', '(x,y)', ' x y ', 'null', None)
609        con = self._connect()
610        try:
611            cur = con.cursor()
612            cur.execute("select %s as test_record", [value])
613            self.assertEqual(cur.description[0].name, 'test_record')
614            self.assertEqual(cur.description[0].type_code, 'record')
615            row = cur.fetchone()[0]
616        finally:
617            con.close()
618        # Note that the element types get lost since we created an
619        # untyped record (an anonymous composite type). For the same
620        # reason this is also a normal tuple, not a named tuple.
621        text_row = tuple(None if v is None else str(v) for v in value)
622        self.assertEqual(row, text_row)
623
624    def test_custom_type(self):
625        values = [3, 5, 65]
626        values = list(map(PgBitString, values))
627        table = self.table_prefix + 'booze'
628        con = self._connect()
629        try:
630            cur = con.cursor()
631            params = enumerate(values)  # params have __pg_repr__ method
632            cur.execute(
633                'create table "%s" (n smallint, b bit varying(7))' % table)
634            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
635            cur.execute("select * from %s" % table)
636            rows = cur.fetchall()
637        finally:
638            con.close()
639        self.assertEqual(len(rows), len(values))
640        con = self._connect()
641        try:
642            cur = con.cursor()
643            params = (1, object())  # an object that cannot be handled
644            self.assertRaises(pgdb.InterfaceError, cur.execute,
645                "insert into %s values (%%s,%%s)" % table, params)
646        finally:
647            con.close()
648
649    def test_set_decimal_type(self):
650        decimal_type = pgdb.decimal_type()
651        self.assertTrue(decimal_type is not None and callable(decimal_type))
652        con = self._connect()
653        try:
654            cur = con.cursor()
655            # change decimal type globally to int
656            int_type = lambda v: int(float(v))
657            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
658            cur.execute('select 4.25')
659            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
660            value = cur.fetchone()[0]
661            self.assertTrue(isinstance(value, int))
662            self.assertEqual(value, 4)
663            # change decimal type again to float
664            self.assertTrue(pgdb.decimal_type(float) is float)
665            cur.execute('select 4.25')
666            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
667            value = cur.fetchone()[0]
668            # the connection still uses the old setting
669            self.assertTrue(isinstance(value, int))
670            # bust the cache for type functions for the connection
671            con.type_cache.reset_typecast()
672            cur.execute('select 4.25')
673            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
674            value = cur.fetchone()[0]
675            # now the connection uses the new setting
676            self.assertTrue(isinstance(value, float))
677            self.assertEqual(value, 4.25)
678        finally:
679            con.close()
680            pgdb.decimal_type(decimal_type)
681        self.assertTrue(pgdb.decimal_type() is decimal_type)
682
683    def test_global_typecast(self):
684        try:
685            query = 'select 2::int2, 4::int4, 8::int8'
686            self.assertIs(pgdb.get_typecast('int4'), int)
687            cast_int = lambda v: 'int(%s)' % v
688            pgdb.set_typecast('int4', cast_int)
689            con = self._connect()
690            try:
691                i2, i4, i8 = con.cursor().execute(query).fetchone()
692            finally:
693                con.close()
694            self.assertEqual(i2, 2)
695            self.assertEqual(i4, 'int(4)')
696            self.assertEqual(i8, 8)
697            pgdb.set_typecast(['int2', 'int8'], cast_int)
698            con = self._connect()
699            try:
700                i2, i4, i8 = con.cursor().execute(query).fetchone()
701            finally:
702                con.close()
703            self.assertEqual(i2, 'int(2)')
704            self.assertEqual(i4, 'int(4)')
705            self.assertEqual(i8, 'int(8)')
706            pgdb.reset_typecast('int4')
707            con = self._connect()
708            try:
709                i2, i4, i8 = con.cursor().execute(query).fetchone()
710            finally:
711                con.close()
712            self.assertEqual(i2, 'int(2)')
713            self.assertEqual(i4, 4)
714            self.assertEqual(i8, 'int(8)')
715            pgdb.reset_typecast(['int2', 'int8'])
716            con = self._connect()
717            try:
718                i2, i4, i8 = con.cursor().execute(query).fetchone()
719            finally:
720                con.close()
721            self.assertEqual(i2, 2)
722            self.assertEqual(i4, 4)
723            self.assertEqual(i8, 8)
724            pgdb.set_typecast(['int2', 'int8'], cast_int)
725            con = self._connect()
726            try:
727                i2, i4, i8 = con.cursor().execute(query).fetchone()
728            finally:
729                con.close()
730            self.assertEqual(i2, 'int(2)')
731            self.assertEqual(i4, 4)
732            self.assertEqual(i8, 'int(8)')
733        finally:
734            pgdb.reset_typecast()
735        con = self._connect()
736        try:
737            i2, i4, i8 = con.cursor().execute(query).fetchone()
738        finally:
739            con.close()
740        self.assertEqual(i2, 2)
741        self.assertEqual(i4, 4)
742        self.assertEqual(i8, 8)
743
744    def test_unicode_with_utf8(self):
745        table = self.table_prefix + 'booze'
746        input = u"He wes Leovenaðes sone — liðe him be Drihten"
747        con = self._connect()
748        try:
749            cur = con.cursor()
750            cur.execute("create table %s (t text)" % table)
751            try:
752                cur.execute("set client_encoding=utf8")
753                cur.execute(u"select '%s'" % input)
754            except Exception:
755                self.skipTest("database does not support utf8")
756            output1 = cur.fetchone()[0]
757            cur.execute("insert into %s values (%%s)" % table, (input,))
758            cur.execute("select * from %s" % table)
759            output2 = cur.fetchone()[0]
760            cur.execute("select t = '%s' from %s" % (input, table))
761            output3 = cur.fetchone()[0]
762            cur.execute("select t = %%s from %s" % table, (input,))
763            output4 = cur.fetchone()[0]
764        finally:
765            con.close()
766        if str is bytes:  # Python < 3.0
767            input = input.encode('utf8')
768        self.assertIsInstance(output1, str)
769        self.assertEqual(output1, input)
770        self.assertIsInstance(output2, str)
771        self.assertEqual(output2, input)
772        self.assertIsInstance(output3, bool)
773        self.assertTrue(output3)
774        self.assertIsInstance(output4, bool)
775        self.assertTrue(output4)
776
777    def test_unicode_with_latin1(self):
778        table = self.table_prefix + 'booze'
779        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
780        con = self._connect()
781        try:
782            cur = con.cursor()
783            cur.execute("create table %s (t text)" % table)
784            try:
785                cur.execute("set client_encoding=latin1")
786                cur.execute(u"select '%s'" % input)
787            except Exception:
788                self.skipTest("database does not support latin1")
789            output1 = cur.fetchone()[0]
790            cur.execute("insert into %s values (%%s)" % table, (input,))
791            cur.execute("select * from %s" % table)
792            output2 = cur.fetchone()[0]
793            cur.execute("select t = '%s' from %s" % (input, table))
794            output3 = cur.fetchone()[0]
795            cur.execute("select t = %%s from %s" % table, (input,))
796            output4 = cur.fetchone()[0]
797        finally:
798            con.close()
799        if str is bytes:  # Python < 3.0
800            input = input.encode('latin1')
801        self.assertIsInstance(output1, str)
802        self.assertEqual(output1, input)
803        self.assertIsInstance(output2, str)
804        self.assertEqual(output2, input)
805        self.assertIsInstance(output3, bool)
806        self.assertTrue(output3)
807        self.assertIsInstance(output4, bool)
808        self.assertTrue(output4)
809
810    def test_bool(self):
811        values = [False, True, None, 't', 'f', 'true', 'false']
812        table = self.table_prefix + 'booze'
813        con = self._connect()
814        try:
815            cur = con.cursor()
816            cur.execute(
817                "create table %s (n smallint, booltest bool)" % table)
818            params = enumerate(values)
819            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
820            cur.execute("select booltest from %s order by n" % table)
821            rows = cur.fetchall()
822            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
823        finally:
824            con.close()
825        rows = [row[0] for row in rows]
826        values[3] = values[5] = True
827        values[4] = values[6] = False
828        self.assertEqual(rows, values)
829
830    def test_json(self):
831        inval = {"employees":
832            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
833        table = self.table_prefix + 'booze'
834        con = self._connect()
835        try:
836            cur = con.cursor()
837            try:
838                cur.execute("create table %s (jsontest json)" % table)
839            except pgdb.ProgrammingError:
840                self.skipTest('database does not support json')
841            params = (pgdb.Json(inval),)
842            cur.execute("insert into %s values (%%s)" % table, params)
843            cur.execute("select jsontest from %s" % table)
844            outval = cur.fetchone()[0]
845            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
846        finally:
847            con.close()
848        self.assertEqual(inval, outval)
849
850    def test_jsonb(self):
851        inval = {"employees":
852            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
853        table = self.table_prefix + 'booze'
854        con = self._connect()
855        try:
856            cur = con.cursor()
857            try:
858                cur.execute("create table %s (jsonbtest jsonb)" % table)
859            except pgdb.ProgrammingError:
860                self.skipTest('database does not support jsonb')
861            params = (pgdb.Json(inval),)
862            cur.execute("insert into %s values (%%s)" % table, params)
863            cur.execute("select jsonbtest from %s" % table)
864            outval = cur.fetchone()[0]
865            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
866        finally:
867            con.close()
868        self.assertEqual(inval, outval)
869
870    def test_execute_edge_cases(self):
871        con = self._connect()
872        try:
873            cur = con.cursor()
874            sql = 'invalid'  # should be ignored with empty parameter list
875            cur.executemany(sql, [])
876            sql = 'select %d + 1'
877            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
878            self.assertEqual(cur.fetchone()[0], 3)
879            sql = 'select 1/0'  # cannot be executed
880            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
881            cur.close()
882            con.rollback()
883            if pgdb.shortcutmethods:
884                res = con.execute('select %d', (1,)).fetchone()
885                self.assertEqual(res, (1,))
886                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
887                self.assertEqual(res, (2,))
888        finally:
889            con.close()
890        sql = 'select 1'  # cannot be executed after connection is closed
891        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
892
893    def test_fetchmany_with_keep(self):
894        con = self._connect()
895        try:
896            cur = con.cursor()
897            self.assertEqual(cur.arraysize, 1)
898            cur.execute('select * from generate_series(1, 25)')
899            self.assertEqual(len(cur.fetchmany()), 1)
900            self.assertEqual(len(cur.fetchmany()), 1)
901            self.assertEqual(cur.arraysize, 1)
902            cur.arraysize = 3
903            self.assertEqual(len(cur.fetchmany()), 3)
904            self.assertEqual(len(cur.fetchmany()), 3)
905            self.assertEqual(cur.arraysize, 3)
906            self.assertEqual(len(cur.fetchmany(size=2)), 2)
907            self.assertEqual(cur.arraysize, 3)
908            self.assertEqual(len(cur.fetchmany()), 3)
909            self.assertEqual(len(cur.fetchmany()), 3)
910            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
911            self.assertEqual(cur.arraysize, 2)
912            self.assertEqual(len(cur.fetchmany()), 2)
913            self.assertEqual(len(cur.fetchmany()), 2)
914            self.assertEqual(len(cur.fetchmany(25)), 3)
915        finally:
916            con.close()
917
918    def test_nextset(self):
919        con = self._connect()
920        cur = con.cursor()
921        self.assertRaises(con.NotSupportedError, cur.nextset)
922
    def test_setoutputsize(self):
        # Intentionally empty test: there is nothing to check here,
        # since setoutputsize() is not supported.
        pass  # not supported
925
926    def test_connection_errors(self):
927        con = self._connect()
928        self.assertEqual(con.Error, pgdb.Error)
929        self.assertEqual(con.Warning, pgdb.Warning)
930        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
931        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
932        self.assertEqual(con.InternalError, pgdb.InternalError)
933        self.assertEqual(con.OperationalError, pgdb.OperationalError)
934        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
935        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
936        self.assertEqual(con.DataError, pgdb.DataError)
937        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
938
939    def test_connection_as_contextmanager(self):
940        table = self.table_prefix + 'booze'
941        con = self._connect()
942        try:
943            cur = con.cursor()
944            cur.execute("create table %s (n smallint check(n!=4))" % table)
945            with con:
946                cur.execute("insert into %s values (1)" % table)
947                cur.execute("insert into %s values (2)" % table)
948            try:
949                with con:
950                    cur.execute("insert into %s values (3)" % table)
951                    cur.execute("insert into %s values (4)" % table)
952            except con.ProgrammingError as error:
953                self.assertTrue('check' in str(error).lower())
954            with con:
955                cur.execute("insert into %s values (5)" % table)
956                cur.execute("insert into %s values (6)" % table)
957            try:
958                with con:
959                    cur.execute("insert into %s values (7)" % table)
960                    cur.execute("insert into %s values (8)" % table)
961                    raise ValueError('transaction should rollback')
962            except ValueError as error:
963                self.assertEqual(str(error), 'transaction should rollback')
964            with con:
965                cur.execute("insert into %s values (9)" % table)
966            cur.execute("select * from %s order by 1" % table)
967            rows = cur.fetchall()
968            rows = [row[0] for row in rows]
969        finally:
970            con.close()
971        self.assertEqual(rows, [1, 2, 5, 6, 9])
972
973    def test_cursor_connection(self):
974        con = self._connect()
975        cur = con.cursor()
976        self.assertEqual(cur.connection, con)
977        cur.close()
978
979    def test_cursor_as_contextmanager(self):
980        con = self._connect()
981        with con.cursor() as cur:
982            self.assertEqual(cur.connection, con)
983
984    def test_pgdb_type(self):
985        self.assertEqual(pgdb.STRING, pgdb.STRING)
986        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
987        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
988        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
989        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
990        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
991        self.assertEqual('char', pgdb.STRING)
992        self.assertEqual('varchar', pgdb.STRING)
993        self.assertEqual('text', pgdb.STRING)
994        self.assertNotEqual('numeric', pgdb.STRING)
995        self.assertEqual('numeric', pgdb.NUMERIC)
996        self.assertEqual('numeric', pgdb.NUMBER)
997        self.assertEqual('int4', pgdb.NUMBER)
998        self.assertNotEqual('int4', pgdb.NUMERIC)
999        self.assertEqual('int2', pgdb.SMALLINT)
1000        self.assertNotEqual('int4', pgdb.SMALLINT)
1001        self.assertEqual('int2', pgdb.INTEGER)
1002        self.assertEqual('int4', pgdb.INTEGER)
1003        self.assertEqual('int8', pgdb.INTEGER)
1004        self.assertNotEqual('int4', pgdb.LONG)
1005        self.assertEqual('int8', pgdb.LONG)
1006        self.assertTrue('char' in pgdb.STRING)
1007        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
1008        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
1009        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
1010        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
1011        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
1012        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
1013        self.assertEqual('_char', pgdb.ARRAY)
1014        self.assertNotEqual('char', pgdb.ARRAY)
1015        self.assertEqual(pgdb.RECORD, pgdb.RECORD)
1016        self.assertNotEqual(pgdb.RECORD, pgdb.STRING)
1017        self.assertNotEqual(pgdb.RECORD, pgdb.ARRAY)
1018        self.assertEqual('record', pgdb.RECORD)
1019        self.assertNotEqual('_record', pgdb.RECORD)
1020
1021
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.