source: branches/4.x/module/tests/test_dbapi20.py @ 667

Last change on this file since 667 was 667, checked in by cito, 4 years ago

More sophisticated creation of missing database

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 11.6 KB
Line 
1#! /usr/bin/python
2# $Id: test_dbapi20.py 667 2015-12-23 15:53:02Z cito $
3
4try:
5    import unittest2 as unittest  # for Python < 2.7
6except ImportError:
7    import unittest
8
9import sys
10
11import pgdb
12
13import dbapi20
14
15# check whether the "with" statement is supported
16no_with = sys.version_info[:2] < (2, 5)
17
18# We need a database to test against.
19# If LOCAL_PyGreSQL.py exists we will get our information from that.
20# Otherwise we use the defaults.
21dbname = 'dbapi20_test'
22dbhost = ''
23dbport = 5432
24try:
25    from LOCAL_PyGreSQL import *
26except ImportError:
27    pass
28
29
30class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
31
32    driver = pgdb
33    connect_args = ()
34    connect_kw_args = {'database': dbname,
35        'host': '%s:%d' % (dbhost or '', dbport or -1)}
36
37    lower_func = 'lower'  # For stored procedure test
38
39    def setUp(self):
40        # Call superclass setUp in case this does something in the future
41        dbapi20.DatabaseAPI20Test.setUp(self)
42        try:
43            con = self._connect()
44            con.close()
45        except pgdb.Error:  # try to create a missing database
46            import pg
47            try:  # first try to log in as superuser
48                db = pg.DB('postgres', dbhost or None, dbport or -1,
49                    user='postgres')
50            except Exception:  # then try to log in as current user
51                db = pg.DB('postgres', dbhost or None, dbport or -1)
52            db.query('create database ' + dbname)
53
54
55    def tearDown(self):
56        dbapi20.DatabaseAPI20Test.tearDown(self)
57
58    def test_row_factory(self):
59        class myCursor(pgdb.pgdbCursor):
60            def row_factory(self, row):
61                d = {}
62                for idx, col in enumerate(self.description):
63                    d[col[0]] = row[idx]
64                return d
65
66        con = self._connect()
67        cur = myCursor(con)
68        ret = cur.execute("select 1 as a, 2 as b")
69        self.assert_(ret is cur, 'execute() should return cursor')
70        self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
71
72    def test_cursor_iteration(self):
73        con = self._connect()
74        cur = con.cursor()
75        cur.execute("select 1 union select 2 union select 3")
76        self.assertEqual([r[0] for r in cur], [1, 2, 3])
77
78    def test_fetch_2_rows(self):
79        Decimal = pgdb.decimal_type()
80        values = ['test', pgdb.Binary('\xff\x52\xb2'),
81            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
82            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
83            7897234]
84        table = self.table_prefix + 'booze'
85        con = self._connect()
86        try:
87            cur = con.cursor()
88            cur.execute("create table %s ("
89                "stringtest varchar,"
90                "binarytest bytea,"
91                "booltest bool,"
92                "integertest int4,"
93                "longtest int8,"
94                "floattest float8,"
95                "numerictest numeric,"
96                "moneytest money,"
97                "datetest date,"
98                "timetest time,"
99                "datetimetest timestamp,"
100                "intervaltest interval,"
101                "rowidtest oid)" % table)
102            for s in ('numeric', 'monetary', 'time'):
103                cur.execute("set lc_%s to 'C'" % s)
104            for _i in range(2):
105                cur.execute("insert into %s values ("
106                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
107                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
108            cur.execute("select * from %s" % table)
109            rows = cur.fetchall()
110            self.assertEqual(len(rows), 2)
111            self.assertEqual(rows[0], values)
112            self.assertEqual(rows[0], rows[1])
113        finally:
114            con.close()
115
116    def test_sqlstate(self):
117        con = self._connect()
118        cur = con.cursor()
119        try:
120            cur.execute("select 1/0")
121        except pgdb.DatabaseError, error:
122            self.assert_(isinstance(error, pgdb.ProgrammingError))
123            # the SQLSTATE error code for division by zero is 22012
124            self.assertEqual(error.sqlstate, '22012')
125
126    def test_float(self):
127        try:
128            nan = float('nan')
129        except ValueError:  # Python < 2.6
130            nan = 3.0e999 - 1.5e999999
131        try:
132            inf = float('inf')
133        except ValueError:  # Python < 2.6
134            inf = 3.0e999 * 1.5e999999
135        try:
136            from math import isnan, isinf
137        except ImportError:  # Python < 2.6
138            isnan = lambda x: x != x
139            isinf = lambda x: not isnan(x) and isnan(x * 0)
140        try:
141            from math import isnan, isinf
142        except ImportError:  # Python < 2.6
143            isnan = lambda x: x != x
144            isinf = lambda x: not isnan(x) and isnan(x * 0)
145        self.assert_(isnan(nan) and not isinf(nan))
146        self.assert_(isinf(inf) and not isnan(inf))
147        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
148        table = self.table_prefix + 'booze'
149        con = self._connect()
150        try:
151            cur = con.cursor()
152            cur.execute(
153                "create table %s (n smallint, floattest float)" % table)
154            params = enumerate(values)
155            cur.executemany("insert into %s values(%%s,%%s)" % table, params)
156            cur.execute("select * from %s order by 1" % table)
157            rows = cur.fetchall()
158        finally:
159            con.close()
160        self.assertEqual(len(rows), len(values))
161        rows = [row[1] for row in rows]
162        for inval, outval in zip(values, rows):
163            if isinf(inval):
164                self.assert_(isinf(outval))
165                if inval < 0:
166                    self.assert_(outval < 0)
167                else:
168                    self.assert_(outval > 0)
169            elif isnan(inval):
170                self.assert_(isnan(outval))
171            else:
172                self.assertEqual(inval, outval)
173
174    def test_bool(self):
175        values = [False, True, None, 't', 'f', 'true', 'false']
176        table = self.table_prefix + 'booze'
177        con = self._connect()
178        try:
179            cur = con.cursor()
180            cur.execute(
181                "create table %s (n smallint, booltest bool)" % table)
182            params = enumerate(values)
183            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
184            cur.execute("select * from %s order by 1" % table)
185            rows = cur.fetchall()
186        finally:
187            con.close()
188        rows = [row[1] for row in rows]
189        values[3] = values[5] = True
190        values[4] = values[6] = False
191        self.assertEqual(rows, values)
192
193    def test_set_decimal_type(self):
194        decimal_type = pgdb.decimal_type()
195        self.assert_(decimal_type is not None and callable(decimal_type))
196        con = self._connect()
197        try:
198            cur = con.cursor()
199            self.assert_(pgdb.decimal_type(int) is int)
200            cur.execute('select 42')
201            value = cur.fetchone()[0]
202            self.assert_(isinstance(value, int))
203            self.assertEqual(value, 42)
204            self.assert_(pgdb.decimal_type(float) is float)
205            cur.execute('select 4.25')
206            value = cur.fetchone()[0]
207            self.assert_(isinstance(value, float))
208            self.assertEqual(value, 4.25)
209        finally:
210            con.close()
211            pgdb.decimal_type(decimal_type)
212        self.assert_(pgdb.decimal_type() is decimal_type)
213
214    def test_nextset(self):
215        con = self._connect()
216        cur = con.cursor()
217        self.assertRaises(con.NotSupportedError, cur.nextset)
218
219    def test_setoutputsize(self):
220        pass  # not supported
221
222    def test_connection_errors(self):
223        con = self._connect()
224        self.assertEqual(con.Error, pgdb.Error)
225        self.assertEqual(con.Warning, pgdb.Warning)
226        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
227        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
228        self.assertEqual(con.InternalError, pgdb.InternalError)
229        self.assertEqual(con.OperationalError, pgdb.OperationalError)
230        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
231        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
232        self.assertEqual(con.DataError, pgdb.DataError)
233        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
234
235    @unittest.skipIf(no_with, 'context managers not supported')
236    def test_connection_as_contextmanager(self):
237        table = self.table_prefix + 'booze'
238        con = self._connect()
239        # wrap "with" statements to avoid SyntaxError in Python < 2.5
240        exec """from __future__ import with_statement\nif True:
241        try:
242            cur = con.cursor()
243            cur.execute("create table %s (n smallint check(n!=4))" % table)
244            with con:
245                cur.execute("insert into %s values (1)" % table)
246                cur.execute("insert into %s values (2)" % table)
247            try:
248                with con:
249                    cur.execute("insert into %s values (3)" % table)
250                    cur.execute("insert into %s values (4)" % table)
251            except con.ProgrammingError, error:
252                self.assertTrue('check' in str(error).lower())
253            with con:
254                cur.execute("insert into %s values (5)" % table)
255                cur.execute("insert into %s values (6)" % table)
256            try:
257                with con:
258                    cur.execute("insert into %s values (7)" % table)
259                    cur.execute("insert into %s values (8)" % table)
260                    raise ValueError('transaction should rollback')
261            except ValueError, error:
262                self.assertEqual(str(error), 'transaction should rollback')
263            with con:
264                cur.execute("insert into %s values (9)" % table)
265            cur.execute("select * from %s order by 1" % table)
266            rows = cur.fetchall()
267            rows = [row[0] for row in rows]
268        finally:
269            con.close()\n"""
270        self.assertEqual(rows, [1, 2, 5, 6, 9])
271
272    def test_cursor_connection(self):
273        con = self._connect()
274        cur = con.cursor()
275        self.assertEqual(cur.connection, con)
276        cur.close()
277
278    @unittest.skipIf(no_with, 'context managers not supported')
279    def test_cursor_as_contextmanager(self):
280        con = self._connect()
281        # wrap "with" statements to avoid SyntaxError in Python < 2.5
282        exec """from __future__ import with_statement\nif True:
283        with con.cursor() as cur:
284            self.assertEqual(cur.connection, con)\n"""
285
286    def test_pgdb_type(self):
287        self.assertEqual(pgdb.STRING, pgdb.STRING)
288        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
289        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
290        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
291        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
292        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
293        self.assertEqual('char', pgdb.STRING)
294        self.assertEqual('varchar', pgdb.STRING)
295        self.assertEqual('text', pgdb.STRING)
296        self.assertNotEqual('numeric', pgdb.STRING)
297        self.assertEqual('numeric', pgdb.NUMERIC)
298        self.assertEqual('numeric', pgdb.NUMBER)
299        self.assertEqual('int4', pgdb.NUMBER)
300        self.assertNotEqual('int4', pgdb.NUMERIC)
301        self.assertEqual('int2', pgdb.SMALLINT)
302        self.assertNotEqual('int4', pgdb.SMALLINT)
303        self.assertEqual('int2', pgdb.INTEGER)
304        self.assertEqual('int4', pgdb.INTEGER)
305        self.assertEqual('int8', pgdb.INTEGER)
306        self.assertNotEqual('int4', pgdb.LONG)
307        self.assertEqual('int8', pgdb.LONG)
308        self.assert_('char' in pgdb.STRING)
309        self.assert_(pgdb.NUMERIC <= pgdb.NUMBER)
310        self.assert_(pgdb.NUMBER >= pgdb.INTEGER)
311        self.assert_(pgdb.TIME <= pgdb.DATETIME)
312        self.assert_(pgdb.DATETIME >= pgdb.DATE)
313
314
315if __name__ == '__main__':
316    unittest.main()
Note: See TracBrowser for help on using the repository browser.