source: trunk/module/TEST_PyGreSQL_classic.py @ 553

Last change on this file since 553 was 553, checked in by cito, 4 years ago

Require at least Python 2.6 for the trunk (5.x)

Support for even older Python versions is maintained in the 4.x branch.
The goal for 5.x is to be a single-source code for both Python 2 and 3,
and this is only possible by dropping support for Python 2.5 and older.
For instance, the new except .. as syntax works only since Python 2.6.
Otherwise we would need to use 2to3 and things would be very ugly.
Note that Python 2.6 is now 7 years old. We may want to drop Python 2.6
as well at some point if it turns out to be a burden.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 12.6 KB
Line 
1#! /usr/bin/python
2
3from __future__ import print_function
4
5import sys
6from functools import partial
7from time import sleep
8from threading import Thread
9import unittest
10from pg import *
11
12# We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
13# get our information from that.  Otherwise we use the defaults.
14dbname = 'unittest'
15dbhost = None
16dbport = 5432
17
18try:
19    from LOCAL_PyGreSQL import *
20except ImportError:
21    pass
22
23def opendb():
24    db = DB(dbname, dbhost, dbport)
25    db.query("SET DATESTYLE TO 'ISO'")
26    db.query("SET TIME ZONE 'EST5EDT'")
27    db.query("SET DEFAULT_WITH_OIDS=FALSE")
28    db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
29    return db
30
31db = opendb()
32for q in (
33    "DROP TABLE _test1._test_schema",
34    "DROP TABLE _test2._test_schema",
35    "DROP SCHEMA _test1",
36    "DROP SCHEMA _test2",
37):
38    try: db.query(q)
39    except: pass
40db.close()
41
42class UtilityTest(unittest.TestCase):
43
44    def setUp(self):
45        """Setup test tables or empty them if they already exist."""
46        db = opendb()
47
48        for t in ('_test1', '_test2'):
49            try:
50                db.query("CREATE SCHEMA " + t)
51            except Error:
52                pass
53            try:
54                db.query("CREATE TABLE %s._test_schema "
55                    "(%s int PRIMARY KEY)" % (t, t))
56            except Error:
57                db.query("DELETE FROM %s._test_schema" % t)
58        try:
59            db.query("CREATE TABLE _test_schema "
60                "(_test int PRIMARY KEY, _i interval, dvar int DEFAULT 999)")
61        except Error:
62            db.query("DELETE FROM _test_schema")
63        try:
64            db.query("CREATE VIEW _test_vschema AS "
65                "SELECT _test, 'abc'::text AS _test2  FROM _test_schema")
66        except Error:
67            pass
68
69    def test_invalidname(self):
70        """Make sure that invalid table names are caught"""
71        db = opendb()
72        self.failUnlessRaises(ProgrammingError, db.get_attnames, 'x.y.z')
73
74    def test_schema(self):
75        """Does it differentiate the same table name in different schemas"""
76        db = opendb()
77        # see if they differentiate the table names properly
78        self.assertEqual(
79            db.get_attnames('_test_schema'),
80            {'_test': 'int', '_i': 'date', 'dvar': 'int'}
81        )
82        self.assertEqual(
83            db.get_attnames('public._test_schema'),
84            {'_test': 'int', '_i': 'date', 'dvar': 'int'}
85        )
86        self.assertEqual(
87            db.get_attnames('_test1._test_schema'),
88            {'_test1': 'int'}
89        )
90        self.assertEqual(
91            db.get_attnames('_test2._test_schema'),
92            {'_test2': 'int'}
93        )
94
95    def test_pkey(self):
96        db = opendb()
97        self.assertEqual(db.pkey('_test_schema'), '_test')
98        self.assertEqual(db.pkey('public._test_schema'), '_test')
99        self.assertEqual(db.pkey('_test1._test_schema'), '_test1')
100
101        self.assertEqual(db.pkey('_test_schema',
102                {'test1': 'a', 'test2.test3': 'b'}),
103                {'public.test1': 'a', 'test2.test3': 'b'})
104        self.assertEqual(db.pkey('test1'), 'a')
105        self.assertEqual(db.pkey('public.test1'), 'a')
106
107    def test_get(self):
108        db = opendb()
109        db.query("INSERT INTO _test_schema VALUES (1234)")
110        db.get('_test_schema', 1234)
111        db.get('_test_schema', 1234, keyname='_test')
112        self.failUnlessRaises(ProgrammingError, db.get, '_test_vschema', 1234)
113        db.get('_test_vschema', 1234, keyname='_test')
114
115    def test_params(self):
116        db = opendb()
117        db.query("INSERT INTO _test_schema VALUES ($1, $2, $3)", 12, None, 34)
118        d = db.get('_test_schema', 12)
119        self.assertEqual(d['dvar'], 34)
120
121    def test_insert(self):
122        db = opendb()
123        d = dict(_test=1234)
124        db.insert('_test_schema', d)
125        self.assertEqual(d['dvar'], 999)
126        db.insert('_test_schema', _test=1235)
127        self.assertEqual(d['dvar'], 999)
128
129    def test_context_manager(self):
130        db = opendb()
131        t = '_test_schema'
132        d = dict(_test=1235)
133        with db:
134            db.insert(t, d)
135            d['_test'] += 1
136            db.insert(t, d)
137        try:
138            with db:
139                d['_test'] += 1
140                db.insert(t, d)
141                db.insert(t, d)
142        except ProgrammingError:
143            pass
144        with db:
145            d['_test'] += 1
146            db.insert(t, d)
147            d['_test'] += 1
148            db.insert(t, d)
149        self.assertTrue(db.get(t, 1235))
150        self.assertTrue(db.get(t, 1236))
151        self.assertRaises(DatabaseError, db.get, t, 1237)
152        self.assertTrue(db.get(t, 1238))
153        self.assertTrue(db.get(t, 1239))
154
155    def test_sqlstate(self):
156        db = opendb()
157        db.query("INSERT INTO _test_schema VALUES (1234)")
158        try:
159            db.query("INSERT INTO _test_schema VALUES (1234)")
160        except DatabaseError as error:
161            # currently PyGreSQL does not support IntegrityError
162            self.assert_(isinstance(error, ProgrammingError))
163            # the SQLSTATE error code for unique violation is 23505
164            self.assertEqual(error.sqlstate, '23505')
165
166    def test_mixed_case(self):
167        db = opendb()
168        try:
169            db.query('CREATE TABLE _test_mc ("_Test" int PRIMARY KEY)')
170        except Error:
171            db.query("DELETE FROM _test_mc")
172        d = dict(_Test=1234)
173        db.insert('_test_mc', d)
174
175    def test_update(self):
176        db = opendb()
177        db.query("INSERT INTO _test_schema VALUES (1234)")
178
179        r = db.get('_test_schema', 1234)
180        r['dvar'] = 123
181        db.update('_test_schema', r)
182        r = db.get('_test_schema', 1234)
183        self.assertEqual(r['dvar'], 123)
184
185        r = db.get('_test_schema', 1234)
186        db.update('_test_schema', _test=1234, dvar=456)
187        r = db.get('_test_schema', 1234)
188        self.assertEqual(r['dvar'], 456)
189
190        r = db.get('_test_schema', 1234)
191        db.update('_test_schema', r, dvar=456)
192        r = db.get('_test_schema', 1234)
193        self.assertEqual(r['dvar'], 456)
194
195    def test_quote(self):
196        db = opendb()
197        q = db._quote
198        self.assertEqual(q(0, 'int'), "0")
199        self.assertEqual(q(0, 'num'), "0")
200        self.assertEqual(q('0', 'int'), "0")
201        self.assertEqual(q('0', 'num'), "0")
202        self.assertEqual(q(1, 'int'), "1")
203        self.assertEqual(q(1, 'text'), "'1'")
204        self.assertEqual(q(1, 'num'), "1")
205        self.assertEqual(q('1', 'int'), "1")
206        self.assertEqual(q('1', 'text'), "'1'")
207        self.assertEqual(q('1', 'num'), "1")
208        self.assertEqual(q(None, 'int'), "NULL")
209        self.assertEqual(q(1, 'money'), "1")
210        self.assertEqual(q('1', 'money'), "1")
211        self.assertEqual(q(1.234, 'money'), "1.234")
212        self.assertEqual(q('1.234', 'money'), "1.234")
213        self.assertEqual(q(0, 'money'), "0")
214        self.assertEqual(q(0.00, 'money'), "0.0")
215        self.assertEqual(q(Decimal('0.00'), 'money'), "0.00")
216        self.assertEqual(q(None, 'money'), "NULL")
217        self.assertEqual(q('', 'money'), "NULL")
218        self.assertEqual(q(0, 'bool'), "'f'")
219        self.assertEqual(q('', 'bool'), "NULL")
220        self.assertEqual(q('f', 'bool'), "'f'")
221        self.assertEqual(q('off', 'bool'), "'f'")
222        self.assertEqual(q('no', 'bool'), "'f'")
223        self.assertEqual(q(1, 'bool'), "'t'")
224        self.assertEqual(q(9999, 'bool'), "'t'")
225        self.assertEqual(q(-9999, 'bool'), "'t'")
226        self.assertEqual(q('1', 'bool'), "'t'")
227        self.assertEqual(q('t', 'bool'), "'t'")
228        self.assertEqual(q('on', 'bool'), "'t'")
229        self.assertEqual(q('yes', 'bool'), "'t'")
230        self.assertEqual(q('true', 'bool'), "'t'")
231        self.assertEqual(q('y', 'bool'), "'t'")
232        self.assertEqual(q('', 'date'), "NULL")
233        self.assertEqual(q(False, 'date'), "NULL")
234        self.assertEqual(q(0, 'date'), "NULL")
235        self.assertEqual(q('some_date', 'date'), "'some_date'")
236        self.assertEqual(q('current_timestamp', 'date'), "current_timestamp")
237        self.assertEqual(q('', 'text'), "''")
238        self.assertEqual(q("'", 'text'), "''''")
239        self.assertEqual(q("\\", 'text'), "'\\\\'")
240
241    def notify_callback(self, arg_dict):
242        if arg_dict:
243            arg_dict['called'] = True
244        else:
245            self.notify_timeout = True
246
247    def test_notify(self):
248        for run_as_method in False, True:
249            for call_notify in False, True:
250                db = opendb()
251                # Get function under test, can be standalone or DB method.
252                fut = db.notification_handler if run_as_method else partial(
253                    NotificationHandler, db)
254                arg_dict = dict(event=None, called=False)
255                self.notify_timeout = False
256                # Listen for 'event_1'.
257                target = fut('event_1', self.notify_callback, arg_dict)
258                thread = Thread(None, target)
259                thread.start()
260                # Wait until the thread has started.
261                for n in xrange(500):
262                    if target.listening:
263                        break
264                    sleep(0.01)
265                self.assertTrue(target.listening)
266                self.assertTrue(thread.isAlive())
267                # Open another connection for sending notifications.
268                db2 = opendb()
269                # Generate notification from the other connection.
270                if call_notify:
271                    target.notify(db2, payload='payload 1')
272                else:
273                    db2.query("notify event_1, 'payload 1'")
274                # Wait until the notification has been caught.
275                for n in xrange(500):
276                    if arg_dict['called'] or self.notify_timeout:
277                        break
278                    sleep(0.01)
279                # Check that callback has been invoked.
280                self.assertTrue(arg_dict['called'])
281                self.assertEqual(arg_dict['event'], 'event_1')
282                self.assertEqual(arg_dict['extra'], 'payload 1')
283                self.assertTrue(isinstance(arg_dict['pid'], int))
284                self.assertFalse(self.notify_timeout)
285                arg_dict['called'] = False
286                self.assertTrue(thread.isAlive())
287                # Generate stop notification.
288                if call_notify:
289                    target.notify(db2, stop=True, payload='payload 2')
290                else:
291                    db2.query("notify stop_event_1, 'payload 2'")
292                db2.close()
293                # Wait until the notification has been caught.
294                for n in xrange(500):
295                    if arg_dict['called'] or self.notify_timeout:
296                        break
297                    sleep(0.01)
298                # Check that callback has been invoked.
299                self.assertTrue(arg_dict['called'])
300                self.assertEqual(arg_dict['event'], 'stop_event_1')
301                self.assertEqual(arg_dict['extra'], 'payload 2')
302                self.assertTrue(isinstance(arg_dict['pid'], int))
303                self.assertFalse(self.notify_timeout)
304                thread.join(5)
305                self.assertFalse(thread.isAlive())
306                self.assertFalse(target.listening)
307                target.close()
308
309    def test_notify_timeout(self):
310        for run_as_method in False, True:
311            db = opendb()
312            # Get function under test, can be standalone or DB method.
313            fut = db.notification_handler if run_as_method else partial(
314                NotificationHandler, db)
315            arg_dict = dict(event=None, called=False)
316            self.notify_timeout = False
317            # Listen for 'event_1' with timeout of 10ms.
318            target = fut('event_1', self.notify_callback, arg_dict, 0.01)
319            thread = Thread(None, target)
320            thread.start()
321            # Sleep 20ms, long enough to time out.
322            sleep(0.02)
323            # Verify that we've indeed timed out.
324            self.assertFalse(arg_dict.get('called'))
325            self.assertTrue(self.notify_timeout)
326            self.assertFalse(thread.isAlive())
327            self.assertFalse(target.listening)
328            target.close()
329
330
331if __name__ == '__main__':
332    suite = unittest.TestSuite()
333
334    if len(sys.argv) > 1: test_list = sys.argv[1:]
335    else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
336
337    if len(sys.argv) == 2 and sys.argv[1] == '-l':
338        print('\n'.join(unittest.getTestCaseNames(UtilityTest, 'test_')))
339        sys.exit(1)
340
341    for test_name in test_list:
342        try:
343            suite.addTest(UtilityTest(test_name))
344        except:
345            print("\n ERROR: %s.\n" % sys.exc_value)
346            sys.exit(1)
347
348    rc = unittest.TextTestRunner(verbosity=1).run(suite)
349    sys.exit(len(rc.errors+rc.failures) != 0)
350
Note: See TracBrowser for help on using the repository browser.