source: trunk/module/tests/test_dbapi20.py @ 667

Last change on this file since 667 was 667, checked in by cito, 4 years ago

More sophisticated creation of missing database

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 14.2 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3# $Id: test_dbapi20.py 667 2015-12-23 15:53:02Z cito $
4
5try:
6    import unittest2 as unittest  # for Python < 2.7
7except ImportError:
8    import unittest
9
10import pgdb
11
12try:
13    from . import dbapi20
14except (ImportError, ValueError, SystemError):
15    import dbapi20
16
17# We need a database to test against.
18# If LOCAL_PyGreSQL.py exists we will get our information from that.
19# Otherwise we use the defaults.
20dbname = 'dbapi20_test'
21dbhost = ''
22dbport = 5432
23try:
24    from .LOCAL_PyGreSQL import *
25except (ImportError, ValueError):
26    try:
27        from LOCAL_PyGreSQL import *
28    except ImportError:
29        pass
30
31try:
32    long
33except NameError:  # Python >= 3.0
34    long = int
35
36
37class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
38
39    driver = pgdb
40    connect_args = ()
41    connect_kw_args = {'database': dbname,
42        'host': '%s:%d' % (dbhost or '', dbport or -1)}
43
44    lower_func = 'lower'  # For stored procedure test
45
46    def setUp(self):
47        # Call superclass setUp in case this does something in the future
48        dbapi20.DatabaseAPI20Test.setUp(self)
49        try:
50            con = self._connect()
51            con.close()
52        except pgdb.Error:  # try to create a missing database
53            warning.
54            import pg
55            try:  # first try to log in as superuser
56                db = pg.DB('postgres', dbhost or None, dbport or -1,
57                    user='postgres')
58            except Exception:  # then try to log in as current user
59                db = pg.DB('postgres', dbhost or None, dbport or -1)
60            db.query('create database ' + dbname)
61
62    def tearDown(self):
63        dbapi20.DatabaseAPI20Test.tearDown(self)
64
65    def test_row_factory(self):
66        class myCursor(pgdb.pgdbCursor):
67            def row_factory(self, row):
68                d = {}
69                for idx, col in enumerate(self.description):
70                    d[col[0]] = row[idx]
71                return d
72
73        con = self._connect()
74        cur = myCursor(con)
75        ret = cur.execute("select 1 as a, 2 as b")
76        self.assertTrue(ret is cur, 'execute() should return cursor')
77        self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
78
79    def test_cursor_iteration(self):
80        con = self._connect()
81        cur = con.cursor()
82        cur.execute("select 1 union select 2 union select 3")
83        self.assertEqual([r[0] for r in cur], [1, 2, 3])
84
85    def test_fetch_2_rows(self):
86        Decimal = pgdb.decimal_type()
87        values = ['test', pgdb.Binary(b'\xff\x52\xb2'),
88            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
89            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
90            7897234]
91        table = self.table_prefix + 'booze'
92        con = self._connect()
93        try:
94            cur = con.cursor()
95            cur.execute("create table %s ("
96                "stringtest varchar,"
97                "binarytest bytea,"
98                "booltest bool,"
99                "integertest int4,"
100                "longtest int8,"
101                "floattest float8,"
102                "numerictest numeric,"
103                "moneytest money,"
104                "datetest date,"
105                "timetest time,"
106                "datetimetest timestamp,"
107                "intervaltest interval,"
108                "rowidtest oid)" % table)
109            for s in ('numeric', 'monetary', 'time'):
110                cur.execute("set lc_%s to 'C'" % s)
111            for _i in range(2):
112                cur.execute("insert into %s values ("
113                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
114                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
115            cur.execute("select * from %s" % table)
116            rows = cur.fetchall()
117            self.assertEqual(len(rows), 2)
118            row0 = rows[0]
119            self.assertEqual(row0, values)
120            self.assertEqual(row0, rows[1])
121            self.assertIsInstance(row0[0], str)
122            self.assertIsInstance(row0[1], bytes)
123            self.assertIsInstance(row0[2], bool)
124            self.assertIsInstance(row0[3], int)
125            self.assertIsInstance(row0[4], long)
126            self.assertIsInstance(row0[5], float)
127            self.assertIsInstance(row0[6], Decimal)
128            self.assertIsInstance(row0[7], Decimal)
129            self.assertIsInstance(row0[8], str)
130            self.assertIsInstance(row0[9], str)
131            self.assertIsInstance(row0[10], str)
132            self.assertIsInstance(row0[11], str)
133        finally:
134            con.close()
135
136    def test_sqlstate(self):
137        con = self._connect()
138        cur = con.cursor()
139        try:
140            cur.execute("select 1/0")
141        except pgdb.DatabaseError as error:
142            self.assertTrue(isinstance(error, pgdb.ProgrammingError))
143            # the SQLSTATE error code for division by zero is 22012
144            self.assertEqual(error.sqlstate, '22012')
145
146    def test_float(self):
147        nan, inf = float('nan'), float('inf')
148        from math import isnan, isinf
149        self.assertTrue(isnan(nan) and not isinf(nan))
150        self.assertTrue(isinf(inf) and not isnan(inf))
151        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
152        table = self.table_prefix + 'booze'
153        con = self._connect()
154        try:
155            cur = con.cursor()
156            cur.execute(
157                "create table %s (n smallint, floattest float)" % table)
158            params = enumerate(values)
159            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
160            cur.execute("select * from %s order by 1" % table)
161            rows = cur.fetchall()
162        finally:
163            con.close()
164        self.assertEqual(len(rows), len(values))
165        rows = [row[1] for row in rows]
166        for inval, outval in zip(values, rows):
167            if isinf(inval):
168                self.assertTrue(isinf(outval))
169                if inval < 0:
170                    self.assertTrue(outval < 0)
171                else:
172                    self.assertTrue(outval > 0)
173            elif isnan(inval):
174                self.assertTrue(isnan(outval))
175            else:
176                self.assertEqual(inval, outval)
177
178    def test_set_decimal_type(self):
179        decimal_type = pgdb.decimal_type()
180        self.assertTrue(decimal_type is not None and callable(decimal_type))
181        con = self._connect()
182        try:
183            cur = con.cursor()
184            self.assertTrue(pgdb.decimal_type(int) is int)
185            cur.execute('select 42')
186            value = cur.fetchone()[0]
187            self.assertTrue(isinstance(value, int))
188            self.assertEqual(value, 42)
189            self.assertTrue(pgdb.decimal_type(float) is float)
190            cur.execute('select 4.25')
191            value = cur.fetchone()[0]
192            self.assertTrue(isinstance(value, float))
193            self.assertEqual(value, 4.25)
194        finally:
195            con.close()
196            pgdb.decimal_type(decimal_type)
197        self.assertTrue(pgdb.decimal_type() is decimal_type)
198
199    def test_unicode_with_utf8(self):
200        table = self.table_prefix + 'booze'
201        input = u"He wes Leovenaðes sone — liðe him be Drihten"
202        con = self._connect()
203        try:
204            cur = con.cursor()
205            cur.execute("create table %s (t text)" % table)
206            try:
207                cur.execute("set client_encoding=utf8")
208                cur.execute(u"select '%s'" % input)
209            except Exception:
210                self.skipTest("database does not support utf8")
211            output1 = cur.fetchone()[0]
212            cur.execute("insert into %s values (%%s)" % table, (input,))
213            cur.execute("select * from %s" % table)
214            output2 = cur.fetchone()[0]
215            cur.execute("select t = '%s' from %s" % (input, table))
216            output3 = cur.fetchone()[0]
217            cur.execute("select t = %%s from %s" % table, (input,))
218            output4 = cur.fetchone()[0]
219        finally:
220            con.close()
221        if str is bytes:  # Python < 3.0
222            input = input.encode('utf8')
223        self.assertIsInstance(output1, str)
224        self.assertEqual(output1, input)
225        self.assertIsInstance(output2, str)
226        self.assertEqual(output2, input)
227        self.assertIsInstance(output3, bool)
228        self.assertTrue(output3)
229        self.assertIsInstance(output4, bool)
230        self.assertTrue(output4)
231
232    def test_unicode_with_latin1(self):
233        table = self.table_prefix + 'booze'
234        input = u"Ehrt den König seine WÃŒrde, ehret uns der HÀnde Fleiß."
235        con = self._connect()
236        try:
237            cur = con.cursor()
238            cur.execute("create table %s (t text)" % table)
239            try:
240                cur.execute("set client_encoding=latin1")
241                cur.execute(u"select '%s'" % input)
242            except Exception:
243                self.skipTest("database does not support latin1")
244            output1 = cur.fetchone()[0]
245            cur.execute("insert into %s values (%%s)" % table, (input,))
246            cur.execute("select * from %s" % table)
247            output2 = cur.fetchone()[0]
248            cur.execute("select t = '%s' from %s" % (input, table))
249            output3 = cur.fetchone()[0]
250            cur.execute("select t = %%s from %s" % table, (input,))
251            output4 = cur.fetchone()[0]
252        finally:
253            con.close()
254        if str is bytes:  # Python < 3.0
255            input = input.encode('latin1')
256        self.assertIsInstance(output1, str)
257        self.assertEqual(output1, input)
258        self.assertIsInstance(output2, str)
259        self.assertEqual(output2, input)
260        self.assertIsInstance(output3, bool)
261        self.assertTrue(output3)
262        self.assertIsInstance(output4, bool)
263        self.assertTrue(output4)
264
265    def test_bool(self):
266        values = [False, True, None, 't', 'f', 'true', 'false']
267        table = self.table_prefix + 'booze'
268        con = self._connect()
269        try:
270            cur = con.cursor()
271            cur.execute(
272                "create table %s (n smallint, booltest bool)" % table)
273            params = enumerate(values)
274            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
275            cur.execute("select * from %s order by 1" % table)
276            rows = cur.fetchall()
277        finally:
278            con.close()
279        rows = [row[1] for row in rows]
280        values[3] = values[5] = True
281        values[4] = values[6] = False
282        self.assertEqual(rows, values)
283
284    def test_nextset(self):
285        con = self._connect()
286        cur = con.cursor()
287        self.assertRaises(con.NotSupportedError, cur.nextset)
288
289    def test_setoutputsize(self):
290        pass  # not supported
291
292    def test_connection_errors(self):
293        con = self._connect()
294        self.assertEqual(con.Error, pgdb.Error)
295        self.assertEqual(con.Warning, pgdb.Warning)
296        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
297        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
298        self.assertEqual(con.InternalError, pgdb.InternalError)
299        self.assertEqual(con.OperationalError, pgdb.OperationalError)
300        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
301        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
302        self.assertEqual(con.DataError, pgdb.DataError)
303        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
304
305    def test_connection_as_contextmanager(self):
306        table = self.table_prefix + 'booze'
307        con = self._connect()
308        try:
309            cur = con.cursor()
310            cur.execute("create table %s (n smallint check(n!=4))" % table)
311            with con:
312                cur.execute("insert into %s values (1)" % table)
313                cur.execute("insert into %s values (2)" % table)
314            try:
315                with con:
316                    cur.execute("insert into %s values (3)" % table)
317                    cur.execute("insert into %s values (4)" % table)
318            except con.ProgrammingError as error:
319                self.assertTrue('check' in str(error).lower())
320            with con:
321                cur.execute("insert into %s values (5)" % table)
322                cur.execute("insert into %s values (6)" % table)
323            try:
324                with con:
325                    cur.execute("insert into %s values (7)" % table)
326                    cur.execute("insert into %s values (8)" % table)
327                    raise ValueError('transaction should rollback')
328            except ValueError as error:
329                self.assertEqual(str(error), 'transaction should rollback')
330            with con:
331                cur.execute("insert into %s values (9)" % table)
332            cur.execute("select * from %s order by 1" % table)
333            rows = cur.fetchall()
334            rows = [row[0] for row in rows]
335        finally:
336            con.close()
337        self.assertEqual(rows, [1, 2, 5, 6, 9])
338
339    def test_cursor_connection(self):
340        con = self._connect()
341        cur = con.cursor()
342        self.assertEqual(cur.connection, con)
343        cur.close()
344
345    def test_cursor_as_contextmanager(self):
346        con = self._connect()
347        with con.cursor() as cur:
348            self.assertEqual(cur.connection, con)
349
350    def test_pgdb_type(self):
351        self.assertEqual(pgdb.STRING, pgdb.STRING)
352        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
353        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
354        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
355        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
356        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
357        self.assertEqual('char', pgdb.STRING)
358        self.assertEqual('varchar', pgdb.STRING)
359        self.assertEqual('text', pgdb.STRING)
360        self.assertNotEqual('numeric', pgdb.STRING)
361        self.assertEqual('numeric', pgdb.NUMERIC)
362        self.assertEqual('numeric', pgdb.NUMBER)
363        self.assertEqual('int4', pgdb.NUMBER)
364        self.assertNotEqual('int4', pgdb.NUMERIC)
365        self.assertEqual('int2', pgdb.SMALLINT)
366        self.assertNotEqual('int4', pgdb.SMALLINT)
367        self.assertEqual('int2', pgdb.INTEGER)
368        self.assertEqual('int4', pgdb.INTEGER)
369        self.assertEqual('int8', pgdb.INTEGER)
370        self.assertNotEqual('int4', pgdb.LONG)
371        self.assertEqual('int8', pgdb.LONG)
372        self.assertTrue('char' in pgdb.STRING)
373        self.assertTrue(pgdb.NUMERIC <= pgdb.NUMBER)
374        self.assertTrue(pgdb.NUMBER >= pgdb.INTEGER)
375        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
376        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
377
378
379if __name__ == '__main__':
380    unittest.main()
Note: See TracBrowser for help on using the repository browser.