Changeset 768 for trunk


Ignore:
Timestamp:
Jan 19, 2016, 10:19:38 AM (4 years ago)
Author:
cito
Message:

Refactoring of DB wrapper test

The creation of temporary tables happened way too often,
so this was outsourced into a separate method.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r765 r768  
    1111
    1212"""
    13 
    1413try:
    1514    import unittest2 as unittest  # for Python < 2.7
     
    315314        self.doCleanups()
    316315        self.db.close()
     316
     317    def createTable(self, table, definition,
     318            temporary=True, oids=None, values=None):
     319        query = self.db.query
     320        if not '"' in table or '.' in table:
     321            table = '"%s"' % table
     322        if not temporary:
     323            q = 'drop table if exists %s cascade' % table
     324            query(q)
     325            self.addCleanup(query, q)
     326        temporary = 'temporary table' if temporary else 'table'
     327        as_query = definition.startswith(('as ', 'AS '))
     328        if not as_query and not definition.startswith('('):
     329            definition = '(%s)' % definition
     330        with_oids = 'with oids' if oids else 'without oids'
     331        q = ['create', temporary, table]
     332        if as_query:
     333            q.extend([with_oids, definition])
     334        else:
     335            q.extend([definition, with_oids])
     336        q = ' '.join(q)
     337        query(q)
     338        if values:
     339            for params in values:
     340                if not isinstance(params, (list, tuple)):
     341                    params = [params]
     342                values = ', '.join('$%d' % (n + 1) for n in range(len(params)))
     343                q = "insert into %s values (%s)" % (table, values)
     344                query(q, params)
    317345
    318346    def testClassName(self):
     
    629657        self.assertEqual(r, default_datestyle)
    630658
     659    def testCreateTable(self):
     660        table = 'test hello world'
     661        values = [(2, "World!"), (1, "Hello")]
     662        self.createTable(table, "n smallint, t varchar",
     663            temporary=True, oids=True, values=values)
     664        r = self.db.query('select t from "%s" order by n' % table).getresult()
     665        r = ', '.join(row[0] for row in r)
     666        self.assertEqual(r, "Hello, World!")
     667        r = self.db.query('select oid from "%s" limit 1' % table).getresult()
     668        self.assertIsInstance(r[0][0], int)
     669
    631670    def testQuery(self):
    632671        query = self.db.query
    633         query("drop table if exists test_table")
    634         self.addCleanup(query, "drop table test_table")
    635         q = "create table test_table (n integer) with oids"
    636         r = query(q)
    637         self.assertIsNone(r)
     672        table = 'test_table'
     673        self.createTable(table, "n integer", oids=True)
    638674        q = "insert into test_table values (1)"
    639675        r = query(q)
     
    671707    def testQueryWithParams(self):
    672708        query = self.db.query
    673         query("drop table if exists test_table")
    674         self.addCleanup(query, "drop table test_table")
    675         q = "create table test_table (n1 integer, n2 integer) with oids"
    676         query(q)
     709        self.createTable('test_table', 'n1 integer, n2 integer', oids=True)
    677710        q = "insert into test_table values ($1, $2)"
    678711        r = query(q, (1, 2))
     
    711744        self.assertRaises(KeyError, pkey, 'test')
    712745        for t in ('pkeytest', 'primary key test'):
    713             for n in range(8):
    714                 query('drop table if exists "%s%d"' % (t, n))
    715                 self.addCleanup(query, 'drop table "%s%d"' % (t, n))
    716             query('create table "%s0" ('
    717                 "a smallint)" % t)
    718             query('create table "%s1" ('
    719                 "b smallint primary key)" % t)
    720             query('create table "%s2" ('
    721                 "c smallint, d smallint primary key)" % t)
    722             query('create table "%s3" ('
    723                 "e smallint, f smallint, g smallint,"
    724                 " h smallint, i smallint,"
    725                 " primary key (f, h))" % t)
    726             query('create table "%s4" ('
    727                 "e smallint, f smallint, g smallint,"
    728                 " h smallint, i smallint,"
    729                 " primary key (h, f))" % t)
    730             query('create table "%s5" ('
    731                 "more_than_one_letter varchar primary key)" % t)
    732             query('create table "%s6" ('
    733                 '"with space" date primary key)' % t)
    734             query('create table "%s7" ('
    735                 'a_very_long_column_name varchar,'
    736                 ' "with space" date,'
    737                 ' "42" int,'
    738                 " primary key (a_very_long_column_name,"
    739                 ' "with space", "42"))' % t)
     746            self.createTable('%s0' % t, 'a smallint')
     747            self.createTable('%s1' % t, 'b smallint primary key')
     748            self.createTable('%s2' % t,
     749                'c smallint, d smallint primary key')
     750            self.createTable('%s3' % t,
     751                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     752                ' primary key (f, h)')
     753            self.createTable('%s4' % t,
     754                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     755                ' primary key (h, f)')
     756            self.createTable('%s5' % t,
     757                'more_than_one_letter varchar primary key')
     758            self.createTable('%s6' % t,
     759                '"with space" date primary key')
     760            self.createTable('%s7' % t,
     761                'a_very_long_column_name varchar, "with space" date, "42" int,'
     762                ' primary key (a_very_long_column_name, "with space", "42")')
    740763            self.assertRaises(KeyError, pkey, '%s0' % t)
    741764            self.assertEqual(pkey('%s1' % t), 'b')
     
    780803    def testGetTables(self):
    781804        get_tables = self.db.get_tables
    782         result1 = get_tables()
    783         self.assertIsInstance(result1, list)
    784         for t in result1:
     805        tables = ('A very Special Name', 'A_MiXeD_quoted_NaMe',
     806            'Hello, Test World!', 'Zoro', 'a1', 'a2', 'a321',
     807            'averyveryveryveryveryveryveryreallyreallylongtablename',
     808            'b0', 'b3', 'x', 'xXx', 'xx', 'y', 'z')
     809        for t in tables:
     810            self.db.query('drop table if exists "%s" cascade' % t)
     811        before_tables = get_tables()
     812        self.assertIsInstance(before_tables, list)
     813        for t in before_tables:
    785814            t = t.split('.', 1)
    786815            self.assertGreaterEqual(len(t), 2)
     
    790819            self.assertNotEqual(t, 'information_schema')
    791820            self.assertFalse(t.startswith('pg_'))
    792         tables = ('"A very Special Name"',
    793             '"A_MiXeD_quoted_NaMe"', 'a1', 'a2',
    794             'A_MiXeD_NaMe', '"another special name"',
    795             'averyveryveryveryveryveryverylongtablename',
    796             'b0', 'b3', 'x', 'xx', 'xXx', 'y', 'z')
    797821        for t in tables:
    798             self.db.query('drop table if exists %s' % t)
    799             self.db.query("create table %s"
    800                 " as select 0" % t)
    801         result3 = get_tables()
    802         result2 = []
    803         for t in result3:
    804             if t not in result1:
    805                 result2.append(t)
    806         result3 = []
    807         for t in tables:
    808             if not t.startswith('"'):
    809                 t = t.lower()
    810             result3.append('public.' + t)
    811         self.assertEqual(result2, result3)
    812         for t in result2:
    813             self.db.query('drop table %s' % t)
    814         result2 = get_tables()
    815         self.assertEqual(result2, result1)
     822            self.createTable(t, 'as select 0', temporary=False)
     823        current_tables = get_tables()
     824        new_tables = [t for t in current_tables if t not in before_tables]
     825        expected_new_tables = ['public.%s' % (
     826            '"%s"' % t if ' ' in t or t != t.lower() else t) for t in tables]
     827        self.assertEqual(new_tables, expected_new_tables)
     828        self.doCleanups()
     829        after_tables = get_tables()
     830        self.assertEqual(after_tables, before_tables)
    816831
    817832    def testGetRelations(self):
     
    845860            f4='float', f8='float', m='money',
    846861            v4='text', c4='text', t='text'))
    847         query = self.db.query
    848         query("drop table if exists test_table")
    849         self.addCleanup(query, "drop table test_table")
    850         query("create table test_table("
    851             " n int, alpha smallint, beta bool,"
    852             " gamma char(5), tau text, v varchar(3))")
     862        self.createTable('test_table',
     863            'n int, alpha smallint, beta bool,'
     864            ' gamma char(5), tau text, v varchar(3)')
    853865        r = get_attnames('test_table')
    854866        self.assertIsInstance(r, dict)
     
    859871    def testGetAttnamesWithQuotes(self):
    860872        get_attnames = self.db.get_attnames
    861         query = self.db.query
    862873        table = 'test table for get_attnames()'
    863         query('drop table if exists "%s"' % table)
    864         self.addCleanup(query, 'drop table "%s"' % table)
    865         query('create table "%s"('
    866             '"Prime!" smallint,'
    867             ' "much space" integer, "Questions?" text)' % table)
     874        self.createTable(table,
     875            '"Prime!" smallint, "much space" integer, "Questions?" text')
    868876        r = get_attnames(table)
    869877        self.assertIsInstance(r, dict)
     
    871879            'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
    872880        table = 'yet another test table for get_attnames()'
    873         query('drop table if exists "%s"' % table)
    874         self.addCleanup(query, 'drop table "%s"' % table)
    875         self.db.query('create table "%s" ('
     881        self.createTable(table,
    876882            'a smallint, b integer, c bigint,'
    877883            ' e numeric, f float, f2 double precision, m money,'
     
    879885            ' Normal_NaMe smallint, "Special Name" smallint,'
    880886            ' t text, u char(2), v varchar(2),'
    881             ' primary key (y, u)) with oids' % table)
     887            ' primary key (y, u)', oids=True)
    882888        r = get_attnames(table)
    883889        self.assertIsInstance(r, dict)
     
    890896    def testGetAttnamesWithRegtypes(self):
    891897        get_attnames = self.db.get_attnames
    892         query = self.db.query
    893         query("drop table if exists test_table")
    894         self.addCleanup(query, "drop table test_table")
    895         query("create table test_table("
    896             " n int, alpha smallint, beta bool,"
    897             " gamma char(5), tau text, v varchar(3))")
     898        self.createTable('test_table',
     899            ' n int, alpha smallint, beta bool,'
     900            ' gamma char(5), tau text, v varchar(3)')
    898901        use_regtypes = self.db.use_regtypes
    899902        regtypes = use_regtypes()
     
    912915        get_attnames = self.db.get_attnames
    913916        query = self.db.query
    914         query("drop table if exists test_table")
    915         self.addCleanup(query, "drop table test_table")
    916         query("create table test_table(col int)")
     917        self.createTable('test_table', 'col int')
    917918        r = get_attnames("test_table")
    918919        self.assertIsInstance(r, dict)
     
    938939        get_attnames = self.db.get_attnames
    939940        query = self.db.query
    940         query("drop table if exists test_table")
    941         self.addCleanup(query, "drop table test_table")
    942         query("create table test_table("
    943             " n int, alpha smallint, v varchar(3),"
    944             " gamma char(5), tau text, beta bool)")
     941        self.createTable('test_table',
     942            ' n int, alpha smallint, v varchar(3),'
     943            ' gamma char(5), tau text, beta bool')
    945944        r = get_attnames("test_table")
    946945        self.assertIsInstance(r, OrderedDict)
     
    973972        self.assertRaises(TypeError, get)
    974973        self.assertRaises(TypeError, get, table)
    975         query('drop table if exists "%s"' % table)
    976         self.addCleanup(query, 'drop table "%s"' % table)
    977         query('create table "%s" ('
    978             "n integer, t text) without oids" % table)
    979         for n, t in enumerate('xyz'):
    980             query('insert into "%s" values('"%d, '%s')"
    981                 % (table, n + 1, t))
     974        self.createTable(table, 'n integer, t text',
     975            values=enumerate('xyz', start=1))
    982976        self.assertRaises(pg.ProgrammingError, get, table, 2)
    983977        r = get(table, 2, 'n')
     
    10311025        query = self.db.query
    10321026        table = 'get_with_oid_test_table'
    1033         query('drop table if exists "%s"' % table)
    1034         self.addCleanup(query, 'drop table "%s"' % table)
    1035         query('create table "%s" ('
    1036             "n integer, t text) with oids" % table)
    1037         for n, t in enumerate('xyz'):
    1038             query('insert into "%s" values('"%d, '%s')"
    1039                 % (table, n + 1, t))
     1027        self.createTable(table, 'n integer, t text', oids=True,
     1028            values=enumerate('xyz', start=1))
    10401029        self.assertRaises(pg.ProgrammingError, get, table, 2)
    10411030        self.assertRaises(KeyError, get, table, {}, 'oid')
     
    11001089        query = self.db.query
    11011090        table = 'get_test_table_1'
    1102         query('drop table if exists "%s"' % table)
    1103         self.addCleanup(query, 'drop table "%s"' % table)
    1104         query('create table "%s" ('
    1105             "n integer, t text, primary key (n))" % table)
    1106         for n, t in enumerate('abc'):
    1107             query('insert into "%s" values('
    1108                 "%d, '%s')" % (table, n + 1, t))
     1091        self.createTable(table, 'n integer primary key, t text',
     1092            values=enumerate('abc', start=1))
    11091093        self.assertEqual(get(table, 2)['t'], 'b')
    11101094        self.assertEqual(get(table, 1, 'n')['t'], 'a')
     
    11161100        self.assertEqual(get(table, ['c'], ['t'])['n'], 3)
    11171101        table = 'get_test_table_2'
    1118         query('drop table if exists "%s"' % table)
    1119         self.addCleanup(query, 'drop table "%s"' % table)
    1120         query('create table "%s" ('
    1121             "n integer, m integer, t text, primary key (n, m))" % table)
    1122         for n in range(3):
    1123             for m in range(2):
    1124                 t = chr(ord('a') + 2 * n + m)
    1125                 query('insert into "%s" values('
    1126                     "%d, %d, '%s')" % (table, n + 1, m + 1, t))
     1102        self.createTable(table,
     1103            'n integer, m integer, t text, primary key (n, m)',
     1104            values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     1105                for n in range(3) for m in range(2)])
    11271106        self.assertRaises(KeyError, get, table, 2)
    11281107        self.assertEqual(get(table, (1, 1))['t'], 'a')
     
    11421121        query = self.db.query
    11431122        table = 'test table for get()'
    1144         query('drop table if exists "%s"' % table)
    1145         self.addCleanup(query, 'drop table "%s"' % table)
    1146         query('create table "%s" ('
    1147             '"Prime!" smallint primary key,'
    1148             ' "much space" integer, "Questions?" text)' % table)
    1149         query('insert into "%s"'
    1150               " values(17, 1001, 'No!')" % table)
     1123        self.createTable(table, '"Prime!" smallint primary key,'
     1124            ' "much space" integer, "Questions?" text',
     1125            values=[(17, 1001, 'No!')])
    11511126        r = get(table, 17)
    11521127        self.assertIsInstance(r, dict)
     
    11661141        get = self.db.get
    11671142        query = self.db.query
    1168         query("drop table if exists test_students")
    1169         self.addCleanup(query, "drop table test_students")
    1170         query("create table test_students (firstname varchar primary key,"
    1171             " nickname varchar, grade char(2))")
    1172         query("insert into test_students values ("
    1173               "'D''Arcy', 'Darcey', 'A+')")
    1174         query("insert into test_students values ("
    1175               "'Sheldon', 'Moonpie', 'A+')")
    1176         query("insert into test_students values ("
    1177               "'Robert', 'Little Bobby Tables', 'D-')")
     1143        self.createTable('test_students',
     1144            'firstname varchar primary key, nickname varchar, grade char(2)',
     1145            values=[("D'Arcy", 'Darcey', 'A+'), ('Sheldon', 'Moonpie', 'A+'),
     1146                    ('Robert', 'Little Bobby Tables', 'D-')])
    11781147        r = get('test_students', 'Sheldon')
    11791148        self.assertEqual(r, dict(
     
    12081177        decimal = pg.get_decimal()
    12091178        table = 'insert_test_table'
    1210         query('drop table if exists "%s"' % table)
    1211         self.addCleanup(query, 'drop table "%s"' % table)
    1212         query('create table "%s" ('
    1213             "i2 smallint, i4 integer, i8 bigint,"
    1214             " d numeric, f4 real, f8 double precision, m money,"
    1215             " v4 varchar(4), c4 char(4), t text,"
    1216             " b boolean, ts timestamp) with oids" % table)
     1179        self.createTable(table,
     1180            'i2 smallint, i4 integer, i8 bigint,'
     1181            ' d numeric, f4 real, f8 double precision, m money,'
     1182            ' v4 varchar(4), c4 char(4), t text,'
     1183            ' b boolean, ts timestamp', oids=True)
    12171184        oid_table = 'oid(%s)' % table
    12181185        tests = [dict(i2=None, i4=None, i8=None),
     
    13091276        insert = self.db.insert
    13101277        query = self.db.query
    1311         query("drop table if exists test_table")
    1312         self.addCleanup(query, "drop table test_table")
    1313         query("create table test_table (n int) with oids")
     1278        self.createTable('test_table', 'n int', oids=True)
    13141279        r = insert('test_table', n=1)
    13151280        self.assertIsInstance(r, dict)
     
    13801345        query = self.db.query
    13811346        table = 'test table for insert()'
    1382         query('drop table if exists "%s"' % table)
    1383         self.addCleanup(query, 'drop table "%s"' % table)
    1384         query('create table "%s" ('
    1385             '"Prime!" smallint primary key,'
    1386             ' "much space" integer, "Questions?" text)' % table)
     1347        self.createTable(table, '"Prime!" smallint primary key,'
     1348            ' "much space" integer, "Questions?" text')
    13871349        r = {'Prime!': 11, 'much space': 2002, 'Questions?': 'What?'}
    13881350        r = insert(table, r)
     
    14041366            'test', i2=2, i4=4, i8=8)
    14051367        table = 'update_test_table'
    1406         query('drop table if exists "%s"' % table)
    1407         self.addCleanup(query, 'drop table "%s"' % table)
    1408         query('create table "%s" ('
    1409             "n integer, t text) with oids" % table)
    1410         for n, t in enumerate('xyz'):
    1411             query('insert into "%s" values('
    1412                 "%d, '%s')" % (table, n + 1, t))
     1368        self.createTable(table, 'n integer, t text', oids=True,
     1369            values=enumerate('xyz', start=1))
    14131370        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
    14141371        r = self.db.get(table, 2, 'n')
     
    14241381        get = self.db.get
    14251382        query = self.db.query
    1426         query("drop table if exists test_table")
    1427         self.addCleanup(query, "drop table test_table")
    1428         query("create table test_table (n int) with oids")
    1429         query("insert into test_table values (1)")
     1383        self.createTable('test_table', 'n int', oids=True, values=[1])
    14301384        s = get('test_table', 1, 'n')
    14311385        self.assertIsInstance(s, dict)
     
    14941448        self.assertEqual(r, [(1, 3), (5, 9)])
    14951449
     1450    def testUpdateWithoutOid(self):
     1451        update = self.db.update
     1452        query = self.db.query
     1453        self.assertRaises(pg.ProgrammingError, update,
     1454            'test', i2=2, i4=4, i8=8)
     1455        table = 'update_test_table'
     1456        self.createTable(table, 'n integer primary key, t text', oids=False,
     1457            values=enumerate('xyz', start=1))
     1458        r = self.db.get(table, 2)
     1459        r['t'] = 'u'
     1460        s = update(table, r)
     1461        self.assertEqual(s, r)
     1462        q = 'select t from "%s" where n=2' % table
     1463        r = query(q).getresult()[0][0]
     1464        self.assertEqual(r, 'u')
     1465
    14961466    def testUpdateWithCompositeKey(self):
    14971467        update = self.db.update
    14981468        query = self.db.query
    14991469        table = 'update_test_table_1'
    1500         query('drop table if exists "%s"' % table)
    1501         self.addCleanup(query, 'drop table if exists "%s"' % table)
    1502         query('create table "%s" ('
    1503             "n integer, t text, primary key (n))" % table)
    1504         for n, t in enumerate('abc'):
    1505             query('insert into "%s" values('
    1506                 "%d, '%s')" % (table, n + 1, t))
     1470        self.createTable(table, 'n integer primary key, t text',
     1471            values=enumerate('abc', start=1))
    15071472        self.assertRaises(KeyError, update, table, dict(t='b'))
    15081473        s = dict(n=2, t='d')
     
    15261491        query('drop table "%s"' % table)
    15271492        table = 'update_test_table_2'
    1528         query('drop table if exists "%s"' % table)
    1529         query('create table "%s" ('
    1530             "n integer, m integer, t text, primary key (n, m))" % table)
    1531         for n in range(3):
    1532             for m in range(2):
    1533                 t = chr(ord('a') + 2 * n + m)
    1534                 query('insert into "%s" values('
    1535                     "%d, %d, '%s')" % (table, n + 1, m + 1, t))
     1493        self.createTable(table,
     1494            'n integer, m integer, t text, primary key (n, m)',
     1495            values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     1496                for n in range(3) for m in range(2)])
    15361497        self.assertRaises(KeyError, update, table, dict(n=2, t='b'))
    15371498        self.assertEqual(update(table,
     
    15451506        query = self.db.query
    15461507        table = 'test table for update()'
    1547         query('drop table if exists "%s"' % table)
    1548         self.addCleanup(query, 'drop table "%s"' % table)
    1549         query('create table "%s" ('
    1550             '"Prime!" smallint primary key,'
    1551             ' "much space" integer, "Questions?" text)' % table)
    1552         query('insert into "%s"'
    1553               " values(13, 3003, 'Why!')" % table)
     1508        self.createTable(table, '"Prime!" smallint primary key,'
     1509            ' "much space" integer, "Questions?" text',
     1510            values=[(13, 3003, 'Why!')])
    15541511        r = {'Prime!': 13, 'much space': 7007, 'Questions?': 'When?'}
    15551512        r = update(table, r)
     
    15711528            'test', i2=2, i4=4, i8=8)
    15721529        table = 'upsert_test_table'
    1573         query('drop table if exists "%s"' % table)
    1574         self.addCleanup(query, 'drop table "%s"' % table)
    1575         query('create table "%s" ('
    1576             "n integer primary key, t text) with oids" % table)
     1530        self.createTable(table, 'n integer primary key, t text', oids=True)
    15771531        s = dict(n=1, t='x')
    15781532        try:
     
    16441598        get = self.db.get
    16451599        query = self.db.query
    1646         query("drop table if exists test_table")
    1647         self.addCleanup(query, "drop table test_table")
    1648         query("create table test_table (n int) with oids")
    1649         query("insert into test_table values (1)")
     1600        self.createTable('test_table', 'n int', oids=True, values=[1])
    16501601        self.assertRaises(pg.ProgrammingError,
    16511602            upsert, 'test_table', dict(n=2))
     
    17261677        query = self.db.query
    17271678        table = 'upsert_test_table_2'
    1728         query('drop table if exists "%s"' % table)
    1729         self.addCleanup(query, 'drop table "%s"' % table)
    1730         query('create table "%s" ('
    1731             "n integer, m integer, t text, primary key (n, m))" % table)
     1679        self.createTable(table,
     1680            'n integer, m integer, t text, primary key (n, m)')
    17321681        s = dict(n=1, m=2, t='x')
    17331682        try:
     
    17951744        query = self.db.query
    17961745        table = 'test table for upsert()'
    1797         query('drop table if exists "%s"' % table)
    1798         self.addCleanup(query, 'drop table "%s"' % table)
    1799         query('create table "%s" ('
    1800             '"Prime!" smallint primary key,'
    1801             ' "much space" integer, "Questions?" text)' % table)
     1746        self.createTable(table, '"Prime!" smallint primary key,'
     1747            ' "much space" integer, "Questions?" text')
    18021748        s = {'Prime!': 31, 'much space': 9009, 'Questions?': 'Yes.'}
    18031749        try:
     
    18251771    def testClear(self):
    18261772        clear = self.db.clear
    1827         query = self.db.query
    18281773        f = False if pg.get_bool() else 'f'
    18291774        r = clear('test')
     
    18321777        self.assertEqual(r, result)
    18331778        table = 'clear_test_table'
    1834         query('drop table if exists "%s"' % table)
    1835         self.addCleanup(query, 'drop table "%s"' % table)
    1836         query('create table "%s" ('
    1837             "n integer, b boolean, d date, t text) with oids" % table)
     1779        self.createTable(table,
     1780            'n integer, f float, b boolean, d date, t text', oids=True)
    18381781        r = clear(table)
    1839         result = dict(n=0, b=f, d='', t='')
     1782        result = dict(n=0, f=0, b=f, d='', t='')
    18401783        self.assertEqual(r, result)
    1841         r['a'] = r['n'] = 1
     1784        r['a'] = r['f'] = r['n'] = 1
    18421785        r['d'] = r['t'] = 'x'
    18431786        r['b'] = 't'
    18441787        r['oid'] = long(1)
    18451788        r = clear(table, r)
    1846         result = dict(a=1, n=0, b=f, d='', t='', oid=long(1))
     1789        result = dict(a=1, n=0, f=0, b=f, d='', t='', oid=long(1))
    18471790        self.assertEqual(r, result)
    18481791
     
    18511794        query = self.db.query
    18521795        table = 'test table for clear()'
    1853         query('drop table if exists "%s"' % table)
    1854         self.addCleanup(query, 'drop table "%s"' % table)
    1855         query('create table "%s" ('
    1856             '"Prime!" smallint primary key,'
    1857             ' "much space" integer, "Questions?" text)' % table)
     1796        self.createTable(table, '"Prime!" smallint primary key,'
     1797            ' "much space" integer, "Questions?" text')
    18581798        r = clear(table)
    18591799        self.assertIsInstance(r, dict)
     
    18681808            'test', dict(i2=2, i4=4, i8=8))
    18691809        table = 'delete_test_table'
    1870         query('drop table if exists "%s"' % table)
    1871         self.addCleanup(query, 'drop table "%s"' % table)
    1872         query('create table "%s" ('
    1873             "n integer, t text) with oids" % table)
    1874         for n, t in enumerate('xyz'):
    1875             query('insert into "%s" values('
    1876                 "%d, '%s')" % (table, n + 1, t))
     1810        self.createTable(table, 'n integer, t text', oids=True,
     1811            values=enumerate('xyz', start=1))
    18771812        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
    18781813        r = self.db.get(table, 1, 'n')
     
    19041839        get = self.db.get
    19051840        query = self.db.query
    1906         query("drop table if exists test_table")
    1907         self.addCleanup(query, "drop table test_table")
    1908         query("create table test_table (n int) with oids")
    1909         for i in range(6):
    1910             query("insert into test_table values (%d)" % (i + 1))
     1841        self.createTable('test_table', 'n int', oids=True, values=range(1, 7))
    19111842        r = dict(n=3)
    19121843        self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
     
    19961927        query = self.db.query
    19971928        table = 'delete_test_table_1'
    1998         query('drop table if exists "%s"' % table)
    1999         self.addCleanup(query, 'drop table "%s"' % table)
    2000         query('create table "%s" ('
    2001             "n integer, t text, primary key (n))" % table)
    2002         for n, t in enumerate('abc'):
    2003             query("insert into %s values("
    2004                 "%d, '%s')" % (table, n + 1, t))
     1929        self.createTable(table, 'n integer primary key, t text',
     1930            values=enumerate('abc', start=1))
    20051931        self.assertRaises(KeyError, self.db.delete, table, dict(t='b'))
    20061932        self.assertEqual(self.db.delete(table, dict(n=2)), 1)
     
    20111937        self.assertEqual(r, 'c')
    20121938        table = 'delete_test_table_2'
    2013         query('drop table if exists "%s"' % table)
    2014         self.addCleanup(query, 'drop table "%s"' % table)
    2015         query('create table "%s" ('
    2016             "n integer, m integer, t text, primary key (n, m))" % table)
    2017         for n in range(3):
    2018             for m in range(2):
    2019                 t = chr(ord('a') + 2 * n + m)
    2020                 query('insert into "%s" values('
    2021                     "%d, %d, '%s')" % (table, n + 1, m + 1, t))
     1939        self.createTable(table,
     1940            'n integer, m integer, t text, primary key (n, m)',
     1941            values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     1942                for n in range(3) for m in range(2)])
    20221943        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    20231944        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
     
    20381959        query = self.db.query
    20391960        table = 'test table for delete()'
    2040         query('drop table if exists "%s"' % table)
    2041         self.addCleanup(query, 'drop table "%s"' % table)
    2042         query('create table "%s" ('
    2043             '"Prime!" smallint primary key,'
    2044             ' "much space" integer, "Questions?" text)' % table)
    2045         query('insert into "%s"'
    2046               " values(19, 5005, 'Yes!')" % table)
     1961        self.createTable(table, '"Prime!" smallint primary key,'
     1962            ' "much space" integer, "Questions?" text',
     1963            values=[(19, 5005, 'Yes!')])
    20471964        r = {'Prime!': 17}
    20481965        r = delete(table, r)
     
    20591976        delete = self.db.delete
    20601977        query = self.db.query
    2061         query("drop table if exists test_child")
    2062         query("drop table if exists test_parent")
    2063         self.addCleanup(query, "drop table test_parent")
    2064         query("create table test_parent (n smallint primary key)")
    2065         self.addCleanup(query, "drop table test_child")
    2066         query("create table test_child ("
    2067             " n smallint primary key references test_parent (n))")
    2068         for n in range(3):
    2069             query("insert into test_parent (n) values (%d)" % n)
    2070             query("insert into test_child (n) values (%d)" % n)
     1978        self.createTable('test_parent',
     1979            'n smallint primary key', values=range(3))
     1980        self.createTable('test_child',
     1981            'n smallint primary key references test_parent', values=range(3))
    20711982        q = ("select (select count(*) from test_parent),"
    20721983            " (select count(*) from test_child)")
     
    21052016        self.assertRaises(TypeError, truncate, dict(test_table=None))
    21062017        query = self.db.query
    2107         query("drop table if exists test_table")
    2108         self.addCleanup(query, "drop table test_table")
    2109         query("create table test_table (n smallint)")
    2110         for i in range(3):
    2111             query("insert into test_table values (1)")
     2018        self.createTable('test_table', 'n smallint',
     2019            temporary=False, values=[1] * 3)
    21122020        q = "select count(*) from test_table"
    21132021        r = query(q).getresult()[0][0]
     
    21232031        r = query(q).getresult()[0][0]
    21242032        self.assertEqual(r, 0)
    2125         query("drop table if exists test_table_2")
    2126         self.addCleanup(query, "drop table test_table_2")
    2127         query('create table test_table_2 (n smallint)')
     2033        self.createTable('test_table_2', 'n smallint', temporary=True)
    21282034        for t in (list, tuple, set):
    21292035            for i in range(3):
     
    21422048        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
    21432049        query = self.db.query
    2144         query("drop table if exists test_table")
    2145         self.addCleanup(query, "drop table test_table")
    2146         query("create table test_table (n serial, t text)")
     2050        self.createTable('test_table', 'n serial, t text')
    21472051        for n in range(3):
    21482052            query("insert into test_table (t) values ('test')")
     
    21692073        self.assertRaises(TypeError, truncate, 'test_table', cascade='invalid')
    21702074        query = self.db.query
    2171         query("drop table if exists test_child")
    2172         query("drop table if exists test_parent")
    2173         self.addCleanup(query, "drop table test_parent")
    2174         query("create table test_parent (n smallint primary key)")
    2175         self.addCleanup(query, "drop table test_child")
    2176         query("create table test_child ("
    2177             " n smallint primary key references test_parent (n))")
    2178         for n in range(3):
    2179             query("insert into test_parent (n) values (%d)" % n)
    2180             query("insert into test_child (n) values (%d)" % n)
     2075        self.createTable('test_parent', 'n smallint primary key',
     2076            values=range(3))
     2077        self.createTable('test_child',
     2078            'n smallint primary key references test_parent (n)',
     2079            values=range(3))
    21812080        q = ("select (select count(*) from test_parent),"
    21822081            " (select count(*) from test_child)")
     
    22122111        self.assertRaises(TypeError, truncate, 'test_table', only='invalid')
    22132112        query = self.db.query
    2214         query("drop table if exists test_child")
    2215         query("drop table if exists test_parent")
    2216         self.addCleanup(query, "drop table test_parent")
    2217         query("create table test_parent (n smallint)")
    2218         self.addCleanup(query, "drop table test_child")
    2219         query("create table test_child ("
    2220             " m smallint) inherits (test_parent)")
     2113        self.createTable('test_parent', 'n smallint')
     2114        self.createTable('test_child', 'm smallint) inherits (test_parent')
    22212115        for n in range(3):
    22222116            query("insert into test_parent (n) values (1)")
     
    22502144        self.assertRaises(ValueError, truncate, 'test_parent*', only=True)
    22512145        truncate('test_parent*', only=False)
    2252         query("drop table if exists test_parent_2")
    2253         self.addCleanup(query, "drop table test_parent_2")
    2254         query("create table test_parent_2 (n smallint)")
    2255         query("drop table if exists test_child_2")
    2256         self.addCleanup(query, "drop table test_child_2")
    2257         query("create table test_child_2 ("
    2258             " m smallint) inherits (test_parent_2)")
    2259         for n in range(3):
    2260             query("insert into test_parent (n) values (1)")
    2261             query("insert into test_child (n, m) values (2, 3)")
    2262             query("insert into test_parent_2 (n) values (1)")
    2263             query("insert into test_child_2 (n, m) values (2, 3)")
     2146        self.createTable('test_parent_2', 'n smallint')
     2147        self.createTable('test_child_2', 'm smallint) inherits (test_parent_2')
     2148        for t in '', '_2':
     2149            for n in range(3):
     2150                query("insert into test_parent%s (n) values (1)" % t)
     2151                query("insert into test_child%s (n, m) values (2, 3)" % t)
    22642152        q = ("select (select count(*) from test_parent),"
    22652153            " (select count(*) from test_child),"
     
    22822170        query = self.db.query
    22832171        table = "test table for truncate()"
    2284         query('drop table if exists "%s"' % table)
    2285         self.addCleanup(query, 'drop table "%s"' % table)
    2286         query('create table "%s" (n smallint)' % table)
    2287         for i in range(3):
    2288             query('insert into "%s" values (1)' % table)
     2172        self.createTable(table, 'n smallint', temporary=False, values=[1] * 3)
    22892173        q = 'select count(*) from "%s"' % table
    22902174        r = query(q).getresult()[0][0]
     
    23032187    def testTransaction(self):
    23042188        query = self.db.query
    2305         query("drop table if exists test_table")
    2306         self.addCleanup(query, "drop table test_table")
    2307         query("create table test_table (n integer)")
     2189        self.createTable('test_table', 'n integer', temporary=False)
    23082190        self.db.begin()
    23092191        query("insert into test_table values (1)")
     
    23492231    def testContextManager(self):
    23502232        query = self.db.query
    2351         query("drop table if exists test_table")
    2352         self.addCleanup(query, "drop table test_table")
    2353         query("create table test_table (n integer check(n>0))")
     2233        self.createTable('test_table', 'n integer check(n>0)')
    23542234        with self.db:
    23552235            query("insert into test_table values (1)")
     
    23782258    def testBytea(self):
    23792259        query = self.db.query
    2380         query('drop table if exists bytea_test')
    2381         self.addCleanup(query, 'drop table bytea_test')
    2382         query('create table bytea_test (n smallint primary key, data bytea)')
     2260        self.createTable('bytea_test', 'n smallint primary key, data bytea')
    23832261        s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
    23842262        r = self.db.escape_bytea(s)
     
    23972275    def testInsertUpdateGetBytea(self):
    23982276        query = self.db.query
    2399         query('drop table if exists bytea_test')
    2400         self.addCleanup(query, 'drop table bytea_test')
    2401         query('create table bytea_test (n smallint primary key, data bytea)')
     2277        self.createTable('bytea_test', 'n smallint primary key, data bytea')
    24022278        # insert null value
    24032279        r = self.db.insert('bytea_test', n=0, data=None)
     
    24592335    def testUpsertBytea(self):
    24602336        query = self.db.query
    2461         query('drop table if exists bytea_test')
    2462         self.addCleanup(query, 'drop table bytea_test')
    2463         query('create table bytea_test (n smallint primary key, data bytea)')
     2337        self.createTable('bytea_test', 'n smallint primary key, data bytea')
    24642338        s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
    24652339        r = dict(n=7, data=s)
Note: See TracChangeset for help on using the changeset viewer.