Changeset 735 for trunk/tests


Ignore:
Timestamp:
Jan 13, 2016, 4:49:35 PM (4 years ago)
Author:
cito
Message:

Implement "upsert" method for PostgreSQL 9.5

A new method upsert() has been added to the DB wrapper class that
nicely complements the existing get/insert/update/delete() methods.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r732 r735  
    133133            'unescape_bytea',
    134134            'update',
     135            'upsert',
    135136            'use_regtypes',
    136137            'user',
     
    868869        s = update(table, r)
    869870        self.assertEqual(s, r)
    870         r = query('select t from "%s" where n=2' % table
    871                   ).getresult()[0][0]
     871        q = 'select t from "%s" where n=2' % table
     872        r = query(q).getresult()[0][0]
    872873        self.assertEqual(r, 'u')
    873874        query('drop table "%s"' % table)
     
    885886        self.assertRaises(pg.ProgrammingError, update,
    886887                          table, dict(t='b'))
    887         self.assertEqual(update(table, dict(n=2, t='d'))['t'], 'd')
    888         r = query('select t from "%s" where n=2' % table
    889                   ).getresult()[0][0]
     888        s = dict(n=2, t='d')
     889        r = update(table, s)
     890        self.assertIs(r, s)
     891        self.assertEqual(r['n'], 2)
     892        self.assertEqual(r['t'], 'd')
     893        q = 'select t from "%s" where n=2' % table
     894        r = query(q).getresult()[0][0]
    890895        self.assertEqual(r, 'd')
     896        s.update(dict(n=4, t='e'))
     897        r = update(table, s)
     898        self.assertEqual(r['n'], 4)
     899        self.assertEqual(r['t'], 'e')
     900        q = 'select t from "%s" where n=2' % table
     901        r = query(q).getresult()[0][0]
     902        self.assertEqual(r, 'd')
     903        q = 'select t from "%s" where n=4' % table
     904        r = query(q).getresult()
     905        self.assertEqual(len(r), 0)
    891906        query('drop table "%s"' % table)
    892907        table = 'update_test_table_2'
     
    903918        self.assertEqual(update(table,
    904919                                dict(n=2, m=2, t='x'))['t'], 'x')
    905         r = [r[0] for r in query('select t from "%s" where n=2'
    906             ' order by m' % table).getresult()]
     920        q = 'select t from "%s" where n=2 order by m' % table
     921        r = [r[0] for r in query(q).getresult()]
    907922        self.assertEqual(r, ['c', 'x'])
    908923        query('drop table "%s"' % table)
     
    931946        self.assertEqual(r['Questions?'], 'When?')
    932947        query('drop table "%s"' % table)
     948
     949    def testUpsert(self):
     950        upsert = self.db.upsert
     951        query = self.db.query
     952        table = 'upsert_test_table'
     953        query('drop table if exists "%s"' % table)
     954        query('create table "%s" ('
     955            "n integer primary key, t text) with oids" % table)
     956        s = dict(n=1, t='x')
     957        try:
     958            r = upsert(table, s)
     959        except pg.ProgrammingError as error:
     960            if self.db.server_version < 90500:
     961                self.skipTest('database does not support upsert')
     962            self.fail(str(error))
     963        self.assertIs(r, s)
     964        self.assertEqual(r['n'], 1)
     965        self.assertEqual(r['t'], 'x')
     966        s.update(n=2, t='y')
     967        r = upsert(table, s, **dict.fromkeys(s))
     968        self.assertIs(r, s)
     969        self.assertEqual(r['n'], 2)
     970        self.assertEqual(r['t'], 'y')
     971        q = 'select n, t from "%s" order by n limit 3' % table
     972        r = query(q).getresult()
     973        self.assertEqual(r, [(1, 'x'), (2, 'y')])
     974        s.update(t='z')
     975        r = upsert(table, s)
     976        self.assertIs(r, s)
     977        self.assertEqual(r['n'], 2)
     978        self.assertEqual(r['t'], 'z')
     979        r = query(q).getresult()
     980        self.assertEqual(r, [(1, 'x'), (2, 'z')])
     981        s.update(t='n')
     982        r = upsert(table, s, t=False)
     983        self.assertIs(r, s)
     984        self.assertEqual(r['n'], 2)
     985        self.assertEqual(r['t'], 'z')
     986        r = query(q).getresult()
     987        self.assertEqual(r, [(1, 'x'), (2, 'z')])
     988        s.update(t='y')
     989        r = upsert(table, s, t=True)
     990        self.assertIs(r, s)
     991        self.assertEqual(r['n'], 2)
     992        self.assertEqual(r['t'], 'y')
     993        r = query(q).getresult()
     994        self.assertEqual(r, [(1, 'x'), (2, 'y')])
     995        s.update(t='n')
     996        r = upsert(table, s, t="included.t || '2'")
     997        self.assertIs(r, s)
     998        self.assertEqual(r['n'], 2)
     999        self.assertEqual(r['t'], 'y2')
     1000        r = query(q).getresult()
     1001        self.assertEqual(r, [(1, 'x'), (2, 'y2')])
     1002        s.update(t='y')
     1003        r = upsert(table, s, t="excluded.t || '3'")
     1004        self.assertIs(r, s)
     1005        self.assertEqual(r['n'], 2)
     1006        self.assertEqual(r['t'], 'y3')
     1007        r = query(q).getresult()
     1008        self.assertEqual(r, [(1, 'x'), (2, 'y3')])
     1009        s.update(n=1, t='2')
     1010        r = upsert(table, s, t="included.t || excluded.t")
     1011        self.assertIs(r, s)
     1012        self.assertEqual(r['n'], 1)
     1013        self.assertEqual(r['t'], 'x2')
     1014        r = query(q).getresult()
     1015        self.assertEqual(r, [(1, 'x2'), (2, 'y3')])
     1016        query('drop table "%s"' % table)
     1017
     1018    def testUpsertWithCompositeKey(self):
     1019        upsert = self.db.upsert
     1020        query = self.db.query
     1021        table = 'upsert_test_table_2'
     1022        query('drop table if exists "%s"' % table)
     1023        query('create table "%s" ('
     1024            "n integer, m integer, t text, primary key (n, m))" % table)
     1025        s = dict(n=1, m=2, t='x')
     1026        try:
     1027            r = upsert(table, s)
     1028        except pg.ProgrammingError as error:
     1029            if self.db.server_version < 90500:
     1030                self.skipTest('database does not support upsert')
     1031            self.fail(str(error))
     1032        self.assertIs(r, s)
     1033        self.assertEqual(r['n'], 1)
     1034        self.assertEqual(r['m'], 2)
     1035        self.assertEqual(r['t'], 'x')
     1036        s.update(m=3, t='y')
     1037        r = upsert(table, s, **dict.fromkeys(s))
     1038        self.assertIs(r, s)
     1039        self.assertEqual(r['n'], 1)
     1040        self.assertEqual(r['m'], 3)
     1041        self.assertEqual(r['t'], 'y')
     1042        q = 'select n, m, t from "%s" order by n, m limit 3' % table
     1043        r = query(q).getresult()
     1044        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'y')])
     1045        s.update(t='z')
     1046        r = upsert(table, s)
     1047        self.assertIs(r, s)
     1048        self.assertEqual(r['n'], 1)
     1049        self.assertEqual(r['m'], 3)
     1050        self.assertEqual(r['t'], 'z')
     1051        r = query(q).getresult()
     1052        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'z')])
     1053        s.update(t='n')
     1054        r = upsert(table, s, t=False)
     1055        self.assertIs(r, s)
     1056        self.assertEqual(r['n'], 1)
     1057        self.assertEqual(r['m'], 3)
     1058        self.assertEqual(r['t'], 'z')
     1059        r = query(q).getresult()
     1060        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'z')])
     1061        s.update(t='n')
     1062        r = upsert(table, s, t=True)
     1063        self.assertIs(r, s)
     1064        self.assertEqual(r['n'], 1)
     1065        self.assertEqual(r['m'], 3)
     1066        self.assertEqual(r['t'], 'n')
     1067        r = query(q).getresult()
     1068        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'n')])
     1069        s.update(n=2, t='y')
     1070        r = upsert(table, s, t="'z'")
     1071        self.assertIs(r, s)
     1072        self.assertEqual(r['n'], 2)
     1073        self.assertEqual(r['m'], 3)
     1074        self.assertEqual(r['t'], 'y')
     1075        r = query(q).getresult()
     1076        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'n'), (2, 3, 'y')])
     1077        s.update(n=1, t='m')
     1078        r = upsert(table, s, t='included.t || excluded.t')
     1079        self.assertIs(r, s)
     1080        self.assertEqual(r['n'], 1)
     1081        self.assertEqual(r['m'], 3)
     1082        self.assertEqual(r['t'], 'nm')
     1083        r = query(q).getresult()
     1084        self.assertEqual(r, [(1, 2, 'x'), (1, 3, 'nm'), (2, 3, 'y')])
     1085        query('drop table "%s"' % table)
     1086
     1087    def testUpsertWithQuotedNames(self):
     1088        upsert = self.db.upsert
     1089        query = self.db.query
     1090        table = 'test table for upsert()'
     1091        query('drop table if exists "%s"' % table)
     1092        query('create table "%s" ('
     1093            '"Prime!" smallint primary key,'
     1094            '"much space" integer, "Questions?" text)' % table)
     1095        s = {'Prime!': 31, 'much space': 9009, 'Questions?': 'Yes.'}
     1096        try:
     1097            r = upsert(table, s)
     1098        except pg.ProgrammingError as error:
     1099            if self.db.server_version < 90500:
     1100                self.skipTest('database does not support upsert')
     1101            self.fail(str(error))
     1102        self.assertIs(r, s)
     1103        self.assertEqual(r['Prime!'], 31)
     1104        self.assertEqual(r['much space'], 9009)
     1105        self.assertEqual(r['Questions?'], 'Yes.')
     1106        q = 'select * from "%s" limit 2' % table
     1107        r = query(q).getresult()
     1108        self.assertEqual(r, [(31, 9009, 'Yes.')])
     1109        s.update({'Questions?': 'No.'})
     1110        r = upsert(table, s)
     1111        self.assertIs(r, s)
     1112        self.assertEqual(r['Prime!'], 31)
     1113        self.assertEqual(r['much space'], 9009)
     1114        self.assertEqual(r['Questions?'], 'No.')
     1115        r = query(q).getresult()
     1116        self.assertEqual(r, [(31, 9009, 'No.')])
    9331117
    9341118    def testClear(self):
Note: See TracChangeset for help on using the changeset viewer.