Changeset 765 for trunk/tests


Ignore:
Timestamp:
Jan 18, 2016, 6:21:44 PM (4 years ago)
Author:
cito
Message:

Improve support for access by primary key

Composite primary keys are now returned as tuples instead of frozensets,
where the ordering of the tuple reflects the primary key index.

Primary keys now takes precedence if both OID and primary key are available
(this was solved the other way around in 4.x). Use of OIDs is thus slightly
more discouraged, though it still works as before for tables with OIDs where
no primary key is available.

This changeset also clarifies some docstrings, makes the code a bit clearer,
handles and tests some more edge cases (pg module still has 100% coverage).

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r763 r765  
    709709        query = self.db.query
    710710        pkey = self.db.pkey
     711        self.assertRaises(KeyError, pkey, 'test')
    711712        for t in ('pkeytest', 'primary key test'):
    712             for n in range(7):
     713            for n in range(8):
    713714                query('drop table if exists "%s%d"' % (t, n))
    714715                self.addCleanup(query, 'drop table "%s%d"' % (t, n))
     
    724725                " primary key (f, h))" % t)
    725726            query('create table "%s4" ('
     727                "e smallint, f smallint, g smallint,"
     728                " h smallint, i smallint,"
     729                " primary key (h, f))" % t)
     730            query('create table "%s5" ('
    726731                "more_than_one_letter varchar primary key)" % t)
    727             query('create table "%s5" ('
     732            query('create table "%s6" ('
    728733                '"with space" date primary key)' % t)
    729             query('create table "%s6" ('
     734            query('create table "%s7" ('
    730735                'a_very_long_column_name varchar,'
    731736                ' "with space" date,'
     
    735740            self.assertRaises(KeyError, pkey, '%s0' % t)
    736741            self.assertEqual(pkey('%s1' % t), 'b')
     742            self.assertEqual(pkey('%s1' % t, True), ('b',))
     743            self.assertEqual(pkey('%s1' % t, composite=False), 'b')
     744            self.assertEqual(pkey('%s1' % t, composite=True), ('b',))
    737745            self.assertEqual(pkey('%s2' % t), 'd')
     746            self.assertEqual(pkey('%s2' % t, composite=True), ('d',))
    738747            r = pkey('%s3' % t)
    739             self.assertIsInstance(r, frozenset)
    740             self.assertEqual(r, frozenset('fh'))
    741             self.assertEqual(pkey('%s4' % t), 'more_than_one_letter')
    742             self.assertEqual(pkey('%s5' % t), 'with space')
    743             r = pkey('%s6' % t)
    744             self.assertIsInstance(r, frozenset)
    745             self.assertEqual(r, frozenset([
    746                 'a_very_long_column_name', 'with space', '42']))
     748            self.assertIsInstance(r, tuple)
     749            self.assertEqual(r, ('f', 'h'))
     750            r = pkey('%s3' % t, composite=False)
     751            self.assertIsInstance(r, tuple)
     752            self.assertEqual(r, ('f', 'h'))
     753            r = pkey('%s4' % t)
     754            self.assertIsInstance(r, tuple)
     755            self.assertEqual(r, ('h', 'f'))
     756            self.assertEqual(pkey('%s5' % t), 'more_than_one_letter')
     757            self.assertEqual(pkey('%s6' % t), 'with space')
     758            r = pkey('%s7' % t)
     759            self.assertIsInstance(r, tuple)
     760            self.assertEqual(r, (
     761                'a_very_long_column_name', 'with space', '42'))
    747762            # a newly added primary key will be detected
    748763            query('alter table "%s0" add primary key (a)' % t)
     
    956971        query = self.db.query
    957972        table = 'get_test_table'
     973        self.assertRaises(TypeError, get)
     974        self.assertRaises(TypeError, get, table)
     975        query('drop table if exists "%s"' % table)
     976        self.addCleanup(query, 'drop table "%s"' % table)
     977        query('create table "%s" ('
     978            "n integer, t text) without oids" % table)
     979        for n, t in enumerate('xyz'):
     980            query('insert into "%s" values('"%d, '%s')"
     981                % (table, n + 1, t))
     982        self.assertRaises(pg.ProgrammingError, get, table, 2)
     983        r = get(table, 2, 'n')
     984        self.assertIsInstance(r, dict)
     985        self.assertEqual(r, dict(n=2, t='y'))
     986        r = get(table, 1, 'n')
     987        self.assertEqual(r, dict(n=1, t='x'))
     988        r = get(table, (3,), ('n',))
     989        self.assertEqual(r, dict(n=3, t='z'))
     990        r = get(table, 'y', 't')
     991        self.assertEqual(r, dict(n=2, t='y'))
     992        self.assertRaises(pg.DatabaseError, get, table, 4)
     993        self.assertRaises(pg.DatabaseError, get, table, 4, 'n')
     994        self.assertRaises(pg.DatabaseError, get, table, 'y')
     995        self.assertRaises(pg.DatabaseError, get, table, 2, 't')
     996        s = dict(n=3)
     997        self.assertRaises(pg.ProgrammingError, get, table, s)
     998        r = get(table, s, 'n')
     999        self.assertIs(r, s)
     1000        self.assertEqual(r, dict(n=3, t='z'))
     1001        s.update(t='x')
     1002        r = get(table, s, 't')
     1003        self.assertIs(r, s)
     1004        self.assertEqual(s, dict(n=1, t='x'))
     1005        r = get(table, s, ('n', 't'))
     1006        self.assertIs(r, s)
     1007        self.assertEqual(r, dict(n=1, t='x'))
     1008        query('alter table "%s" alter n set not null' % table)
     1009        query('alter table "%s" add primary key (n)' % table)
     1010        r = get(table, 2)
     1011        self.assertIsInstance(r, dict)
     1012        self.assertEqual(r, dict(n=2, t='y'))
     1013        self.assertEqual(get(table, 1)['t'], 'x')
     1014        self.assertEqual(get(table, 3)['t'], 'z')
     1015        self.assertEqual(get(table + '*', 2)['t'], 'y')
     1016        self.assertEqual(get(table + ' *', 2)['t'], 'y')
     1017        self.assertRaises(KeyError, get, table, (2, 2))
     1018        s = dict(n=3)
     1019        r = get(table, s)
     1020        self.assertIs(r, s)
     1021        self.assertEqual(r, dict(n=3, t='z'))
     1022        s.update(n=1)
     1023        self.assertEqual(get(table, s)['t'], 'x')
     1024        s.update(n=2)
     1025        self.assertEqual(get(table, r)['t'], 'y')
     1026        s.pop('n')
     1027        self.assertRaises(KeyError, get, table, s)
     1028
     1029    def testGetWithOid(self):
     1030        get = self.db.get
     1031        query = self.db.query
     1032        table = 'get_with_oid_test_table'
    9581033        query('drop table if exists "%s"' % table)
    9591034        self.addCleanup(query, 'drop table "%s"' % table)
     
    9641039                % (table, n + 1, t))
    9651040        self.assertRaises(pg.ProgrammingError, get, table, 2)
    966         self.assertRaises(pg.ProgrammingError, get, table, {}, 'oid')
     1041        self.assertRaises(KeyError, get, table, {}, 'oid')
    9671042        r = get(table, 2, 'n')
    968         oid_table = 'oid(%s)' % table
    969         self.assertIn(oid_table, r)
    970         oid = r[oid_table]
     1043        qoid = 'oid(%s)' % table
     1044        self.assertIn(qoid, r)
     1045        oid = r[qoid]
    9711046        self.assertIsInstance(oid, int)
    972         result = {'t': 'y', 'n': 2, oid_table: oid}
     1047        result = {'t': 'y', 'n': 2, qoid: oid}
    9731048        self.assertEqual(r, result)
     1049        r = get(table, oid, 'oid')
     1050        self.assertEqual(r, result)
     1051        r = get(table, dict(oid=oid))
     1052        self.assertEqual(r, result)
     1053        r = get(table, dict(oid=oid), 'oid')
     1054        self.assertEqual(r, result)
     1055        r = get(table, {qoid: oid})
     1056        self.assertEqual(r, result)
     1057        r = get(table, {qoid: oid}, 'oid')
     1058        self.assertEqual(r, result)
     1059        self.assertEqual(get(table + '*', 2, 'n'), r)
    9741060        self.assertEqual(get(table + ' *', 2, 'n'), r)
    9751061        self.assertEqual(get(table, oid, 'oid')['t'], 'y')
     
    9811067        self.assertEqual(get(table, r, 'n')['t'], 'z')
    9821068        self.assertEqual(get(table, 1, 'n')['t'], 'x')
     1069        self.assertEqual(get(table, r, 'oid')['t'], 'z')
    9831070        query('alter table "%s" alter n set not null' % table)
    9841071        query('alter table "%s" add primary key (n)' % table)
     
    9921079        r['n'] = 2
    9931080        self.assertEqual(get(table, r)['t'], 'y')
     1081        r = get(table, oid, 'oid')
     1082        self.assertEqual(r, result)
     1083        r = get(table, dict(oid=oid))
     1084        self.assertEqual(r, result)
     1085        r = get(table, dict(oid=oid), 'oid')
     1086        self.assertEqual(r, result)
     1087        r = get(table, {qoid: oid})
     1088        self.assertEqual(r, result)
     1089        r = get(table, {qoid: oid}, 'oid')
     1090        self.assertEqual(r, result)
     1091        r = get(table, dict(oid=oid, n=1))
     1092        self.assertEqual(r['n'], 1)
     1093        self.assertNotEqual(r[qoid], oid)
     1094        r = get(table, dict(oid=oid, t='z'), 't')
     1095        self.assertEqual(r['n'], 3)
     1096        self.assertNotEqual(r[qoid], oid)
    9941097
    9951098    def testGetWithCompositeKey(self):
     
    10051108                "%d, '%s')" % (table, n + 1, t))
    10061109        self.assertEqual(get(table, 2)['t'], 'b')
     1110        self.assertEqual(get(table, 1, 'n')['t'], 'a')
     1111        self.assertEqual(get(table, 2, ('n',))['t'], 'b')
     1112        self.assertEqual(get(table, 3, ['n'])['t'], 'c')
     1113        self.assertEqual(get(table, (2,), ('n',))['t'], 'b')
     1114        self.assertEqual(get(table, 'b', 't')['n'], 2)
     1115        self.assertEqual(get(table, ('a',), ('t',))['n'], 1)
     1116        self.assertEqual(get(table, ['c'], ['t'])['n'], 3)
    10071117        table = 'get_test_table_2'
    10081118        query('drop table if exists "%s"' % table)
     
    10151125                query('insert into "%s" values('
    10161126                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1017         self.assertRaises(pg.ProgrammingError, get, table, 2)
     1127        self.assertRaises(KeyError, get, table, 2)
     1128        self.assertEqual(get(table, (1, 1))['t'], 'a')
     1129        self.assertEqual(get(table, (1, 2))['t'], 'b')
     1130        self.assertEqual(get(table, (2, 1))['t'], 'c')
     1131        self.assertEqual(get(table, (1, 2), ('n', 'm'))['t'], 'b')
     1132        self.assertEqual(get(table, (1, 2), ('m', 'n'))['t'], 'c')
     1133        self.assertEqual(get(table, (3, 1), ('n', 'm'))['t'], 'e')
     1134        self.assertEqual(get(table, (1, 3), ('m', 'n'))['t'], 'e')
    10181135        self.assertEqual(get(table, dict(n=2, m=2))['t'], 'd')
    1019         r = get(table, dict(n=1, m=2), ('n', 'm'))
    1020         self.assertEqual(r['t'], 'b')
    1021         r = get(table, dict(n=3, m=2), frozenset(['n', 'm']))
    1022         self.assertEqual(r['t'], 'f')
     1136        self.assertEqual(get(table, dict(n=1, m=2), ('n', 'm'))['t'], 'b')
     1137        self.assertEqual(get(table, dict(n=2, m=1), ['n', 'm'])['t'], 'c')
     1138        self.assertEqual(get(table, dict(n=3, m=2), ('m', 'n'))['t'], 'f')
    10231139
    10241140    def testGetWithQuotedNames(self):
     
    11991315        self.assertIsInstance(r, dict)
    12001316        self.assertEqual(r['n'], 1)
     1317        self.assertNotIn('oid', r)
    12011318        qoid = 'oid(test_table)'
    12021319        self.assertIn(qoid, r)
    1203         r = insert('test_table', n=2, oid='invalid')
     1320        oid = r[qoid]
     1321        self.assertEqual(sorted(r.keys()), ['n', qoid])
     1322        r = insert('test_table', n=2, oid=oid)
    12041323        self.assertIsInstance(r, dict)
    12051324        self.assertEqual(r['n'], 2)
    1206         r['n'] = 3
     1325        self.assertIn(qoid, r)
     1326        self.assertNotEqual(r[qoid], oid)
     1327        self.assertNotIn('oid', r)
     1328        r = insert('test_table', None, n=3)
     1329        self.assertIsInstance(r, dict)
     1330        self.assertEqual(r['n'], 3)
     1331        s = r
     1332        r = insert('test_table', r)
     1333        self.assertIs(r, s)
     1334        self.assertEqual(r['n'], 3)
     1335        r = insert('test_table *', r)
     1336        self.assertIs(r, s)
     1337        self.assertEqual(r['n'], 3)
     1338        r = insert('test_table', r, n=4)
     1339        self.assertIs(r, s)
     1340        self.assertEqual(r['n'], 4)
     1341        self.assertNotIn('oid', r)
     1342        self.assertIn(qoid, r)
     1343        oid = r[qoid]
     1344        r = insert('test_table', r, n=5, oid=oid)
     1345        self.assertIs(r, s)
     1346        self.assertEqual(r['n'], 5)
     1347        self.assertIn(qoid, r)
     1348        self.assertNotEqual(r[qoid], oid)
     1349        self.assertNotIn('oid', r)
     1350        r['oid'] = oid = r[qoid]
     1351        r = insert('test_table', r, n=6)
     1352        self.assertIs(r, s)
     1353        self.assertEqual(r['n'], 6)
     1354        self.assertIn(qoid, r)
     1355        self.assertNotEqual(r[qoid], oid)
     1356        self.assertNotIn('oid', r)
     1357        q = 'select n from test_table order by 1 limit 9'
     1358        r = ' '.join(str(row[0]) for row in query(q).getresult())
     1359        self.assertEqual(r, '1 2 3 3 3 4 5 6')
     1360        query("truncate test_table")
     1361        query("alter table test_table add unique (n)")
     1362        r = insert('test_table', dict(n=7))
     1363        self.assertIsInstance(r, dict)
     1364        self.assertEqual(r['n'], 7)
     1365        self.assertRaises(pg.ProgrammingError, insert, 'test_table', r)
     1366        r['n'] = 6
     1367        self.assertRaises(pg.ProgrammingError, insert, 'test_table', r, n=7)
     1368        self.assertIsInstance(r, dict)
     1369        self.assertEqual(r['n'], 7)
     1370        r['n'] = 6
    12071371        r = insert('test_table', r)
    12081372        self.assertIsInstance(r, dict)
    1209         self.assertEqual(r['n'], 3)
    1210         r = insert('test_table', r, n=4)
    1211         self.assertIsInstance(r, dict)
    1212         self.assertEqual(r['n'], 4)
    1213         q = 'select n from test_table order by 1 limit 5'
     1373        self.assertEqual(r['n'], 6)
    12141374        r = query(q).getresult()
    1215         self.assertEqual(r, [(1,), (2,), (3,), (4,)])
     1375        r = ' '.join(str(row[0]) for row in query(q).getresult())
     1376        self.assertEqual(r, '6 7')
    12161377
    12171378    def testInsertWithQuotedNames(self):
     
    12671428        query("create table test_table (n int) with oids")
    12681429        query("insert into test_table values (1)")
    1269         r = get('test_table', 1, 'n')
    1270         self.assertIsInstance(r, dict)
    1271         self.assertEqual(r['n'], 1)
    1272         r['n'] = 2
    1273         r = update('test_table', r)
    1274         self.assertIsInstance(r, dict)
     1430        s = get('test_table', 1, 'n')
     1431        self.assertIsInstance(s, dict)
     1432        self.assertEqual(s['n'], 1)
     1433        s['n'] = 2
     1434        r = update('test_table', s)
     1435        self.assertIs(r, s)
    12751436        self.assertEqual(r['n'], 2)
    12761437        qoid = 'oid(test_table)'
    12771438        self.assertIn(qoid, r)
     1439        self.assertNotIn('oid', r)
     1440        self.assertEqual(sorted(r.keys()), ['n', qoid])
    12781441        r['n'] = 3
    1279         r = update('test_table', r, oid=r.pop(qoid))
    1280         self.assertIsInstance(r, dict)
     1442        oid = r.pop(qoid)
     1443        r = update('test_table', r, oid=oid)
     1444        self.assertIs(r, s)
    12811445        self.assertEqual(r['n'], 3)
    12821446        r.pop(qoid)
    12831447        self.assertRaises(pg.ProgrammingError, update, 'test_table', r)
    1284         r = get('test_table', 3, 'n')
    1285         self.assertIsInstance(r, dict)
    1286         self.assertEqual(r['n'], 3)
    1287         r.pop('n')
    1288         r = update('test_table', r)
    1289         r.pop(qoid)
     1448        s = get('test_table', 3, 'n')
     1449        self.assertIsInstance(s, dict)
     1450        self.assertEqual(s['n'], 3)
     1451        s.pop('n')
     1452        r = update('test_table', s)
     1453        oid = r.pop(qoid)
    12901454        self.assertEqual(r, {})
    1291         q = 'select n from test_table limit 2'
     1455        q = "select n from test_table limit 2"
    12921456        r = query(q).getresult()
    12931457        self.assertEqual(r, [(3,)])
     1458        query("insert into test_table values (1)")
     1459        self.assertRaises(pg.ProgrammingError,
     1460            update, 'test_table', dict(oid=oid, n=4))
     1461        r = update('test_table', dict(n=4), oid=oid)
     1462        self.assertEqual(r['n'], 4)
     1463        r = update('test_table *', dict(n=5), oid=oid)
     1464        self.assertEqual(r['n'], 5)
     1465        query("alter table test_table add column m int")
     1466        query("alter table test_table add primary key (n)")
     1467        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1468        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1469        s = dict(n=1, m=4)
     1470        r = update('test_table', s)
     1471        self.assertIs(r, s)
     1472        self.assertEqual(r['n'], 1)
     1473        self.assertEqual(r['m'], 4)
     1474        s = dict(m=7)
     1475        r = update('test_table', s, n=5)
     1476        self.assertIs(r, s)
     1477        self.assertEqual(r['n'], 5)
     1478        self.assertEqual(r['m'], 7)
     1479        q = "select n, m from test_table order by 1 limit 3"
     1480        r = query(q).getresult()
     1481        self.assertEqual(r, [(1, 4), (5, 7)])
     1482        s = dict(m=9, oid=oid)
     1483        self.assertRaises(KeyError, update, 'test_table', s)
     1484        r = update('test_table', s, oid=oid)
     1485        self.assertIs(r, s)
     1486        self.assertEqual(r['n'], 5)
     1487        self.assertEqual(r['m'], 9)
     1488        s = dict(n=1, m=3, oid=oid)
     1489        r = update('test_table', s)
     1490        self.assertIs(r, s)
     1491        self.assertEqual(r['n'], 1)
     1492        self.assertEqual(r['m'], 3)
     1493        r = query(q).getresult()
     1494        self.assertEqual(r, [(1, 3), (5, 9)])
    12941495
    12951496    def testUpdateWithCompositeKey(self):
     
    13041505            query('insert into "%s" values('
    13051506                "%d, '%s')" % (table, n + 1, t))
    1306         self.assertRaises(pg.ProgrammingError, update,
    1307                           table, dict(t='b'))
     1507        self.assertRaises(KeyError, update, table, dict(t='b'))
    13081508        s = dict(n=2, t='d')
    13091509        r = update(table, s)
     
    13341534                query('insert into "%s" values('
    13351535                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1336         self.assertRaises(pg.ProgrammingError, update,
    1337                           table, dict(n=2, t='b'))
     1536        self.assertRaises(KeyError, update, table, dict(n=2, t='b'))
    13381537        self.assertEqual(update(table,
    1339                                 dict(n=2, m=2, t='x'))['t'], 'x')
     1538            dict(n=2, m=2, t='x'))['t'], 'x')
    13401539        q = 'select t from "%s" where n=2 order by m' % table
    13411540        r = [r[0] for r in query(q).getresult()]
     
    14401639        r = upsert(table, s, oid='invalid')
    14411640        self.assertIs(r, s)
     1641
     1642    def testUpsertWithOid(self):
     1643        upsert = self.db.upsert
     1644        get = self.db.get
     1645        query = self.db.query
     1646        query("drop table if exists test_table")
     1647        self.addCleanup(query, "drop table test_table")
     1648        query("create table test_table (n int) with oids")
     1649        query("insert into test_table values (1)")
     1650        self.assertRaises(pg.ProgrammingError,
     1651            upsert, 'test_table', dict(n=2))
     1652        r = get('test_table', 1, 'n')
     1653        self.assertIsInstance(r, dict)
     1654        self.assertEqual(r['n'], 1)
     1655        qoid = 'oid(test_table)'
     1656        self.assertIn(qoid, r)
     1657        self.assertNotIn('oid', r)
     1658        oid = r[qoid]
     1659        self.assertRaises(pg.ProgrammingError,
     1660            upsert, 'test_table', dict(n=2, oid=oid))
     1661        query("alter table test_table add column m int")
     1662        query("alter table test_table add primary key (n)")
     1663        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1664        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1665        s = dict(n=2)
     1666        r = upsert('test_table', s)
     1667        self.assertIs(r, s)
     1668        self.assertEqual(r['n'], 2)
     1669        self.assertIsNone(r['m'])
     1670        q = query("select n, m from test_table order by n limit 3")
     1671        self.assertEqual(q.getresult(), [(1, None), (2, None)])
     1672        r['oid'] = oid
     1673        r = upsert('test_table', r)
     1674        self.assertIs(r, s)
     1675        self.assertEqual(r['n'], 2)
     1676        self.assertIsNone(r['m'])
     1677        self.assertIn(qoid, r)
     1678        self.assertNotIn('oid', r)
     1679        self.assertNotEqual(r[qoid], oid)
     1680        r['m'] = 7
     1681        r = upsert('test_table', r)
     1682        self.assertIs(r, s)
     1683        self.assertEqual(r['n'], 2)
     1684        self.assertEqual(r['m'], 7)
     1685        r.update(n=1, m=3)
     1686        r = upsert('test_table', r)
     1687        self.assertIs(r, s)
     1688        self.assertEqual(r['n'], 1)
     1689        self.assertEqual(r['m'], 3)
     1690        q = query("select n, m from test_table order by n limit 3")
     1691        self.assertEqual(q.getresult(), [(1, 3), (2, 7)])
     1692        r = upsert('test_table', r, oid='invalid')
     1693        self.assertIs(r, s)
     1694        self.assertEqual(r['n'], 1)
     1695        self.assertEqual(r['m'], 3)
     1696        r['m'] = 5
     1697        r = upsert('test_table', r, m=False)
     1698        self.assertIs(r, s)
     1699        self.assertEqual(r['n'], 1)
     1700        self.assertEqual(r['m'], 3)
     1701        r['m'] = 5
     1702        r = upsert('test_table', r, m=True)
     1703        self.assertIs(r, s)
     1704        self.assertEqual(r['n'], 1)
     1705        self.assertEqual(r['m'], 5)
     1706        r.update(n=2, m=1)
     1707        r = upsert('test_table', r, m='included.m')
     1708        self.assertIs(r, s)
     1709        self.assertEqual(r['n'], 2)
     1710        self.assertEqual(r['m'], 7)
     1711        r['m'] = 9
     1712        r = upsert('test_table', r, m='excluded.m')
     1713        self.assertIs(r, s)
     1714        self.assertEqual(r['n'], 2)
     1715        self.assertEqual(r['m'], 9)
     1716        r['m'] = 8
     1717        r = upsert('test_table *', r, m='included.m + 1')
     1718        self.assertIs(r, s)
     1719        self.assertEqual(r['n'], 2)
     1720        self.assertEqual(r['m'], 10)
     1721        q = query("select n, m from test_table order by n limit 3")
     1722        self.assertEqual(q.getresult(), [(1, 5), (2, 10)])
    14421723
    14431724    def testUpsertWithCompositeKey(self):
     
    16261907        self.addCleanup(query, "drop table test_table")
    16271908        query("create table test_table (n int) with oids")
    1628         query("insert into test_table values (1)")
    1629         query("insert into test_table values (2)")
    1630         query("insert into test_table values (3)")
     1909        for i in range(6):
     1910            query("insert into test_table values (%d)" % (i + 1))
    16311911        r = dict(n=3)
    16321912        self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
    1633         r = get('test_table', 1, 'n')
    1634         self.assertIsInstance(r, dict)
    1635         self.assertEqual(r['n'], 1)
     1913        s = get('test_table', 1, 'n')
    16361914        qoid = 'oid(test_table)'
    1637         self.assertIn(qoid, r)
    1638         oid = r[qoid]
    1639         self.assertIsInstance(oid, int)
    1640         s = delete('test_table', r)
    1641         self.assertEqual(s, 1)
    1642         s = delete('test_table', r)
    1643         self.assertEqual(s, 0)
    1644         r = get('test_table', 2, 'n')
    1645         self.assertIsInstance(r, dict)
    1646         self.assertEqual(r['n'], 2)
    1647         qoid = 'oid(test_table)'
    1648         self.assertIn(qoid, r)
    1649         oid = r[qoid]
    1650         self.assertIsInstance(oid, int)
    1651         r['oid'] = r.pop(qoid)
    1652         self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
    1653         s = delete('test_table', r, oid=oid)
    1654         self.assertEqual(s, 1)
    1655         s = delete('test_table', r)
    1656         self.assertEqual(s, 0)
    1657         s = delete('test_table', r, n=3)
    1658         self.assertEqual(s, 0)
    1659         q = 'select n from test_table order by 1 limit 3'
    1660         r = query(q).getresult()
    1661         self.assertEqual(r, [(3,)])
     1915        self.assertIn(qoid, s)
     1916        r = delete('test_table', s)
     1917        self.assertEqual(r, 1)
     1918        r = delete('test_table', s)
     1919        self.assertEqual(r, 0)
     1920        q = "select min(n),count(n) from test_table"
     1921        self.assertEqual(query(q).getresult()[0], (2, 5))
     1922        oid = get('test_table', 2, 'n')[qoid]
     1923        s = dict(oid=oid, n=2)
     1924        self.assertRaises(pg.ProgrammingError, delete, 'test_table', s)
     1925        r = delete('test_table', None, oid=oid)
     1926        self.assertEqual(r, 1)
     1927        r = delete('test_table', None, oid=oid)
     1928        self.assertEqual(r, 0)
     1929        self.assertEqual(query(q).getresult()[0], (3, 4))
     1930        s = dict(oid=oid, n=2)
     1931        oid = get('test_table', 3, 'n')[qoid]
     1932        self.assertRaises(pg.ProgrammingError, delete, 'test_table', s)
     1933        r = delete('test_table', s, oid=oid)
     1934        self.assertEqual(r, 1)
     1935        r = delete('test_table', s, oid=oid)
     1936        self.assertEqual(r, 0)
     1937        self.assertEqual(query(q).getresult()[0], (4, 3))
     1938        s = get('test_table', 4, 'n')
     1939        r = delete('test_table *', s)
     1940        self.assertEqual(r, 1)
     1941        r = delete('test_table *', s)
     1942        self.assertEqual(r, 0)
     1943        self.assertEqual(query(q).getresult()[0], (5, 2))
     1944        oid = get('test_table', 5, 'n')[qoid]
     1945        s = {qoid: oid, 'm': 4}
     1946        r = delete('test_table', s, m=6)
     1947        self.assertEqual(r, 1)
     1948        r = delete('test_table *', s)
     1949        self.assertEqual(r, 0)
     1950        self.assertEqual(query(q).getresult()[0], (6, 1))
     1951        query("alter table test_table add column m int")
     1952        query("alter table test_table add primary key (n)")
     1953        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1954        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1955        for i in range(5):
     1956            query("insert into test_table values (%d, %d)" % (i + 1, i + 2))
     1957        s = dict(m=2)
     1958        self.assertRaises(KeyError, delete, 'test_table', s)
     1959        s = dict(m=2, oid=oid)
     1960        self.assertRaises(KeyError, delete, 'test_table', s)
     1961        r = delete('test_table', dict(m=2), oid=oid)
     1962        self.assertEqual(r, 0)
     1963        oid = get('test_table', 1, 'n')[qoid]
     1964        s = dict(oid=oid)
     1965        self.assertRaises(KeyError, delete, 'test_table', s)
     1966        r = delete('test_table', s, oid=oid)
     1967        self.assertEqual(r, 1)
     1968        r = delete('test_table', s, oid=oid)
     1969        self.assertEqual(r, 0)
     1970        self.assertEqual(query(q).getresult()[0], (2, 5))
     1971        s = get('test_table', 2, 'n')
     1972        del s['n']
     1973        r = delete('test_table', s)
     1974        self.assertEqual(r, 1)
     1975        r = delete('test_table', s)
     1976        self.assertEqual(r, 0)
     1977        self.assertEqual(query(q).getresult()[0], (3, 4))
     1978        r = delete('test_table', n=3)
     1979        self.assertEqual(r, 1)
     1980        r = delete('test_table', n=3)
     1981        self.assertEqual(r, 0)
     1982        self.assertEqual(query(q).getresult()[0], (4, 3))
     1983        r = delete('test_table', None, n=4)
     1984        self.assertEqual(r, 1)
     1985        r = delete('test_table', None, n=4)
     1986        self.assertEqual(r, 0)
     1987        self.assertEqual(query(q).getresult()[0], (5, 2))
     1988        s = dict(n=6)
     1989        r = delete('test_table', s, n=5)
     1990        self.assertEqual(r, 1)
     1991        r = delete('test_table', s, n=5)
     1992        self.assertEqual(r, 0)
     1993        self.assertEqual(query(q).getresult()[0], (6, 1))
    16621994
    16631995    def testDeleteWithCompositeKey(self):
     
    16712003            query("insert into %s values("
    16722004                "%d, '%s')" % (table, n + 1, t))
    1673         self.assertRaises(pg.ProgrammingError, self.db.delete,
    1674             table, dict(t='b'))
     2005        self.assertRaises(KeyError, self.db.delete, table, dict(t='b'))
    16752006        self.assertEqual(self.db.delete(table, dict(n=2)), 1)
    1676         r = query('select t from "%s" where n=2' % table
    1677                   ).getresult()
     2007        r = query('select t from "%s" where n=2' % table).getresult()
    16782008        self.assertEqual(r, [])
    16792009        self.assertEqual(self.db.delete(table, dict(n=2)), 0)
    1680         r = query('select t from "%s" where n=3' % table
    1681                   ).getresult()[0][0]
     2010        r = query('select t from "%s" where n=3' % table).getresult()[0][0]
    16822011        self.assertEqual(r, 'c')
    16832012        table = 'delete_test_table_2'
     
    16912020                query('insert into "%s" values('
    16922021                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1693         self.assertRaises(pg.ProgrammingError, self.db.delete,
    1694             table, dict(n=2, t='b'))
     2022        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    16952023        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
    16962024        r = [r[0] for r in query('select t from "%s" where n=2'
     
    17272055        r = query('select count(*) from "%s"' % table).getresult()
    17282056        self.assertEqual(r[0][0], 0)
     2057
     2058    def testDeleteReferenced(self):
     2059        delete = self.db.delete
     2060        query = self.db.query
     2061        query("drop table if exists test_child")
     2062        query("drop table if exists test_parent")
     2063        self.addCleanup(query, "drop table test_parent")
     2064        query("create table test_parent (n smallint primary key)")
     2065        self.addCleanup(query, "drop table test_child")
     2066        query("create table test_child ("
     2067            " n smallint primary key references test_parent (n))")
     2068        for n in range(3):
     2069            query("insert into test_parent (n) values (%d)" % n)
     2070            query("insert into test_child (n) values (%d)" % n)
     2071        q = ("select (select count(*) from test_parent),"
     2072            " (select count(*) from test_child)")
     2073        self.assertEqual(query(q).getresult()[0], (3, 3))
     2074        self.assertRaises(pg.ProgrammingError,
     2075            delete, 'test_parent', None, n=2)
     2076        self.assertRaises(pg.ProgrammingError,
     2077            delete, 'test_parent *', None, n=2)
     2078        r = delete('test_child', None, n=2)
     2079        self.assertEqual(r, 1)
     2080        self.assertEqual(query(q).getresult()[0], (3, 2))
     2081        r = delete('test_parent', None, n=2)
     2082        self.assertEqual(r, 1)
     2083        self.assertEqual(query(q).getresult()[0], (2, 2))
     2084        self.assertRaises(pg.ProgrammingError,
     2085            delete, 'test_parent', dict(n=0))
     2086        self.assertRaises(pg.ProgrammingError,
     2087            delete, 'test_parent *', dict(n=0))
     2088        r = delete('test_child', dict(n=0))
     2089        self.assertEqual(r, 1)
     2090        self.assertEqual(query(q).getresult()[0], (2, 1))
     2091        r = delete('test_child', dict(n=0))
     2092        self.assertEqual(r, 0)
     2093        r = delete('test_parent', dict(n=0))
     2094        self.assertEqual(r, 1)
     2095        self.assertEqual(query(q).getresult()[0], (1, 1))
     2096        r = delete('test_parent', None, n=0)
     2097        self.assertEqual(r, 0)
     2098        q = "select n from test_parent natural join test_child limit 2"
     2099        self.assertEqual(query(q).getresult(), [(1,)])
    17292100
    17302101    def testTruncate(self):
Note: See TracChangeset for help on using the changeset viewer.