source: trunk/tests/test_classic_notification.py

Last change on this file was 969, checked in by cito, 6 months ago

Shebang should not be followed by a blank

It's a myth that it is needed because some old versions of Unix expect it.
However, some editors do not like a blank here.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 14.5 KB
Line 
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3
4"""Test the classic PyGreSQL interface.
5
6Sub-tests for the notification handler object.
7
8Contributed by Christoph Zwerschke.
9
10These tests need a database to test against.
11"""
12
13try:
14    import unittest2 as unittest  # for Python < 2.7
15except ImportError:
16    import unittest
17
18import warnings
19from time import sleep
20from threading import Thread
21
22import pg  # the module under test
23
# We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
# get our information from that.  Otherwise we use the defaults.
# The current user must have create schema privilege on the database.
dbname = 'unittest'
dbhost = None
dbport = 5432

debug = False  # let DB wrapper print debugging output

# Try the override module as a package-relative import first, then as a
# plain top-level module; if neither can be imported, keep the defaults.
try:
    from .LOCAL_PyGreSQL import *
except (ImportError, ValueError):
    try:
        from LOCAL_PyGreSQL import *
    except ImportError:
        pass
40
41
def DB():
    """Return a pg.DB wrapper connected to the configured test database.

    The connection parameters are taken from the module-level settings.
    The debug attribute of the wrapper is only set when the module-level
    debug setting is switched on.
    """
    connection = pg.DB(dbname, dbhost, dbport)
    if not debug:
        return connection
    connection.debug = debug
    return connection
48
49
50class TestPyNotifyAlias(unittest.TestCase):
51    """Test alternative ways of creating a NotificationHandler."""
52
53    def callback(self):
54        self.fail('Callback should not be called in this test')
55
56    def testPgNotify(self):
57        db = DB()
58        arg_dict = {}
59        args = ('test_event', self.callback, arg_dict)
60        kwargs = dict(timeout=2, stop_event='test_stop')
61        with warnings.catch_warnings(record=True) as warn_msgs:
62            warnings.simplefilter("always")
63            handler1 = pg.pgnotify(db, *args, **kwargs)
64            assert len(warn_msgs) == 1
65            warn_msg = warn_msgs[0]
66            assert issubclass(warn_msg.category, DeprecationWarning)
67            assert 'deprecated' in str(warn_msg.message)
68        self.assertIsInstance(handler1, pg.NotificationHandler)
69        handler2 = db.notification_handler(*args, **kwargs)
70        self.assertIsInstance(handler2, pg.NotificationHandler)
71        self.assertIs(handler1.db, handler2.db)
72        self.assertEqual(handler1.event, handler2.event)
73        self.assertIs(handler1.callback, handler2.callback)
74        self.assertIs(handler1.arg_dict, handler2.arg_dict)
75        self.assertEqual(handler1.timeout, handler2.timeout)
76        self.assertEqual(handler1.stop_event, handler2.stop_event)
77
78
79class TestSyncNotification(unittest.TestCase):
80    """Test notification handler running in the same thread."""
81
82    def setUp(self):
83        self.db = DB()
84        self.timeout = None
85        self.called = True
86        self.payloads = []
87
88    def tearDown(self):
89        if self.db:
90            self.db.close()
91
92    def callback(self, arg_dict):
93        if arg_dict is None:
94            self.timeout = True
95        else:
96            self.timeout = False
97            self.payloads.append(arg_dict.get('extra'))
98
99    def get_handler(self, event=None, arg_dict=None, stop_event=None):
100        if not event:
101            event = 'test_async_notification'
102            if not stop_event:
103                stop_event = 'stop_async_notification'
104        callback = self.callback
105        handler = self.db.notification_handler(
106            event, callback, arg_dict, 0, stop_event)
107        self.assertEqual(handler.event, event)
108        self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
109        self.assertIs(handler.callback, callback)
110        if arg_dict is None:
111            self.assertEqual(handler.arg_dict, {})
112        else:
113            self.assertIs(handler.arg_dict, arg_dict)
114        self.assertEqual(handler.timeout, 0)
115        self.assertFalse(handler.listening)
116        return handler
117
118    def testCloseHandler(self):
119        handler = self.get_handler()
120        self.assertIs(handler.db, self.db)
121        handler.close()
122        self.assertRaises(pg.InternalError, self.db.close)
123        self.db = None
124        self.assertIs(handler.db, None)
125
126    def testDeleteHandler(self):
127        handler = self.get_handler('test_del')
128        self.assertIs(handler.db, self.db)
129        handler.listen()
130        self.db.query('notify test_del')
131        self.db.query('notify test_del')
132        del handler
133        self.db.query('notify test_del')
134        n = 0
135        while self.db.getnotify() and n < 4:
136            n += 1
137        self.assertEqual(n, 2)
138
139    def testNotify(self):
140        handler = self.get_handler()
141        handler.listen()
142        self.assertRaises(TypeError, handler.notify, invalid=True)
143        handler.notify(payload='baz')
144        handler.notify(stop=True, payload='buz')
145        handler.unlisten()
146        self.db.close()
147        self.db = None
148
149    def testNotifyWithArgsAndPayload(self):
150        arg_dict = {'foo': 'bar'}
151        handler = self.get_handler(arg_dict=arg_dict)
152        self.assertEqual(handler.timeout, 0)
153        handler.listen()
154        handler.notify(payload='baz')
155        handler.notify(payload='biz')
156        handler()
157        self.assertIsNotNone(self.timeout)
158        self.assertFalse(self.timeout)
159        self.assertEqual(self.payloads, ['baz', 'biz'])
160        self.assertEqual(arg_dict['foo'], 'bar')
161        self.assertEqual(arg_dict['event'], handler.event)
162        self.assertIsInstance(arg_dict['pid'], int)
163        self.assertEqual(arg_dict['extra'], 'biz')
164        self.assertTrue(handler.listening)
165        del self.payloads[:]
166        handler.notify(stop=True, payload='buz')
167        handler()
168        self.assertIsNotNone(self.timeout)
169        self.assertFalse(self.timeout)
170        self.assertEqual(self.payloads, ['buz'])
171        self.assertEqual(arg_dict['foo'], 'bar')
172        self.assertEqual(arg_dict['event'], handler.stop_event)
173        self.assertIsInstance(arg_dict['pid'], int)
174        self.assertEqual(arg_dict['extra'], 'buz')
175        self.assertFalse(handler.listening)
176        handler.unlisten()
177
178    def testNotifyWrongEvent(self):
179        handler = self.get_handler('good_event')
180        self.assertEqual(handler.timeout, 0)
181        handler.listen()
182        handler.notify(payload="note 1")
183        self.db.query("notify bad_event, 'note 2'")
184        handler.notify(payload="note 3")
185        handler()
186        self.assertIsNotNone(self.timeout)
187        self.assertFalse(self.timeout)
188        self.assertEqual(self.payloads, ['note 1', 'note 3'])
189        self.assertTrue(handler.listening)
190        del self.payloads[:]
191        self.db.query('listen bad_event')
192        handler.notify(payload="note 4")
193        self.db.query("notify bad_event, 'note 5'")
194        handler.notify(payload="note 6")
195        try:
196            handler()
197        except pg.DatabaseError as error:
198            self.assertEqual(str(error),
199                'Listening for "good_event" and "stop_good_event",'
200                ' but notified of "bad_event"')
201        self.assertIsNotNone(self.timeout)
202        self.assertFalse(self.timeout)
203        self.assertEqual(self.payloads, ['note 4'])
204        self.assertFalse(handler.listening)
205
206
207class TestAsyncNotification(unittest.TestCase):
208    """Test notification handler running in a separate thread."""
209
210    def setUp(self):
211        self.db = DB()
212
213    def tearDown(self):
214        self.doCleanups()
215        if self.db:
216            self.db.close()
217
218    def callback(self, arg_dict):
219        if arg_dict is None:
220            self.timeout = True
221        elif arg_dict is self.arg_dict:
222            arg_dict = arg_dict.copy()
223            pid = arg_dict.get('pid')
224            if isinstance(pid, int):
225                arg_dict['pid'] = 1
226            self.received.append(arg_dict)
227        else:
228            self.received.append(dict(error=arg_dict))
229
230    def start_handler(self,
231            event=None, arg_dict=None, timeout=5, stop_event=None):
232        db = DB()
233        if not event:
234            event = 'test_async_notification'
235            if not stop_event:
236                stop_event = 'stop_async_notification'
237        callback = self.callback
238        handler = db.notification_handler(
239            event, callback, arg_dict, timeout, stop_event)
240        self.handler = handler
241        self.assertIsInstance(handler, pg.NotificationHandler)
242        self.assertEqual(handler.event, event)
243        self.assertEqual(handler.stop_event, stop_event or 'stop_%s' % event)
244        self.event = handler.event
245        self.assertIs(handler.callback, callback)
246        if arg_dict is None:
247            self.assertEqual(handler.arg_dict, {})
248        else:
249            self.assertIsInstance(handler.arg_dict, dict)
250        self.arg_dict = handler.arg_dict
251        self.assertEqual(handler.timeout, timeout)
252        self.assertFalse(handler.listening)
253        thread = Thread(target=handler, name='test_notification_thread')
254        self.thread = thread
255        thread.start()
256        self.stopped = timeout == 0
257        self.addCleanup(self.stop_handler)
258        for n in range(500):
259            if handler.listening:
260                break
261            sleep(0.01)
262        self.assertTrue(handler.listening)
263        if not self.stopped:
264            self.assertTrue(thread.is_alive())
265        self.timeout = False
266        self.received = []
267        self.sent = []
268
269    def stop_handler(self):
270        handler = self.handler
271        thread = self.thread
272        if not self.stopped and self.handler.listening:
273            self.notify_handler(stop=True)
274        handler.close()
275        self.db = None
276        if thread.is_alive():
277            thread.join(5)
278        self.assertFalse(handler.listening)
279        self.assertFalse(thread.is_alive())
280
281    def notify_handler(self, stop=False, payload=None):
282        event = self.event
283        if stop:
284            event = self.handler.stop_event
285            self.stopped = True
286        arg_dict = self.arg_dict.copy()
287        arg_dict.update(event=event, pid=1, extra=payload or '')
288        self.handler.notify(db=self.db, stop=stop, payload=payload)
289        self.sent.append(arg_dict)
290
291    def notify_query(self, stop=False, payload=None):
292        event = self.event
293        if stop:
294            event = self.handler.stop_event
295            self.stopped = True
296        q = 'notify "%s"' % event
297        if payload:
298            q += ", '%s'" % payload
299        arg_dict = self.arg_dict.copy()
300        arg_dict.update(event=event, pid=1, extra=payload or '')
301        self.db.query(q)
302        self.sent.append(arg_dict)
303
304    def wait(self):
305        for n in range(500):
306            if self.timeout:
307                return False
308            if len(self.received) >= len(self.sent):
309                return True
310            sleep(0.01)
311
312    def receive(self, stop=False):
313        if not self.sent:
314            stop = True
315        if stop:
316            self.notify_handler(stop=True, payload='stop')
317        self.assertTrue(self.wait())
318        self.assertFalse(self.timeout)
319        self.assertEqual(self.received, self.sent)
320        self.received = []
321        self.sent = []
322        self.assertEqual(self.handler.listening, not self.stopped)
323
324    def testNotifyHandlerEmpty(self):
325        self.start_handler()
326        self.notify_handler(stop=True)
327        self.assertEqual(len(self.sent), 1)
328        self.receive()
329
330    def testNotifyQueryEmpty(self):
331        self.start_handler()
332        self.notify_query(stop=True)
333        self.assertEqual(len(self.sent), 1)
334        self.receive()
335
336    def testNotifyHandlerOnce(self):
337        self.start_handler()
338        self.notify_handler()
339        self.assertEqual(len(self.sent), 1)
340        self.receive()
341        self.receive(stop=True)
342
343    def testNotifyQueryOnce(self):
344        self.start_handler()
345        self.notify_query()
346        self.receive()
347        self.notify_query(stop=True)
348        self.receive()
349
350    def testNotifyWithArgs(self):
351        arg_dict = {'test': 42, 'more': 43, 'less': 41}
352        self.start_handler('test_args', arg_dict)
353        self.notify_query()
354        self.receive(stop=True)
355
356    def testNotifySeveralTimes(self):
357        arg_dict = {'test': 1}
358        self.start_handler(arg_dict=arg_dict)
359        for count in range(3):
360            self.notify_query()
361        self.receive()
362        arg_dict['test'] += 1
363        for count in range(2):
364            self.notify_handler()
365        self.receive()
366        arg_dict['test'] += 1
367        for count in range(3):
368            self.notify_query()
369        self.receive(stop=True)
370
371    def testNotifyOnceWithPayload(self):
372        self.start_handler()
373        self.notify_query(payload='test_payload')
374        self.receive(stop=True)
375
376    def testNotifyWithArgsAndPayload(self):
377        self.start_handler(arg_dict={'foo': 'bar'})
378        self.notify_query(payload='baz')
379        self.receive(stop=True)
380
381    def testNotifyQuotedNames(self):
382        self.start_handler('Hello, World!')
383        self.notify_query(payload='How do you do?')
384        self.receive(stop=True)
385
386    def testNotifyWithFivePayloads(self):
387        self.start_handler('gimme_5', {'test': 'Gimme 5'})
388        for count in range(5):
389            self.notify_query(payload="Round %d" % count)
390        self.assertEqual(len(self.sent), 5)
391        self.receive(stop=True)
392
393    def testReceiveImmediately(self):
394        self.start_handler('immediate', {'test': 'immediate'})
395        for count in range(3):
396            self.notify_query(payload="Round %d" % count)
397            self.receive()
398        self.receive(stop=True)
399
400    def testNotifyDistinctInTransaction(self):
401        self.start_handler('test_transaction', {'transaction': True})
402        self.db.begin()
403        for count in range(3):
404            self.notify_query(payload='Round %d' % count)
405        self.db.commit()
406        self.receive(stop=True)
407
408    def testNotifySameInTransaction(self):
409        self.start_handler('test_transaction', {'transaction': True})
410        self.db.begin()
411        for count in range(3):
412            self.notify_query()
413        self.db.commit()
414        # these same notifications may be delivered as one,
415        # so we must not wait for all three to appear
416        self.sent = self.sent[:1]
417        self.receive(stop=True)
418
419    def testNotifyNoTimeout(self):
420        self.start_handler(timeout=None)
421        self.assertIsNone(self.handler.timeout)
422        self.assertTrue(self.handler.listening)
423        sleep(0.02)
424        self.assertFalse(self.timeout)
425        self.receive(stop=True)
426
427    def testNotifyZeroTimeout(self):
428        self.start_handler(timeout=0)
429        self.assertEqual(self.handler.timeout, 0)
430        self.assertTrue(self.handler.listening)
431        self.assertFalse(self.timeout)
432
433    def testNotifyWithoutTimeout(self):
434        self.start_handler(timeout=1)
435        self.assertEqual(self.handler.timeout, 1)
436        sleep(0.02)
437        self.assertFalse(self.timeout)
438        self.receive(stop=True)
439
440    def testNotifyWithTimeout(self):
441        self.start_handler(timeout=0.01)
442        sleep(0.02)
443        self.assertTrue(self.timeout)
444
445
# Run all tests in this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser for help on using the repository browser.