source: trunk/tests/test_dbapi20.py @ 774

Last change on this file since 774 was 774, checked in by cito, 4 years ago

Add support for JSON and JSONB to pg and pgdb

This adds all necessary functions to make PyGreSQL automatically
convert between JSON columns and Python objects representing them.

The documentation has also been updated, see there for the details.

Also, tuples automatically bind to ROW expressions in pgdb now.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 28.7 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 774 2016-01-21 18:49:28Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31from datetime import datetime
32
33try:
34    long
35except NameError:  # Python >= 3.0
36    long = int
37
38try:
39    from collections import OrderedDict
40except ImportError:  # Python 2.6 or 3.0
41    OrderedDict = None
42
43
44class PgBitString:
45    """Test object with a PostgreSQL representation as Bit String."""
46
47    def __init__(self, value):
48        self.value = value
49
50    def __pg_repr__(self):
51         return "B'{0:b}'".format(self.value)
52
53
54class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
55
56    driver = pgdb
57    connect_args = ()
58    connect_kw_args = {'database': dbname,
59        'host': '%s:%d' % (dbhost or '', dbport or -1)}
60
61    lower_func = 'lower'  # For stored procedure test
62
63    def setUp(self):
64        # Call superclass setUp in case this does something in the future
65        dbapi20.DatabaseAPI20Test.setUp(self)
66        try:
67            con = self._connect()
68            con.close()
69        except pgdb.Error:  # try to create a missing database
70            import pg
71            try:  # first try to log in as superuser
72                db = pg.DB('postgres', dbhost or None, dbport or -1,
73                    user='postgres')
74            except Exception:  # then try to log in as current user
75                db = pg.DB('postgres', dbhost or None, dbport or -1)
76            db.query('create database ' + dbname)
77
78    def tearDown(self):
79        dbapi20.DatabaseAPI20Test.tearDown(self)
80
81    def test_callproc_no_params(self):
82        con = self._connect()
83        cur = con.cursor()
84        # note that now() does not change within a transaction
85        cur.execute('select now()')
86        now = cur.fetchone()[0]
87        res = cur.callproc('now')
88        self.assertIsNone(res)
89        res = cur.fetchone()[0]
90        self.assertEqual(res, now)
91
92    def test_callproc_bad_params(self):
93        con = self._connect()
94        cur = con.cursor()
95        self.assertRaises(TypeError, cur.callproc, 'lower', 42)
96        self.assertRaises(pgdb.ProgrammingError, cur.callproc, 'lower', (42,))
97
98    def test_callproc_one_param(self):
99        con = self._connect()
100        cur = con.cursor()
101        params = (42.4382,)
102        res = cur.callproc("round", params)
103        self.assertIs(res, params)
104        res = cur.fetchone()[0]
105        self.assertEqual(res, 42)
106
107    def test_callproc_two_params(self):
108        con = self._connect()
109        cur = con.cursor()
110        params = (9, 4)
111        res = cur.callproc("div", params)
112        self.assertIs(res, params)
113        res = cur.fetchone()[0]
114        self.assertEqual(res, 2)
115
116    def test_cursor_type(self):
117
118        class TestCursor(pgdb.Cursor):
119            pass
120
121        con = self._connect()
122        self.assertIs(con.cursor_type, pgdb.Cursor)
123        cur = con.cursor()
124        self.assertIsInstance(cur, pgdb.Cursor)
125        self.assertNotIsInstance(cur, TestCursor)
126        con.cursor_type = TestCursor
127        cur = con.cursor()
128        self.assertIsInstance(cur, TestCursor)
129        cur = con.cursor()
130        self.assertIsInstance(cur, TestCursor)
131        con = self._connect()
132        self.assertIs(con.cursor_type, pgdb.Cursor)
133        cur = con.cursor()
134        self.assertIsInstance(cur, pgdb.Cursor)
135        self.assertNotIsInstance(cur, TestCursor)
136
137    def test_row_factory(self):
138
139        class TestCursor(pgdb.Cursor):
140
141            def row_factory(self, row):
142                return dict(('column %s' % desc[0], value)
143                    for desc, value in zip(self.description, row))
144
145        con = self._connect()
146        con.cursor_type = TestCursor
147        cur = con.cursor()
148        self.assertIsInstance(cur, TestCursor)
149        res = cur.execute("select 1 as a, 2 as b")
150        self.assertIs(res, cur, 'execute() should return cursor')
151        res = cur.fetchone()
152        self.assertIsInstance(res, dict)
153        self.assertEqual(res, {'column a': 1, 'column b': 2})
154        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
155        res = cur.fetchall()
156        self.assertIsInstance(res, list)
157        self.assertEqual(len(res), 2)
158        self.assertIsInstance(res[0], dict)
159        self.assertEqual(res[0], {'column a': 1, 'column b': 2})
160        self.assertIsInstance(res[1], dict)
161        self.assertEqual(res[1], {'column a': 3, 'column b': 4})
162
163    def test_build_row_factory(self):
164
165        class TestCursor(pgdb.Cursor):
166
167            def build_row_factory(self):
168                keys = [desc[0] for desc in self.description]
169                return lambda row: dict((key, value)
170                    for key, value in zip(keys, row))
171
172        con = self._connect()
173        con.cursor_type = TestCursor
174        cur = con.cursor()
175        self.assertIsInstance(cur, TestCursor)
176        cur.execute("select 1 as a, 2 as b")
177        res = cur.fetchone()
178        self.assertIsInstance(res, dict)
179        self.assertEqual(res, {'a': 1, 'b': 2})
180        cur.execute("select 1 as a, 2 as b union select 3, 4 order by 1")
181        res = cur.fetchall()
182        self.assertIsInstance(res, list)
183        self.assertEqual(len(res), 2)
184        self.assertIsInstance(res[0], dict)
185        self.assertEqual(res[0], {'a': 1, 'b': 2})
186        self.assertIsInstance(res[1], dict)
187        self.assertEqual(res[1], {'a': 3, 'b': 4})
188
189    def test_cursor_with_named_columns(self):
190        con = self._connect()
191        cur = con.cursor()
192        res = cur.execute("select 1 as abc, 2 as de, 3 as f")
193        self.assertIs(res, cur, 'execute() should return cursor')
194        res = cur.fetchone()
195        self.assertIsInstance(res, tuple)
196        self.assertEqual(res, (1, 2, 3))
197        self.assertEqual(res._fields, ('abc', 'de', 'f'))
198        self.assertEqual(res.abc, 1)
199        self.assertEqual(res.de, 2)
200        self.assertEqual(res.f, 3)
201        cur.execute("select 1 as one, 2 as two union select 3, 4 order by 1")
202        res = cur.fetchall()
203        self.assertIsInstance(res, list)
204        self.assertEqual(len(res), 2)
205        self.assertIsInstance(res[0], tuple)
206        self.assertEqual(res[0], (1, 2))
207        self.assertEqual(res[0]._fields, ('one', 'two'))
208        self.assertIsInstance(res[1], tuple)
209        self.assertEqual(res[1], (3, 4))
210        self.assertEqual(res[1]._fields, ('one', 'two'))
211
212    def test_cursor_with_unnamed_columns(self):
213        con = self._connect()
214        cur = con.cursor()
215        cur.execute("select 1, 2, 3")
216        res = cur.fetchone()
217        self.assertIsInstance(res, tuple)
218        self.assertEqual(res, (1, 2, 3))
219        old_py = OrderedDict is None  # Python 2.6 or 3.0
220        # old Python versions cannot rename tuple fields with underscore
221        if old_py:
222            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
223        else:
224            self.assertEqual(res._fields, ('_0', '_1', '_2'))
225        cur.execute("select 1 as one, 2, 3 as three")
226        res = cur.fetchone()
227        self.assertIsInstance(res, tuple)
228        self.assertEqual(res, (1, 2, 3))
229        if old_py:  # cannot auto rename with underscore
230            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
231        else:
232            self.assertEqual(res._fields, ('one', '_1', 'three'))
233        cur.execute("select 1 as abc, 2 as def")
234        res = cur.fetchone()
235        self.assertIsInstance(res, tuple)
236        self.assertEqual(res, (1, 2))
237        if old_py:
238            self.assertEqual(res._fields, ('column_0', 'column_1'))
239        else:
240            self.assertEqual(res._fields, ('abc', '_1'))
241
242    def test_colnames(self):
243        con = self._connect()
244        cur = con.cursor()
245        cur.execute("select 1, 2, 3")
246        names = cur.colnames
247        self.assertIsInstance(names, list)
248        self.assertEqual(names, ['?column?', '?column?', '?column?'])
249        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
250        names = cur.colnames
251        self.assertIsInstance(names, list)
252        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
253
254    def test_coltypes(self):
255        con = self._connect()
256        cur = con.cursor()
257        cur.execute("select 1::int2, 2::int4, 3::int8")
258        types = cur.coltypes
259        self.assertIsInstance(types, list)
260        self.assertEqual(types, ['int2', 'int4', 'int8'])
261
262    def test_description_fields(self):
263        con = self._connect()
264        cur = con.cursor()
265        cur.execute("select 123456789::int8 as col")
266        desc = cur.description
267        self.assertIsInstance(desc, list)
268        self.assertEqual(len(desc), 1)
269        desc = desc[0]
270        self.assertIsInstance(desc, tuple)
271        self.assertEqual(len(desc), 7)
272        self.assertEqual(desc.name, 'col')
273        self.assertEqual(desc.type_code, 'int8')
274        self.assertIsNone(desc.display_size)
275        self.assertIsInstance(desc.internal_size, int)
276        self.assertEqual(desc.internal_size, 8)
277        self.assertIsNone(desc.precision)
278        self.assertIsNone(desc.scale)
279        self.assertIsNone(desc.null_ok)
280
281    def test_cursor_iteration(self):
282        con = self._connect()
283        cur = con.cursor()
284        cur.execute("select 1 union select 2 union select 3")
285        self.assertEqual([r[0] for r in cur], [1, 2, 3])
286
287    def test_fetch_2_rows(self):
288        Decimal = pgdb.decimal_type()
289        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
290            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
291            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
292            7897234)
293        table = self.table_prefix + 'booze'
294        con = self._connect()
295        try:
296            cur = con.cursor()
297            cur.execute("set datestyle to 'iso'")
298            cur.execute("create table %s ("
299                "stringtest varchar,"
300                "binarytest bytea,"
301                "booltest bool,"
302                "integertest int4,"
303                "longtest int8,"
304                "floattest float8,"
305                "numerictest numeric,"
306                "moneytest money,"
307                "datetest date,"
308                "timetest time,"
309                "datetimetest timestamp,"
310                "intervaltest interval,"
311                "rowidtest oid)" % table)
312            cur.execute("set standard_conforming_strings to on")
313            for s in ('numeric', 'monetary', 'time'):
314                cur.execute("set lc_%s to 'C'" % s)
315            for _i in range(2):
316                cur.execute("insert into %s values ("
317                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
318                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
319            cur.execute("select * from %s" % table)
320            rows = cur.fetchall()
321            self.assertEqual(len(rows), 2)
322            row0 = rows[0]
323            self.assertEqual(row0, values)
324            self.assertEqual(row0, rows[1])
325            self.assertIsInstance(row0[0], str)
326            self.assertIsInstance(row0[1], bytes)
327            self.assertIsInstance(row0[2], bool)
328            self.assertIsInstance(row0[3], int)
329            self.assertIsInstance(row0[4], long)
330            self.assertIsInstance(row0[5], float)
331            self.assertIsInstance(row0[6], Decimal)
332            self.assertIsInstance(row0[7], Decimal)
333            self.assertIsInstance(row0[8], str)
334            self.assertIsInstance(row0[9], str)
335            self.assertIsInstance(row0[10], str)
336            self.assertIsInstance(row0[11], str)
337        finally:
338            con.close()
339
340    def test_sqlstate(self):
341        con = self._connect()
342        cur = con.cursor()
343        try:
344            cur.execute("select 1/0")
345        except pgdb.DatabaseError as error:
346            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
347            # the SQLSTATE error code for division by zero is 22012
348            self.assertEqual(error.sqlstate, '22012')
349
350    def test_float(self):
351        nan, inf = float('nan'), float('inf')
352        from math import isnan, isinf
353        self.assertTrue(isnan(nan) and not isinf(nan))
354        self.assertTrue(isinf(inf) and not isnan(inf))
355        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf,
356            'nan', 'inf', '-inf', 'NaN', 'Infinity', '-Infinity']
357        table = self.table_prefix + 'booze'
358        con = self._connect()
359        try:
360            cur = con.cursor()
361            cur.execute(
362                "create table %s (n smallint, floattest float)" % table)
363            params = enumerate(values)
364            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
365            cur.execute("select * from %s order by 1" % table)
366            rows = cur.fetchall()
367            self.assertEqual(cur.description[1].type_code, pgdb.FLOAT)
368        finally:
369            con.close()
370        self.assertEqual(len(rows), len(values))
371        rows = [row[1] for row in rows]
372        for inval, outval in zip(values, rows):
373            if inval in ('inf', 'Infinity'):
374                inval = inf
375            elif inval in ('-inf', '-Infinity'):
376                inval = -inf
377            elif inval in ('nan', 'NaN'):
378                inval = nan
379            if isinf(inval):
380                self.assertTrue(isinf(outval))
381                if inval < 0:
382                    self.assertTrue(outval < 0)
383                else:
384                    self.assertTrue(outval > 0)
385            elif isnan(inval):
386                self.assertTrue(isnan(outval))
387            else:
388                self.assertEqual(inval, outval)
389
390    def test_datetime(self):
391        values = ['2011-07-17 15:47:42', datetime(2016, 1, 20, 20, 15, 51)]
392        table = self.table_prefix + 'booze'
393        con = self._connect()
394        try:
395            cur = con.cursor()
396            cur.execute("set datestyle to 'iso'")
397            cur.execute(
398                "create table %s (n smallint, ts timestamp)" % table)
399            params = enumerate(values)
400            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
401            cur.execute("select * from %s order by 1" % table)
402            rows = cur.fetchall()
403            self.assertEqual(cur.description[1].type_code, pgdb.DATETIME)
404        finally:
405            con.close()
406        self.assertEqual(len(rows), len(values))
407        rows = [row[1] for row in rows]
408        for inval, outval in zip(values, rows):
409            if isinstance(inval, datetime):
410                inval = inval.strftime('%Y-%m-%d %H:%M:%S')
411            self.assertEqual(inval, outval)
412
413    def test_list_binds_as_array(self):
414        values = ([20000, 25000, 25000, 30000],
415            [['breakfast', 'consulting'], ['meeting', 'lunch']])
416        output = ('{20000,25000,25000,30000}',
417            '{{breakfast,consulting},{meeting,lunch}}')
418        table = self.table_prefix + 'booze'
419        con = self._connect()
420        try:
421            cur = con.cursor()
422            cur.execute("create table %s (i int[], t text[][])" % table)
423            cur.execute("insert into %s values (%%s,%%s)" % table, values)
424            cur.execute("select * from %s" % table)
425            row = cur.fetchone()
426        finally:
427            con.close()
428        self.assertEqual(row, output)
429
430    def test_tuple_binds_as_row(self):
431        values = (1, 2.5, 'this is a test')
432        output = '(1,2.5,"this is a test")'
433        con = self._connect()
434        try:
435            cur = con.cursor()
436            cur.execute("select %s", [values])
437            outval = cur.fetchone()[0]
438        finally:
439            con.close()
440        self.assertEqual(outval, output)
441
442    def test_custom_type(self):
443        values = [3, 5, 65]
444        values = list(map(PgBitString, values))
445        table = self.table_prefix + 'booze'
446        con = self._connect()
447        try:
448            cur = con.cursor()
449            params = enumerate(values)  # params have __pg_repr__ method
450            cur.execute(
451                'create table "%s" (n smallint, b bit varying(7))' % table)
452            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
453            cur.execute("select * from %s order by 1" % table)
454            rows = cur.fetchall()
455        finally:
456            con.close()
457        self.assertEqual(len(rows), len(values))
458        con = self._connect()
459        try:
460            cur = con.cursor()
461            params = (1, object())  # an object that cannot be handled
462            self.assertRaises(pgdb.InterfaceError, cur.execute,
463                "insert into %s values (%%s,%%s)" % table, params)
464        finally:
465            con.close()
466
467    def test_set_decimal_type(self):
468        decimal_type = pgdb.decimal_type()
469        self.assertTrue(decimal_type is not None and callable(decimal_type))
470        con = self._connect()
471        try:
472            cur = con.cursor()
473            self.assertTrue(pgdb.decimal_type(int) is int)
474            cur.execute('select 42')
475            self.assertEqual(cur.description[0].type_code, pgdb.INTEGER)
476            value = cur.fetchone()[0]
477            self.assertTrue(isinstance(value, int))
478            self.assertEqual(value, 42)
479            self.assertTrue(pgdb.decimal_type(float) is float)
480            cur.execute('select 4.25')
481            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
482            value = cur.fetchone()[0]
483            self.assertTrue(isinstance(value, float))
484            self.assertEqual(value, 4.25)
485        finally:
486            con.close()
487            pgdb.decimal_type(decimal_type)
488        self.assertTrue(pgdb.decimal_type() is decimal_type)
489
490    def test_unicode_with_utf8(self):
491        table = self.table_prefix + 'booze'
492        input = u"He wes Leovenaðes sone — liðe him be Drihten"
493        con = self._connect()
494        try:
495            cur = con.cursor()
496            cur.execute("create table %s (t text)" % table)
497            try:
498                cur.execute("set client_encoding=utf8")
499                cur.execute(u"select '%s'" % input)
500            except Exception:
501                self.skipTest("database does not support utf8")
502            output1 = cur.fetchone()[0]
503            cur.execute("insert into %s values (%%s)" % table, (input,))
504            cur.execute("select * from %s" % table)
505            output2 = cur.fetchone()[0]
506            cur.execute("select t = '%s' from %s" % (input, table))
507            output3 = cur.fetchone()[0]
508            cur.execute("select t = %%s from %s" % table, (input,))
509            output4 = cur.fetchone()[0]
510        finally:
511            con.close()
512        if str is bytes:  # Python < 3.0
513            input = input.encode('utf8')
514        self.assertIsInstance(output1, str)
515        self.assertEqual(output1, input)
516        self.assertIsInstance(output2, str)
517        self.assertEqual(output2, input)
518        self.assertIsInstance(output3, bool)
519        self.assertTrue(output3)
520        self.assertIsInstance(output4, bool)
521        self.assertTrue(output4)
522
523    def test_unicode_with_latin1(self):
524        table = self.table_prefix + 'booze'
525        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
526        con = self._connect()
527        try:
528            cur = con.cursor()
529            cur.execute("create table %s (t text)" % table)
530            try:
531                cur.execute("set client_encoding=latin1")
532                cur.execute(u"select '%s'" % input)
533            except Exception:
534                self.skipTest("database does not support latin1")
535            output1 = cur.fetchone()[0]
536            cur.execute("insert into %s values (%%s)" % table, (input,))
537            cur.execute("select * from %s" % table)
538            output2 = cur.fetchone()[0]
539            cur.execute("select t = '%s' from %s" % (input, table))
540            output3 = cur.fetchone()[0]
541            cur.execute("select t = %%s from %s" % table, (input,))
542            output4 = cur.fetchone()[0]
543        finally:
544            con.close()
545        if str is bytes:  # Python < 3.0
546            input = input.encode('latin1')
547        self.assertIsInstance(output1, str)
548        self.assertEqual(output1, input)
549        self.assertIsInstance(output2, str)
550        self.assertEqual(output2, input)
551        self.assertIsInstance(output3, bool)
552        self.assertTrue(output3)
553        self.assertIsInstance(output4, bool)
554        self.assertTrue(output4)
555
556    def test_bool(self):
557        values = [False, True, None, 't', 'f', 'true', 'false']
558        table = self.table_prefix + 'booze'
559        con = self._connect()
560        try:
561            cur = con.cursor()
562            cur.execute(
563                "create table %s (n smallint, booltest bool)" % table)
564            params = enumerate(values)
565            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
566            cur.execute("select * from %s order by 1" % table)
567            rows = cur.fetchall()
568            self.assertEqual(cur.description[1].type_code, pgdb.BOOL)
569        finally:
570            con.close()
571        rows = [row[1] for row in rows]
572        values[3] = values[5] = True
573        values[4] = values[6] = False
574        self.assertEqual(rows, values)
575
576    def test_json(self):
577        inval = {"employees":
578            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
579        table = self.table_prefix + 'booze'
580        con = self._connect()
581        try:
582            cur = con.cursor()
583            try:
584                cur.execute("create table %s (jsontest json)" % table)
585            except pgdb.ProgrammingError:
586                self.skipTest('database does not support json')
587            params = (pgdb.Json(inval),)
588            cur.execute("insert into %s values (%%s)" % table, params)
589            cur.execute("select * from %s" % table)
590            outval = cur.fetchone()[0]
591            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
592        finally:
593            con.close()
594        self.assertEqual(inval, outval)
595
596    def test_jsonb(self):
597        inval = {"employees":
598            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
599        table = self.table_prefix + 'booze'
600        con = self._connect()
601        try:
602            cur = con.cursor()
603            try:
604                cur.execute("create table %s (jsonbtest jsonb)" % table)
605            except pgdb.ProgrammingError:
606                self.skipTest('database does not support jsonb')
607            params = (pgdb.Json(inval),)
608            cur.execute("insert into %s values (%%s)" % table, params)
609            cur.execute("select * from %s" % table)
610            outval = cur.fetchone()[0]
611            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
612        finally:
613            con.close()
614        self.assertEqual(inval, outval)
615
616    def test_execute_edge_cases(self):
617        con = self._connect()
618        try:
619            cur = con.cursor()
620            sql = 'invalid'  # should be ignored with empty parameter list
621            cur.executemany(sql, [])
622            sql = 'select %d + 1'
623            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
624            self.assertEqual(cur.fetchone()[0], 3)
625            sql = 'select 1/0'  # cannot be executed
626            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
627            cur.close()
628            con.rollback()
629            if pgdb.shortcutmethods:
630                res = con.execute('select %d', (1,)).fetchone()
631                self.assertEqual(res, (1,))
632                res = con.executemany('select %d', [(1,), (2,)]).fetchone()
633                self.assertEqual(res, (2,))
634        finally:
635            con.close()
636        sql = 'select 1'  # cannot be executed after connection is closed
637        self.assertRaises(pgdb.OperationalError, cur.execute, sql)
638
639    def test_fetchmany_with_keep(self):
640        con = self._connect()
641        try:
642            cur = con.cursor()
643            self.assertEqual(cur.arraysize, 1)
644            cur.execute('select * from generate_series(1, 25)')
645            self.assertEqual(len(cur.fetchmany()), 1)
646            self.assertEqual(len(cur.fetchmany()), 1)
647            self.assertEqual(cur.arraysize, 1)
648            cur.arraysize = 3
649            self.assertEqual(len(cur.fetchmany()), 3)
650            self.assertEqual(len(cur.fetchmany()), 3)
651            self.assertEqual(cur.arraysize, 3)
652            self.assertEqual(len(cur.fetchmany(size=2)), 2)
653            self.assertEqual(cur.arraysize, 3)
654            self.assertEqual(len(cur.fetchmany()), 3)
655            self.assertEqual(len(cur.fetchmany()), 3)
656            self.assertEqual(len(cur.fetchmany(size=2, keep=True)), 2)
657            self.assertEqual(cur.arraysize, 2)
658            self.assertEqual(len(cur.fetchmany()), 2)
659            self.assertEqual(len(cur.fetchmany()), 2)
660            self.assertEqual(len(cur.fetchmany(25)), 3)
661        finally:
662            con.close()
663
664    def test_nextset(self):
665        con = self._connect()
666        cur = con.cursor()
667        self.assertRaises(con.NotSupportedError, cur.nextset)
668
669    def test_setoutputsize(self):
670        pass  # not supported
671
672    def test_connection_errors(self):
673        con = self._connect()
674        self.assertEqual(con.Error, pgdb.Error)
675        self.assertEqual(con.Warning, pgdb.Warning)
676        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
677        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
678        self.assertEqual(con.InternalError, pgdb.InternalError)
679        self.assertEqual(con.OperationalError, pgdb.OperationalError)
680        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
681        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
682        self.assertEqual(con.DataError, pgdb.DataError)
683        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
684
685    def test_connection_as_contextmanager(self):
686        table = self.table_prefix + 'booze'
687        con = self._connect()
688        try:
689            cur = con.cursor()
690            cur.execute("create table %s (n smallint check(n!=4))" % table)
691            with con:
692                cur.execute("insert into %s values (1)" % table)
693                cur.execute("insert into %s values (2)" % table)
694            try:
695                with con:
696                    cur.execute("insert into %s values (3)" % table)
697                    cur.execute("insert into %s values (4)" % table)
698            except con.ProgrammingError as error:
699                self.assertTrue('check' in str(error).lower())
700            with con:
701                cur.execute("insert into %s values (5)" % table)
702                cur.execute("insert into %s values (6)" % table)
703            try:
704                with con:
705                    cur.execute("insert into %s values (7)" % table)
706                    cur.execute("insert into %s values (8)" % table)
707                    raise ValueError('transaction should rollback')
708            except ValueError as error:
709                self.assertEqual(str(error), 'transaction should rollback')
710            with con:
711                cur.execute("insert into %s values (9)" % table)
712            cur.execute("select * from %s order by 1" % table)
713            rows = cur.fetchall()
714            rows = [row[0] for row in rows]
715        finally:
716            con.close()
717        self.assertEqual(rows, [1, 2, 5, 6, 9])
718
719    def test_cursor_connection(self):
720        con = self._connect()
721        cur = con.cursor()
722        self.assertEqual(cur.connection, con)
723        cur.close()
724
725    def test_cursor_as_contextmanager(self):
726        con = self._connect()
727        with con.cursor() as cur:
728            self.assertEqual(cur.connection, con)
729
730    def test_pgdb_type(self):
731        self.assertEqual(pgdb.STRING, pgdb.STRING)
732        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
733        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
734        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
735        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
736        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
737        self.assertEqual('char', pgdb.STRING)
738        self.assertEqual('varchar', pgdb.STRING)
739        self.assertEqual('text', pgdb.STRING)
740        self.assertNotEqual('numeric', pgdb.STRING)
741        self.assertEqual('numeric', pgdb.NUMERIC)
742        self.assertEqual('numeric', pgdb.NUMBER)
743        self.assertEqual('int4', pgdb.NUMBER)
744        self.assertNotEqual('int4', pgdb.NUMERIC)
745        self.assertEqual('int2', pgdb.SMALLINT)
746        self.assertNotEqual('int4', pgdb.SMALLINT)
747        self.assertEqual('int2', pgdb.INTEGER)
748        self.assertEqual('int4', pgdb.INTEGER)
749        self.assertEqual('int8', pgdb.INTEGER)
750        self.assertNotEqual('int4', pgdb.LONG)
751        self.assertEqual('int8', pgdb.LONG)
752        self.assertTrue('char' in pgdb.STRING)
753        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
754        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
755        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
756        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
757
758
759if __name__ == '__main__':
760    unittest.main()
Note: See TracBrowser for help on using the repository browser.