Changeset 486


Ignore:
Timestamp:
Jan 5, 2013, 10:19:07 AM (7 years ago)
Author:
darcy
Message:

Open database in every test to assure that there are no interactions.
Add tests for pgnotify.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/module/TEST_PyGreSQL_classic.py

    r464 r486  
    33from __future__ import with_statement
    44
     5import sys, thread, time
    56import unittest
    67from pg import *
     
    1718    pass
    1819
    19 db = DB(dbname, dbhost, dbport)
    20 db.query("SET DATESTYLE TO 'ISO'")
    21 db.query("SET TIME ZONE 'EST5EDT'")
    22 db.query("SET DEFAULT_WITH_OIDS=TRUE")
    23 db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
    24 
     20def opendb():
     21    db = DB(dbname, dbhost, dbport)
     22    db.query("SET DATESTYLE TO 'ISO'")
     23    db.query("SET TIME ZONE 'EST5EDT'")
     24    db.query("SET DEFAULT_WITH_OIDS=TRUE")
     25    db.query("SET STANDARD_CONFORMING_STRINGS=FALSE")
     26    return db
     27
     28def cb1(arg_dict):
     29    global cb1_return
     30    if arg_dict is None:
     31        cb1_return = 'timed out'
     32    else:
     33        cb1_return = arg_dict
    2534
    2635class UtilityTest(unittest.TestCase):
     
    2837    def setUp(self):
    2938        """Setup test tables or empty them if they already exist."""
     39        db = opendb()
     40
    3041        for t in ('_test1', '_test2'):
    3142            try:
     
    5162    def test_invalidname(self):
    5263        """Make sure that invalid table names are caught"""
     64        db = opendb()
    5365        self.failUnlessRaises(ProgrammingError, db.get_attnames, 'x.y.z')
    5466
    5567    def test_schema(self):
    5668        """Does it differentiate the same table name in different schemas"""
     69        db = opendb()
    5770        # see if they differentiate the table names properly
    5871        self.assertEqual(
     
    7487
    7588    def test_pkey(self):
     89        db = opendb()
    7690        self.assertEqual(db.pkey('_test_schema'), '_test')
    7791        self.assertEqual(db.pkey('public._test_schema'), '_test')
     
    8599
    86100    def test_get(self):
     101        db = opendb()
    87102        db.query("INSERT INTO _test_schema VALUES (1234)")
    88103        db.get('_test_schema', 1234)
     
    92107
    93108    def test_params(self):
     109        db = opendb()
    94110        db.query("INSERT INTO _test_schema VALUES ($1, $2, $3)", 12, None, 34)
    95111        d = db.get('_test_schema', 12)
     
    97113
    98114    def test_insert(self):
     115        db = opendb()
    99116        d = dict(_test=1234)
    100117        db.insert('_test_schema', d)
     
    104121
    105122    def test_context_manager(self):
     123        db = opendb()
    106124        t = '_test_schema'
    107125        d = dict(_test=1235)
     
    129147
    130148    def test_sqlstate(self):
     149        db = opendb()
    131150        db.query("INSERT INTO _test_schema VALUES (1234)")
    132151        try:
     
    139158
    140159    def test_mixed_case(self):
     160        db = opendb()
    141161        try:
    142162            db.query('CREATE TABLE _test_mc ("_Test" int PRIMARY KEY)')
     
    147167
    148168    def test_update(self):
     169        db = opendb()
    149170        db.query("INSERT INTO _test_schema VALUES (1234)")
    150171
     
    166187
    167188    def test_quote(self):
     189        db = opendb()
    168190        q = db._quote
    169191        self.assertEqual(q(0, 'int'), "0")
     
    205227        self.assertEqual(q("\\", 'text'), "'\\\\'")
    206228
     229    # note that notify can be created as part of the DB class or
     230    # independently.
     231
     232    def test_notify_DB(self):
     233        global cb1_return
     234       
     235        db = opendb()
     236        db2 = opendb()
     237        # Listen for 'event_1'
     238        pgn = db2.pgnotify('event_1', cb1)
     239        thread.start_new_thread(pgn, ())
     240        time.sleep(1)
     241        # Generate notification from the other connection.
     242        db.query('notify event_1')
     243        time.sleep(1)
     244        # Check that callback has been invoked.
     245        self.assertEquals(cb1_return['event'], 'event_1')
     246
     247    def test_notify_timeout_DB(self):
     248        db = opendb()
     249        db2 = opendb()
     250        global cb1_return
     251        # Listen for 'event_1'
     252        pgn = db2.pgnotify('event_1', cb1, {}, 1)
     253        thread.start_new_thread(pgn, ())
     254        # Sleep long enough to time out.
     255        time.sleep(2)
     256        # Verify that we've indeed timed out.
     257        self.assertEquals(cb1_return, 'timed out')
     258
     259    def test_notify(self):
     260        db = opendb()
     261        db2 = opendb()
     262        global cb1_return
     263        # Listen for 'event_1'
     264        pgn = pgnotify(db2, 'event_1', cb1)
     265        thread.start_new_thread(pgn, ())
     266        time.sleep(1)
     267        # Generate notification from the other connection.
     268        db.query('notify event_1')
     269        time.sleep(1)
     270        # Check that callback has been invoked.
     271        self.assertEquals(cb1_return['event'], 'event_1')
     272
     273    def test_notify_timeout(self):
     274        db = opendb()
     275        db2 = opendb()
     276        global cb1_return
     277        # Listen for 'event_1'
     278        pgn = pgnotify(db2, 'event_1', cb1, {}, 1)
     279        thread.start_new_thread(pgn, ())
     280        # Sleep long enough to time out.
     281        time.sleep(2)
     282        # Verify that we've indeed timed out.
     283        self.assertEquals(cb1_return, 'timed out')
    207284
    208285if __name__ == '__main__':
    209     unittest.main()
     286    suite = unittest.TestSuite()
     287   
     288    if len(sys.argv) > 1: test_list = sys.argv[1:]
     289    else: test_list = unittest.getTestCaseNames(UtilityTest, 'test_')
     290
     291    if len(sys.argv) == 2 and sys.argv[1] == '-l':
     292        print '\n'.join(unittest.getTestCaseNames(UtilityTest, 'test_'))
     293        sys.exit(1)
     294
     295    for test_name in test_list:
     296        try:
     297            suite.addTest(UtilityTest(test_name))
     298        except:
     299            print "\n ERROR: %s.\n" % sys.exc_value
     300            sys.exit(1)
     301
     302    rc = unittest.TextTestRunner(verbosity=1).run(suite)
     303    sys.exit(len(rc.errors+rc.failures) != 0)
     304
Note: See TracChangeset for help on using the changeset viewer.