source: trunk/module/TEST_PyGreSQL_dbapi20.py @ 525

Last change on this file since 525 was 525, checked in by darcy, 4 years ago

Current excepton handling so that it can run under Python 3.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 10.0 KB
Line 
1#! /usr/bin/python
2# $Id: TEST_PyGreSQL_dbapi20.py 525 2015-11-08 18:01:14Z darcy $
3
4from __future__ import with_statement
5
6import unittest
7import dbapi20
8import pgdb
9
10# We need a database to test against.
11# If LOCAL_PyGreSQL.py exists we will get our information from that.
12# Otherwise we use the defaults.
13dbname = 'dbapi20_test'
14dbhost = ''
15dbport = 5432
16try:
17    from LOCAL_PyGreSQL import *
18except ImportError:
19    pass
20
21
22class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
23
24    driver = pgdb
25    connect_args = ()
26    connect_kw_args = {'dsn': dbhost + ':' + dbname}
27
28    lower_func = 'lower'  # For stored procedure test
29
30    def setUp(self):
31        # Call superclass setUp in case this does something in the future
32        dbapi20.DatabaseAPI20Test.setUp(self)
33        try:
34            con = self._connect()
35            con.close()
36        except pgdb.Error:
37            import pg
38            pg.DB().query('create database ' + dbname)
39
40    def tearDown(self):
41        dbapi20.DatabaseAPI20Test.tearDown(self)
42
43    def test_row_factory(self):
44        class myCursor(pgdb.pgdbCursor):
45            def row_factory(self, row):
46                d = {}
47                for idx, col in enumerate(self.description):
48                    d[col[0]] = row[idx]
49                return d
50
51        con = self._connect()
52        cur = myCursor(con)
53        ret = cur.execute("select 1 as a, 2 as b")
54        self.assert_(ret is cur, 'execute() should return cursor')
55        self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
56
57    def test_cursor_iteration(self):
58        con = self._connect()
59        cur = con.cursor()
60        cur.execute("select 1 union select 2 union select 3")
61        self.assertEqual([r[0] for r in cur], [1, 2, 3])
62
63    def test_fetch_2_rows(self):
64        Decimal = pgdb.decimal_type()
65        values = ['test', pgdb.Binary('\xff\x52\xb2'),
66            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
67            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
68            7897234]
69        table = self.table_prefix + 'booze'
70        con = self._connect()
71        try:
72            cur = con.cursor()
73            cur.execute("create table %s ("
74                "stringtest varchar,"
75                "binarytest bytea,"
76                "booltest bool,"
77                "integertest int4,"
78                "longtest int8,"
79                "floattest float8,"
80                "numerictest numeric,"
81                "moneytest money,"
82                "datetest date,"
83                "timetest time,"
84                "datetimetest timestamp,"
85                "intervaltest interval,"
86                "rowidtest oid)" % table)
87            for s in ('numeric', 'monetary', 'time'):
88                cur.execute("set lc_%s to 'C'" % s)
89            for _i in range(2):
90                cur.execute("insert into %s values ("
91                    "%%s,%%s,%%s,%%s,%%s,%%s,%%s,"
92                    "'%%s'::money,%%s,%%s,%%s,%%s,%%s)" % table, values)
93            cur.execute("select * from %s" % table)
94            rows = cur.fetchall()
95            self.assertEqual(len(rows), 2)
96            self.assertEqual(rows[0], values)
97            self.assertEqual(rows[0], rows[1])
98        finally:
99            con.close()
100
101    def test_sqlstate(self):
102        con = self._connect()
103        cur = con.cursor()
104        try:
105            cur.execute("select 1/0")
106        except pgdb.DatabaseError as error:
107            self.assert_(isinstance(error, pgdb.ProgrammingError))
108            # the SQLSTATE error code for division by zero is 22012
109            self.assertEqual(error.sqlstate, '22012')
110
111    def test_float(self):
112        try:
113            nan = float('nan')
114        except ValueError:  # Python < 2.6
115            nan = 3.0e999 - 1.5e999999
116        try:
117            inf = float('inf')
118        except ValueError:  # Python < 2.6
119            inf = 3.0e999 * 1.5e999999
120        try:
121            from math import isnan, isinf
122        except ImportError:  # Python < 2.6
123            isnan = lambda x: x != x
124            isinf = lambda x: not isnan(x) and isnan(x * 0)
125        try:
126            from math import isnan, isinf
127        except ImportError:  # Python < 2.6
128            isnan = lambda x: x != x
129            isinf = lambda x: not isnan(x) and isnan(x * 0)
130        self.assert_(isnan(nan) and not isinf(nan))
131        self.assert_(isinf(inf) and not isnan(inf))
132        values = [0, 1, 0.03125, -42.53125, nan, inf, -inf]
133        table = self.table_prefix + 'booze'
134        con = self._connect()
135        try:
136            cur = con.cursor()
137            cur.execute(
138                "create table %s (n smallint, floattest float)" % table)
139            params = enumerate(values)
140            cur.executemany("insert into %s values(%%s,%%s)" % table, params)
141            cur.execute("select * from %s order by 1" % table)
142            rows = cur.fetchall()
143        finally:
144            con.close()
145        self.assertEqual(len(rows), len(values))
146        rows = [row[1] for row in rows]
147        for inval, outval in zip(values, rows):
148            if isinf(inval):
149                self.assert_(isinf(outval))
150                if inval < 0:
151                    self.assert_(outval < 0)
152                else:
153                    self.assert_(outval > 0)
154            elif isnan(inval):
155                self.assert_(isnan(outval))
156            else:
157                self.assertEqual(inval, outval)
158
159    def test_set_decimal_type(self):
160        decimal_type = pgdb.decimal_type()
161        self.assert_(decimal_type is not None and callable(decimal_type))
162        con = self._connect()
163        try:
164            cur = con.cursor()
165            self.assert_(pgdb.decimal_type(int) is int)
166            cur.execute('select 42')
167            value = cur.fetchone()[0]
168            self.assert_(isinstance(value, int))
169            self.assertEqual(value, 42)
170            self.assert_(pgdb.decimal_type(float) is float)
171            cur.execute('select 4.25')
172            value = cur.fetchone()[0]
173            self.assert_(isinstance(value, float))
174            self.assertEqual(value, 4.25)
175        finally:
176            con.close()
177            pgdb.decimal_type(decimal_type)
178        self.assert_(pgdb.decimal_type() is decimal_type)
179
180    def test_nextset(self):
181        con = self._connect()
182        cur = con.cursor()
183        self.assertRaises(con.NotSupportedError, cur.nextset)
184
185    def test_setoutputsize(self):
186        pass  # not supported
187
188    def test_connection_errors(self):
189        con = self._connect()
190        self.assertEqual(con.Error, pgdb.Error)
191        self.assertEqual(con.Warning, pgdb.Warning)
192        self.assertEqual(con.InterfaceError, pgdb.InterfaceError)
193        self.assertEqual(con.DatabaseError, pgdb.DatabaseError)
194        self.assertEqual(con.InternalError, pgdb.InternalError)
195        self.assertEqual(con.OperationalError, pgdb.OperationalError)
196        self.assertEqual(con.ProgrammingError, pgdb.ProgrammingError)
197        self.assertEqual(con.IntegrityError, pgdb.IntegrityError)
198        self.assertEqual(con.DataError, pgdb.DataError)
199        self.assertEqual(con.NotSupportedError, pgdb.NotSupportedError)
200
201    def test_connection_as_contextmanager(self):
202        table = self.table_prefix + 'booze'
203        con = self._connect()
204        try:
205            cur = con.cursor()
206            cur.execute("create table %s (n smallint check(n!=4))" % table)
207            with con:
208                cur.execute("insert into %s values (1)" % table)
209                cur.execute("insert into %s values (2)" % table)
210            try:
211                with con:
212                    cur.execute("insert into %s values (3)" % table)
213                    cur.execute("insert into %s values (4)" % table)
214            except con.ProgrammingError as error:
215                self.assertTrue('check' in str(error).lower())
216            with con:
217                cur.execute("insert into %s values (5)" % table)
218                cur.execute("insert into %s values (6)" % table)
219            try:
220                with con:
221                    cur.execute("insert into %s values (7)" % table)
222                    cur.execute("insert into %s values (8)" % table)
223                    raise ValueError('transaction should rollback')
224            except ValueError as error:
225                self.assertEqual(str(error), 'transaction should rollback')
226            with con:
227                cur.execute("insert into %s values (9)" % table)
228            cur.execute("select * from %s order by 1" % table)
229            rows = cur.fetchall()
230            rows = [row[0] for row in rows]
231        finally:
232            con.close()
233        self.assertEqual(rows, [1, 2, 5, 6, 9])
234
235    def test_cursor_connection(self):
236        con = self._connect()
237        cur = con.cursor()
238        self.assertEqual(cur.connection, con)
239        cur.close()
240
241    def test_cursor_as_contextmanager(self):
242        con = self._connect()
243        with con.cursor() as cur:
244            self.assertEqual(cur.connection, con)
245
246    def test_pgdb_type(self):
247        self.assertEqual(pgdb.STRING, pgdb.STRING)
248        self.assertNotEqual(pgdb.STRING, pgdb.INTEGER)
249        self.assertNotEqual(pgdb.STRING, pgdb.BOOL)
250        self.assertNotEqual(pgdb.BOOL, pgdb.INTEGER)
251        self.assertEqual(pgdb.INTEGER, pgdb.INTEGER)
252        self.assertNotEqual(pgdb.INTEGER, pgdb.NUMBER)
253        self.assertEqual('char', pgdb.STRING)
254        self.assertEqual('varchar', pgdb.STRING)
255        self.assertEqual('text', pgdb.STRING)
256        self.assertNotEqual('numeric', pgdb.STRING)
257        self.assertEqual('numeric', pgdb.NUMERIC)
258        self.assertEqual('numeric', pgdb.NUMBER)
259        self.assertEqual('int4', pgdb.NUMBER)
260        self.assertNotEqual('int4', pgdb.NUMERIC)
261        self.assertEqual('int2', pgdb.SMALLINT)
262        self.assertNotEqual('int4', pgdb.SMALLINT)
263        self.assertEqual('int2', pgdb.INTEGER)
264        self.assertEqual('int4', pgdb.INTEGER)
265        self.assertEqual('int8', pgdb.INTEGER)
266        self.assertNotEqual('int4', pgdb.LONG)
267        self.assertEqual('int8', pgdb.LONG)
268        self.assert_('char' in pgdb.STRING)
269        self.assert_(pgdb.NUMERIC <= pgdb.NUMBER)
270        self.assert_(pgdb.NUMBER >= pgdb.INTEGER)
271        self.assert_(pgdb.TIME <= pgdb.DATETIME)
272        self.assert_(pgdb.DATETIME >= pgdb.DATE)
273
274
275if __name__ == '__main__':
276    unittest.main()
Note: See TracBrowser for help on using the repository browser.