Changeset 621


Ignore:
Timestamp:
Nov 25, 2015, 7:09:06 AM (4 years ago)
Author:
cito
Message:

Notification handler dropped concurrent messages

Add fix and test contributed by Patrick TJ McPhee?
as suggested on the mailing list.

Location:
trunk/module
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/module/TEST_PyGreSQL_classic.py

    r602 r621  
    249249            self.notify_timeout = True
    250250
    251     def test_notify(self):
     251    def test_notify(self, options=None):
     252        if not options:
     253            options = {}
     254        run_as_method = options.get('run_as_method')
     255        call_notify = options.get('call_notify')
     256        two_payloads = options.get('two_payloads')
     257        db = opendb()
     258        # Get function under test, can be standalone or DB method.
     259        fut = db.notification_handler if run_as_method else partial(
     260            NotificationHandler, db)
     261        arg_dict = dict(event=None, called=False)
     262        self.notify_timeout = False
     263        # Listen for 'event_1'.
     264        target = fut('event_1', self.notify_callback, arg_dict)
     265        thread = Thread(None, target)
     266        thread.start()
     267        # Wait until the thread has started.
     268        for n in range(500):
     269            if target.listening:
     270                break
     271            sleep(0.01)
     272        self.assertTrue(target.listening)
     273        self.assertTrue(thread.isAlive())
     274        # Open another connection for sending notifications.
     275        db2 = opendb()
     276        # Generate notification from the other connection.
     277        if two_payloads:
     278            db2.begin()
     279        if call_notify:
     280            if two_payloads:
     281                target.notify(db2, payload='payload 0')
     282            target.notify(db2, payload='payload 1')
     283        else:
     284            if two_payloads:
     285                db2.query("notify event_1, 'payload 0'")
     286            db2.query("notify event_1, 'payload 1'")
     287        if two_payloads:
     288            db2.commit()
     289        # Wait until the notification has been caught.
     290        for n in range(500):
     291            if arg_dict['called'] or self.notify_timeout:
     292                break
     293            sleep(0.01)
     294        # Check that callback has been invoked.
     295        self.assertTrue(arg_dict['called'])
     296        self.assertEqual(arg_dict['event'], 'event_1')
     297        self.assertEqual(arg_dict['extra'], 'payload 1')
     298        self.assertTrue(isinstance(arg_dict['pid'], int))
     299        self.assertFalse(self.notify_timeout)
     300        arg_dict['called'] = False
     301        self.assertTrue(thread.isAlive())
     302        # Generate stop notification.
     303        if call_notify:
     304            target.notify(db2, stop=True, payload='payload 2')
     305        else:
     306            db2.query("notify stop_event_1, 'payload 2'")
     307        db2.close()
     308        # Wait until the notification has been caught.
     309        for n in range(500):
     310            if arg_dict['called'] or self.notify_timeout:
     311                break
     312            sleep(0.01)
     313        # Check that callback has been invoked.
     314        self.assertTrue(arg_dict['called'])
     315        self.assertEqual(arg_dict['event'], 'stop_event_1')
     316        self.assertEqual(arg_dict['extra'], 'payload 2')
     317        self.assertTrue(isinstance(arg_dict['pid'], int))
     318        self.assertFalse(self.notify_timeout)
     319        thread.join(5)
     320        self.assertFalse(thread.isAlive())
     321        self.assertFalse(target.listening)
     322        target.close()
     323
     324    def test_notify_other_options(self):
    252325        for run_as_method in False, True:
    253326            for call_notify in False, True:
    254                 db = opendb()
    255                 # Get function under test, can be standalone or DB method.
    256                 fut = db.notification_handler if run_as_method else partial(
    257                     NotificationHandler, db)
    258                 arg_dict = dict(event=None, called=False)
    259                 self.notify_timeout = False
    260                 # Listen for 'event_1'.
    261                 target = fut('event_1', self.notify_callback, arg_dict)
    262                 thread = Thread(None, target)
    263                 thread.start()
    264                 # Wait until the thread has started.
    265                 for n in range(500):
    266                     if target.listening:
    267                         break
    268                     sleep(0.01)
    269                 self.assertTrue(target.listening)
    270                 self.assertTrue(thread.isAlive())
    271                 # Open another connection for sending notifications.
    272                 db2 = opendb()
    273                 # Generate notification from the other connection.
    274                 if call_notify:
    275                     target.notify(db2, payload='payload 1')
    276                 else:
    277                     db2.query("notify event_1, 'payload 1'")
    278                 # Wait until the notification has been caught.
    279                 for n in range(500):
    280                     if arg_dict['called'] or self.notify_timeout:
    281                         break
    282                     sleep(0.01)
    283                 # Check that callback has been invoked.
    284                 self.assertTrue(arg_dict['called'])
    285                 self.assertEqual(arg_dict['event'], 'event_1')
    286                 self.assertEqual(arg_dict['extra'], 'payload 1')
    287                 self.assertTrue(isinstance(arg_dict['pid'], int))
    288                 self.assertFalse(self.notify_timeout)
    289                 arg_dict['called'] = False
    290                 self.assertTrue(thread.isAlive())
    291                 # Generate stop notification.
    292                 if call_notify:
    293                     target.notify(db2, stop=True, payload='payload 2')
    294                 else:
    295                     db2.query("notify stop_event_1, 'payload 2'")
    296                 db2.close()
    297                 # Wait until the notification has been caught.
    298                 for n in range(500):
    299                     if arg_dict['called'] or self.notify_timeout:
    300                         break
    301                     sleep(0.01)
    302                 # Check that callback has been invoked.
    303                 self.assertTrue(arg_dict['called'])
    304                 self.assertEqual(arg_dict['event'], 'stop_event_1')
    305                 self.assertEqual(arg_dict['extra'], 'payload 2')
    306                 self.assertTrue(isinstance(arg_dict['pid'], int))
    307                 self.assertFalse(self.notify_timeout)
    308                 thread.join(5)
    309                 self.assertFalse(thread.isAlive())
    310                 self.assertFalse(target.listening)
    311                 target.close()
     327                for two_payloads in False, True:
     328                    options = dict(
     329                        run_as_method=run_as_method,
     330                        call_notify=call_notify,
     331                        two_payloads=two_payloads)
     332                    if any(options.values()):
     333                        self.test_notify(options)
    312334
    313335    def test_notify_timeout(self):
  • trunk/module/pg.py

    r591 r621  
    221221        _ilist = [self.db.fileno()]
    222222
    223         while True:
     223        while self.listening:
    224224            ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
    225             if ilist == []:  # we timed out
    226                 self.unlisten()
    227                 self.callback(None)
    228                 break
    229             else:
    230                 notice = self.db.getnotify()
    231                 if notice is None:
    232                     continue
    233                 event, pid, extra = notice
    234                 if event in (self.event, self.stop_event):
     225            if ilist:
     226                while self.listening:
     227                    notice = self.db.getnotify()
     228                    if not notice:  # no more messages
     229                        break
     230                    event, pid, extra = notice
     231                    if event not in (self.event, self.stop_event):
     232                        self.unlisten()
     233                        raise _db_error(
     234                            'listening for "%s" and "%s", but notified of "%s"'
     235                            % (self.event, self.stop_event, event))
     236                    if event == self.stop_event:
     237                        self.unlisten()
    235238                    self.arg_dict['pid'] = pid
    236239                    self.arg_dict['event'] = event
    237240                    self.arg_dict['extra'] = extra
    238241                    self.callback(self.arg_dict)
    239                     if event == self.stop_event:
    240                         self.unlisten()
    241                         break
    242                 else:
    243                     self.unlisten()
    244                     raise _db_error(
    245                         'listening for "%s" and "%s", but notified of "%s"'
    246                         % (self.event, self.stop_event, event))
     242            else:   # we timed out
     243                self.unlisten()
     244                self.callback(None)
    247245
    248246
Note: See TracChangeset for help on using the changeset viewer.