Changeset 547 for branches/4.x


Ignore:
Timestamp:
Nov 19, 2015, 9:19:43 AM (4 years ago)
Author:
cito
Message:

Add test for canceling a long-running query

File:
1 edited

Legend:

Unmodified
Added
Removed
  • branches/4.x/module/TEST_PyGreSQL_classic_connection.py

    r546 r547  
    1717    import unittest
    1818import sys
     19import threading
     20import time
    1921
    2022import pg  # the module under test
     
    170172            self.fail('Query should give an error for a closed connection')
    171173        self.connection = connect()
     174
     175    def testMethodReset(self):
     176        query = self.connection.query
     177        # check that client encoding gets reset
     178        encoding = query('show client_encoding').getresult()[0][0].upper()
     179        changed_encoding = 'LATIN1' if encoding == 'UTF8' else 'UTF8'
     180        self.assertNotEqual(encoding, changed_encoding)
     181        self.connection.query("set client_encoding=%s" % changed_encoding)
     182        new_encoding = query('show client_encoding').getresult()[0][0].upper()
     183        self.assertEqual(new_encoding, changed_encoding)
     184        self.connection.reset()
     185        new_encoding = query('show client_encoding').getresult()[0][0].upper()
     186        self.assertNotEqual(new_encoding, changed_encoding)
     187        self.assertEqual(new_encoding, encoding)
     188
     189    def testMethodCancel(self):
     190        r = self.connection.cancel()
     191        self.assertIsInstance(r, int)
     192        self.assertEqual(r, 1)
     193
     194    def testCancelLongRunningThread(self):
     195        errors = []
     196
     197        def sleep():
     198            try:
     199                self.connection.query('select pg_sleep(5)').getresult()
     200            except pg.ProgrammingError, error:
     201                errors.append(str(error))
     202
     203        thread = threading.Thread(target=sleep)
     204        t1 = time.time()
     205        thread.start()  # run the query
     206        while 1:  # make sure the query is really running
     207            time.sleep(0.1)
     208            if thread.is_alive() or time.time() - t1 > 5:
     209                break
     210        r = self.connection.cancel()  # cancel the running query
     211        thread.join()  # wait for the thread to end
     212        t2 = time.time()
     213
     214        self.assertIsInstance(r, int)
     215        self.assertEqual(r, 1)  # return code should be 1
     216        self.assertLessEqual(t2 - t1, 3)  # time should be under 3 seconds
     217        self.assertTrue(errors)
     218
     219    def testMethodFileNo(self):
     220        r = self.connection.fileno()
     221        self.assertIsInstance(r, int)
     222        self.assertGreaterEqual(r, 0)
     223
     224
    172225
    173226
     
    438491            '2|xyz  |uvw  \n'
    439492            '(2 rows)\n')
    440 
    441     def testGetNotify(self):
    442         self.assertIsNone(self.c.getnotify())
    443         self.c.query('listen test_notify')
    444         try:
    445             self.assertIsNone(self.c.getnotify())
    446             self.c.query("notify test_notify")
    447             r = self.c.getnotify()
    448             self.assertIsInstance(r, tuple)
    449             self.assertEqual(len(r), 3)
    450             self.assertIsInstance(r[0], str)
    451             self.assertIsInstance(r[1], int)
    452             self.assertIsInstance(r[2], str)
    453             self.assertEqual(r[0], 'test_notify')
    454             self.assertEqual(r[2], '')
    455             self.assertIsNone(self.c.getnotify())
    456             try:
    457                 self.c.query("notify test_notify, 'test_payload'")
    458             except pg.ProgrammingError:  # PostgreSQL < 9.0
    459                 pass
    460             else:
    461                 r = self.c.getnotify()
    462                 self.assertTrue(isinstance(r, tuple))
    463                 self.assertEqual(len(r), 3)
    464                 self.assertIsInstance(r[0], str)
    465                 self.assertIsInstance(r[1], int)
    466                 self.assertIsInstance(r[2], str)
    467                 self.assertEqual(r[0], 'test_notify')
    468                 self.assertEqual(r[2], 'test_payload')
    469                 self.assertIsNone(self.c.getnotify())
    470         finally:
    471             self.c.query('unlisten test_notify')
    472493
    473494
     
    727748
    728749class TestNoticeReceiver(unittest.TestCase):
    729     """"Test notice receiver support."""
    730 
    731     # Test database needed: must be run as a DBTestSuite.
     750    """"Test notification support."""
    732751
    733752    def setUp(self):
     
    736755    def tearDown(self):
    737756        self.c.close()
     757
     758    def testGetNotify(self):
     759        getnotify = self.connection.getnotify
     760        query = self.c.query
     761        self.assertIsNone(getnotify())
     762        query('listen test_notify')
     763        try:
     764            self.assertIsNone(self.c.getnotify())
     765            query("notify test_notify")
     766            r = getnotify()
     767            self.assertIsInstance(r, tuple)
     768            self.assertEqual(len(r), 3)
     769            self.assertIsInstance(r[0], str)
     770            self.assertIsInstance(r[1], int)
     771            self.assertIsInstance(r[2], str)
     772            self.assertEqual(r[0], 'test_notify')
     773            self.assertEqual(r[2], '')
     774            self.assertIsNone(self.c.getnotify())
     775            try:
     776                query("notify test_notify, 'test_payload'")
     777            except pg.ProgrammingError:  # PostgreSQL < 9.0
     778                pass
     779            else:
     780                r = getnotify()
     781                self.assertTrue(isinstance(r, tuple))
     782                self.assertEqual(len(r), 3)
     783                self.assertIsInstance(r[0], str)
     784                self.assertIsInstance(r[1], int)
     785                self.assertIsInstance(r[2], str)
     786                self.assertEqual(r[0], 'test_notify')
     787                self.assertEqual(r[2], 'test_payload')
     788                self.assertIsNone(getnotify())
     789        finally:
     790            query('unlisten test_notify')
    738791
    739792    def testGetNoticeReceiver(self):
Note: See TracChangeset for help on using the changeset viewer.