Changeset 625


Ignore:
Timestamp:
Nov 25, 2015, 8:40:18 AM (4 years ago)
Author:
cito
Message:

Backport notification handler fix to 4.x

The notificaton handler had been fixed in trunk so that it
will not drop concurrent messages any more.

Location:
branches/4.x/module
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • branches/4.x/module/TEST_PyGreSQL_classic.py

    r521 r625  
    235235            self.notify_timeout = True
    236236
    237     def test_notify(self):
     237    def test_notify(self, options=None):
     238        if not options:
     239            options = {}
     240        run_as_method = options.get('run_as_method')
     241        call_notify = options.get('call_notify')
     242        two_payloads = options.get('two_payloads')
     243        db = opendb()
     244        # Get function under test, can be standalone or DB method.
     245        fut = db.notification_handler if run_as_method else partial(
     246            NotificationHandler, db)
     247        arg_dict = dict(event=None, called=False)
     248        self.notify_timeout = False
     249        # Listen for 'event_1'.
     250        target = fut('event_1', self.notify_callback, arg_dict, 5)
     251        thread = Thread(None, target)
     252        thread.start()
     253        try:
     254            # Wait until the thread has started.
     255            for n in range(500):
     256                if target.listening:
     257                    break
     258                sleep(0.01)
     259            self.assertTrue(target.listening)
     260            self.assertTrue(thread.isAlive())
     261            # Open another connection for sending notifications.
     262            db2 = opendb()
     263            # Generate notification from the other connection.
     264            if two_payloads:
     265                db2.begin()
     266            if call_notify:
     267                if two_payloads:
     268                    target.notify(db2, payload='payload 0')
     269                target.notify(db2, payload='payload 1')
     270            else:
     271                if two_payloads:
     272                    db2.query("notify event_1, 'payload 0'")
     273                db2.query("notify event_1, 'payload 1'")
     274            if two_payloads:
     275                db2.commit()
     276            # Wait until the notification has been caught.
     277            for n in range(500):
     278                if arg_dict['called'] or self.notify_timeout:
     279                    break
     280                sleep(0.01)
     281            # Check that callback has been invoked.
     282            self.assertTrue(arg_dict['called'])
     283            self.assertEqual(arg_dict['event'], 'event_1')
     284            self.assertEqual(arg_dict['extra'], 'payload 1')
     285            self.assertTrue(isinstance(arg_dict['pid'], int))
     286            self.assertFalse(self.notify_timeout)
     287            arg_dict['called'] = False
     288            self.assertTrue(thread.isAlive())
     289            # Generate stop notification.
     290            if call_notify:
     291                target.notify(db2, stop=True, payload='payload 2')
     292            else:
     293                db2.query("notify stop_event_1, 'payload 2'")
     294            db2.close()
     295            # Wait until the notification has been caught.
     296            for n in range(500):
     297                if arg_dict['called'] or self.notify_timeout:
     298                    break
     299                sleep(0.01)
     300            # Check that callback has been invoked.
     301            self.assertTrue(arg_dict['called'])
     302            self.assertEqual(arg_dict['event'], 'stop_event_1')
     303            self.assertEqual(arg_dict['extra'], 'payload 2')
     304            self.assertTrue(isinstance(arg_dict['pid'], int))
     305            self.assertFalse(self.notify_timeout)
     306            thread.join(5)
     307            self.assertFalse(thread.isAlive())
     308            self.assertFalse(target.listening)
     309        finally:
     310            target.close()
     311            if thread.is_alive():
     312                thread.join(5)
     313
     314    def test_notify_other_options(self):
    238315        for run_as_method in False, True:
    239316            for call_notify in False, True:
    240                 db = opendb()
    241                 # Get function under test, can be standalone or DB method.
    242                 fut = db.notification_handler if run_as_method else partial(
    243                     NotificationHandler, db)
    244                 arg_dict = dict(event=None, called=False)
    245                 self.notify_timeout = False
    246                 # Listen for 'event_1'.
    247                 target = fut('event_1', self.notify_callback, arg_dict)
    248                 thread = Thread(None, target)
    249                 thread.start()
    250                 # Wait until the thread has started.
    251                 for n in xrange(500):
    252                     if target.listening:
    253                         break
    254                     sleep(0.01)
    255                 self.assertTrue(target.listening)
    256                 self.assertTrue(thread.isAlive())
    257                 # Open another connection for sending notifications.
    258                 db2 = opendb()
    259                 # Generate notification from the other connection.
    260                 if call_notify:
    261                     target.notify(db2, payload='payload 1')
    262                 else:
    263                     db2.query("notify event_1, 'payload 1'")
    264                 # Wait until the notification has been caught.
    265                 for n in xrange(500):
    266                     if arg_dict['called'] or self.notify_timeout:
    267                         break
    268                     sleep(0.01)
    269                 # Check that callback has been invoked.
    270                 self.assertTrue(arg_dict['called'])
    271                 self.assertEqual(arg_dict['event'], 'event_1')
    272                 self.assertEqual(arg_dict['extra'], 'payload 1')
    273                 self.assertTrue(isinstance(arg_dict['pid'], int))
    274                 self.assertFalse(self.notify_timeout)
    275                 arg_dict['called'] = False
    276                 self.assertTrue(thread.isAlive())
    277                 # Generate stop notification.
    278                 if call_notify:
    279                     target.notify(db2, stop=True, payload='payload 2')
    280                 else:
    281                     db2.query("notify stop_event_1, 'payload 2'")
    282                 db2.close()
    283                 # Wait until the notification has been caught.
    284                 for n in xrange(500):
    285                     if arg_dict['called'] or self.notify_timeout:
    286                         break
    287                     sleep(0.01)
    288                 # Check that callback has been invoked.
    289                 self.assertTrue(arg_dict['called'])
    290                 self.assertEqual(arg_dict['event'], 'stop_event_1')
    291                 self.assertEqual(arg_dict['extra'], 'payload 2')
    292                 self.assertTrue(isinstance(arg_dict['pid'], int))
    293                 self.assertFalse(self.notify_timeout)
    294                 thread.join(5)
    295                 self.assertFalse(thread.isAlive())
    296                 self.assertFalse(target.listening)
    297                 target.close()
     317                for two_payloads in False, True:
     318                    options = dict(
     319                        run_as_method=run_as_method,
     320                        call_notify=call_notify,
     321                        two_payloads=two_payloads)
     322                    if True in options.values():
     323                        self.test_notify(options)
    298324
    299325    def test_notify_timeout(self):
  • branches/4.x/module/pg.py

    r590 r625  
    226226        _ilist = [self.db.fileno()]
    227227
    228         while True:
     228        while self.listening:
    229229            ilist, _olist, _elist = select.select(_ilist, [], [], self.timeout)
    230             if ilist == []:  # we timed out
    231                 self.unlisten()
    232                 self.callback(None)
    233                 break
    234             else:
    235                 notice = self.db.getnotify()
    236                 if notice is None:
    237                     continue
    238                 event, pid, extra = notice
    239                 if event in (self.event, self.stop_event):
     230            if ilist:
     231                while self.listening:
     232                    notice = self.db.getnotify()
     233                    if not notice:  # no more messages
     234                        break
     235                    event, pid, extra = notice
     236                    if event not in (self.event, self.stop_event):
     237                        self.unlisten()
     238                        raise _db_error(
     239                            'listening for "%s" and "%s", but notified of "%s"'
     240                            % (self.event, self.stop_event, event))
     241                    if event == self.stop_event:
     242                        self.unlisten()
    240243                    self.arg_dict['pid'] = pid
    241244                    self.arg_dict['event'] = event
    242245                    self.arg_dict['extra'] = extra
    243246                    self.callback(self.arg_dict)
    244                     if event == self.stop_event:
    245                         self.unlisten()
    246                         break
    247                 else:
    248                     self.unlisten()
    249                     raise _db_error(
    250                         'listening for "%s" and "%s", but notified of "%s"'
    251                         % (self.event, self.stop_event, event))
     247            else:   # we timed out
     248                self.unlisten()
     249                self.callback(None)
    252250
    253251
Note: See TracChangeset for help on using the changeset viewer.