Changeset 763 for trunk


Ignore:
Timestamp:
Jan 17, 2016, 4:01:04 PM (4 years ago)
Author:
cito
Message:

Achieve 100% test coverage for pg module on the trunk

Note that some lines are only covered in certain Pg or Py versions,
so you need to run tests with different versions to be sure.

Also added another synonym for transaction methods,
you can now pick your favorite for all three of them.

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/pg/db_wrapper.rst

    r762 r763  
    258258    made by the transaction to be discarded.
    259259
    260 .. versionadded:: 4.1
     260.. method:: DB.abort()
     261
     262    This is the same as the :meth:`DB.rollback` method.
     263
     264.. versionadded:: 4.2
    261265
    262266.. method:: DB.savepoint(name)
  • trunk/pg.py

    r762 r763  
    374374        """Prepare and add a parameter to the list."""
    375375        if value is not None and typ != 'text':
     376            prepare = self._prepare_funcs[typ]
    376377            try:
    377                 prepare = self._prepare_funcs[typ]
    378             except KeyError:
    379                 pass
    380             else:
    381                 try:
    382                     value = prepare(self, value)
    383                 except ValueError:
    384                     return value
     378                value = prepare(self, value)
     379            except ValueError:
     380                return value
    385381        params.append(value)
    386382        return '$%d' % len(params)
     
    469465            qstr += ' TO ' + name
    470466        return self.query(qstr)
     467
     468    abort = rollback
    471469
    472470    def savepoint(self, name):
     
    724722                    self._prepare_qualified_param(table, 1))
    725723            names = self.db.query(q, (table,)).getresult()
    726             if not names:
    727                 raise KeyError('Table %s does not exist' % table)
    728724            if not self._regtypes:
    729725                names = ((name, _simpletype(typ)) for name, typ in names)
     
    788784            if isinstance(row, dict):
    789785                if qoid not in row:
    790                     raise _db_error('%s not in row' % qoid)
     786                    raise _prg_error('%s not in row' % qoid)
    791787            else:
    792788                row = {qoid: row}
     
    855851        self._do_debug(q, params)
    856852        q = self.db.query(q, params)
    857         res = q.dictresult()
    858         if not res:
    859             raise _int_error('Insert operation did not return new values')
     853        res = q.dictresult()  # this will always return a row
    860854        for n, value in res[0].items():
    861855            if n == 'oid':
     
    880874        qoid = _oid_key(table)
    881875        if 'oid' in kw:
    882             kw[qoid] = kw['oid']
    883             del kw['oid']
     876            kw[qoid] = kw.pop('oid')
    884877        if row is None:
    885878            row = {}
     
    991984        keyname = [keyname] if isinstance(
    992985            keyname, basestring) else sorted(keyname)
    993         try:
    994             target = ', '.join(col(k) for k in keyname)
    995         except KeyError:
    996             raise _prg_error('Upsert operation needs primary key or oid')
     986        target = ', '.join(col(k) for k in keyname)
    997987        update = []
    998988        keyname = set(keyname)
     
    1005995                        value = 'excluded.%s' % col(n)
    1006996                    update.append('%s = %s' % (col(n), value))
    1007         if not values and not update:
     997        if not values:
    1008998            return row
    1009999        do = 'update set %s' % ', '.join(update) if update else 'nothing'
     
    10221012            raise  # re-raise original error
    10231013        res = q.dictresult()
    1024         if res:  # may be empty with "do nothing"
     1014        if update:  # may be empty with "do nothing"
    10251015            for n, value in res[0].items():
    10261016                if n == 'oid':
    10271017                    n = _oid_key(table)
    1028                 elif attnames.get(n) == 'bytea':
     1018                elif attnames.get(n) == 'bytea' and value is not None:
    10291019                    value = self.unescape_bytea(value)
    10301020                row[n] = value
    1031         elif update:
    1032             raise _int_error('Upsert operation did not return new values')
    10331021        else:
    10341022            self.get(table, row)
     
    10741062        qoid = _oid_key(table)
    10751063        if 'oid' in kw:
    1076             kw[qoid] = kw['oid']
    1077             del kw['oid']
     1064            kw[qoid] = kw.pop('oid')
    10781065        if row is None:
    10791066            row = {}
  • trunk/tests/test_classic_dbwrapper.py

    r762 r763  
    9393    def testAllDBAttributes(self):
    9494        attributes = [
     95            'abort',
    9596            'begin',
    9697            'cancel', 'clear', 'close', 'commit',
     
    230231        else:
    231232            self.fail('Reset should give an error for a closed connection')
     233        self.assertIsNone(self.db.db)
    232234        self.assertRaises(pg.InternalError, self.db.close)
    233235        self.assertRaises(pg.InternalError, self.db.query, 'select 1')
     236        self.assertRaises(pg.InternalError, getattr, self.db, 'status')
     237        self.assertRaises(pg.InternalError, getattr, self.db, 'error')
     238        self.assertRaises(pg.InternalError, getattr, self.db, 'absent')
    234239
    235240    def testMethodReset(self):
     
    416421        self.assertRaises(TypeError, f, None)
    417422        self.assertRaises(TypeError, f, 42)
     423        self.assertRaises(TypeError, f, '')
     424        self.assertRaises(TypeError, f, [])
     425        self.assertRaises(TypeError, f, [''])
    418426        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
    419427        r = f('standard_conforming_strings')
     
    467475        self.assertRaises(TypeError, f, None)
    468476        self.assertRaises(TypeError, f, 42)
     477        self.assertRaises(TypeError, f, '')
     478        self.assertRaises(TypeError, f, [])
     479        self.assertRaises(TypeError, f, [''])
     480        self.assertRaises(ValueError, f, 'all', 'invalid')
     481        self.assertRaises(ValueError, f, {
     482            'invalid1': 'value1', 'invalid2': 'value2'}, 'value')
    469483        self.assertRaises(pg.ProgrammingError, f, 'this_does_not_exist')
    470484        f('standard_conforming_strings', 'off')
     
    810824        self.assertRaises(pg.ProgrammingError,
    811825            self.db.get_attnames, 'has.too.many.dots')
     826        r = get_attnames('test')
     827        self.assertIsInstance(r, dict)
     828        self.assertEqual(r, dict(
     829            i2='int', i4='int', i8='int', d='num',
     830            f4='float', f8='float', m='money',
     831            v4='text', c4='text', t='text'))
    812832        query = self.db.query
    813833        query("drop table if exists test_table")
     
    861881            " n int, alpha smallint, beta bool,"
    862882            " gamma char(5), tau text, v varchar(3))")
    863         self.db.use_regtypes(True)
     883        use_regtypes = self.db.use_regtypes
     884        regtypes = use_regtypes()
     885        self.assertFalse(regtypes)
     886        use_regtypes(True)
    864887        try:
    865888            r = get_attnames("test_table")
    866889            self.assertIsInstance(r, dict)
    867890        finally:
    868             self.db.use_regtypes(False)
     891            use_regtypes(regtypes)
    869892        self.assertEqual(r, dict(
    870893            n='integer', alpha='smallint', beta='boolean',
     
    875898        query = self.db.query
    876899        query("drop table if exists test_table")
    877         self.addCleanup(query, "drop table if exists test_table")
     900        self.addCleanup(query, "drop table test_table")
    878901        query("create table test_table(col int)")
    879902        r = get_attnames("test_table")
    880903        self.assertIsInstance(r, dict)
    881904        self.assertEqual(r, dict(col='int'))
    882         query("drop table test_table")
    883         query("create table test_table(col text)")
     905        query("alter table test_table alter column col type text")
     906        query("alter table test_table add column col2 int")
    884907        r = get_attnames("test_table")
    885908        self.assertEqual(r, dict(col='int'))
    886909        r = get_attnames("test_table", flush=True)
     910        self.assertEqual(r, dict(col='text', col2='int'))
     911        query("alter table test_table drop column col2")
     912        r = get_attnames("test_table")
     913        self.assertEqual(r, dict(col='text', col2='int'))
     914        r = get_attnames("test_table", flush=True)
    887915        self.assertEqual(r, dict(col='text'))
    888         query("drop table test_table")
     916        query("alter table test_table drop column col")
    889917        r = get_attnames("test_table")
    890918        self.assertEqual(r, dict(col='text'))
    891         self.assertRaises(pg.ProgrammingError,
    892             get_attnames, "test_table", flush=True)
     919        r = get_attnames("test_table", flush=True)
     920        self.assertEqual(r, dict())
    893921
    894922    def testGetAttnamesIsOrdered(self):
     
    936964                % (table, n + 1, t))
    937965        self.assertRaises(pg.ProgrammingError, get, table, 2)
     966        self.assertRaises(pg.ProgrammingError, get, table, {}, 'oid')
    938967        r = get(table, 2, 'n')
    939968        oid_table = 'oid(%s)' % table
     
    11611190            query('delete from "%s"' % table)
    11621191
     1192    def testInsertWithOid(self):
     1193        insert = self.db.insert
     1194        query = self.db.query
     1195        query("drop table if exists test_table")
     1196        self.addCleanup(query, "drop table test_table")
     1197        query("create table test_table (n int) with oids")
     1198        r = insert('test_table', n=1)
     1199        self.assertIsInstance(r, dict)
     1200        self.assertEqual(r['n'], 1)
     1201        qoid = 'oid(test_table)'
     1202        self.assertIn(qoid, r)
     1203        r = insert('test_table', n=2, oid='invalid')
     1204        self.assertIsInstance(r, dict)
     1205        self.assertEqual(r['n'], 2)
     1206        r['n'] = 3
     1207        r = insert('test_table', r)
     1208        self.assertIsInstance(r, dict)
     1209        self.assertEqual(r['n'], 3)
     1210        r = insert('test_table', r, n=4)
     1211        self.assertIsInstance(r, dict)
     1212        self.assertEqual(r['n'], 4)
     1213        q = 'select n from test_table order by 1 limit 5'
     1214        r = query(q).getresult()
     1215        self.assertEqual(r, [(1,), (2,), (3,), (4,)])
     1216
    11631217    def testInsertWithQuotedNames(self):
    11641218        insert = self.db.insert
     
    11861240        update = self.db.update
    11871241        query = self.db.query
     1242        self.assertRaises(pg.ProgrammingError, update,
     1243            'test', i2=2, i4=4, i8=8)
    11881244        table = 'update_test_table'
    11891245        query('drop table if exists "%s"' % table)
     
    12021258        r = query(q).getresult()[0][0]
    12031259        self.assertEqual(r, 'u')
     1260
     1261    def testUpdateWithOid(self):
     1262        update = self.db.update
     1263        get = self.db.get
     1264        query = self.db.query
     1265        query("drop table if exists test_table")
     1266        self.addCleanup(query, "drop table test_table")
     1267        query("create table test_table (n int) with oids")
     1268        query("insert into test_table values (1)")
     1269        r = get('test_table', 1, 'n')
     1270        self.assertIsInstance(r, dict)
     1271        self.assertEqual(r['n'], 1)
     1272        r['n'] = 2
     1273        r = update('test_table', r)
     1274        self.assertIsInstance(r, dict)
     1275        self.assertEqual(r['n'], 2)
     1276        qoid = 'oid(test_table)'
     1277        self.assertIn(qoid, r)
     1278        r['n'] = 3
     1279        r = update('test_table', r, oid=r.pop(qoid))
     1280        self.assertIsInstance(r, dict)
     1281        self.assertEqual(r['n'], 3)
     1282        r.pop(qoid)
     1283        self.assertRaises(pg.ProgrammingError, update, 'test_table', r)
     1284        r = get('test_table', 3, 'n')
     1285        self.assertIsInstance(r, dict)
     1286        self.assertEqual(r['n'], 3)
     1287        r.pop('n')
     1288        r = update('test_table', r)
     1289        r.pop(qoid)
     1290        self.assertEqual(r, {})
     1291        q = 'select n from test_table limit 2'
     1292        r = query(q).getresult()
     1293        self.assertEqual(r, [(3,)])
    12041294
    12051295    def testUpdateWithCompositeKey(self):
     
    12791369        upsert = self.db.upsert
    12801370        query = self.db.query
     1371        self.assertRaises(pg.ProgrammingError, upsert,
     1372            'test', i2=2, i4=4, i8=8)
    12811373        table = 'upsert_test_table'
    12821374        query('drop table if exists "%s"' % table)
     
    13441436        r = query(q).getresult()
    13451437        self.assertEqual(r, [(1, 'x2'), (2, 'y3')])
     1438        # not existing columns and oid parameter should be ignored
     1439        s = dict(m=3, u='z')
     1440        r = upsert(table, s, oid='invalid')
     1441        self.assertIs(r, s)
    13461442
    13471443    def testUpsertWithCompositeKey(self):
     
    14501546        query = self.db.query
    14511547        f = False if pg.get_bool() else 'f'
     1548        r = clear('test')
     1549        result = dict(
     1550            i2=0, i4=0, i8=0, d=0, f4=0, f8=0, m=0, v4='', c4='', t='')
     1551        self.assertEqual(r, result)
    14521552        table = 'clear_test_table'
    14531553        query('drop table if exists "%s"' % table)
    14541554        self.addCleanup(query, 'drop table "%s"' % table)
    14551555        query('create table "%s" ('
    1456             "n integer, b boolean, d date, t text)" % table)
     1556            "n integer, b boolean, d date, t text) with oids" % table)
    14571557        r = clear(table)
    1458         result = {'n': 0, 'b': f, 'd': '', 't': ''}
     1558        result = dict(n=0, b=f, d='', t='')
    14591559        self.assertEqual(r, result)
    14601560        r['a'] = r['n'] = 1
     
    14631563        r['oid'] = long(1)
    14641564        r = clear(table, r)
    1465         result = {'a': 1, 'n': 0, 'b': f, 'd': '', 't': '',
    1466             'oid': long(1)}
     1565        result = dict(a=1, n=0, b=f, d='', t='', oid=long(1))
    14671566        self.assertEqual(r, result)
    14681567
     
    14851584        delete = self.db.delete
    14861585        query = self.db.query
     1586        self.assertRaises(pg.ProgrammingError, delete,
     1587            'test', dict(i2=2, i4=4, i8=8))
    14871588        table = 'delete_test_table'
    14881589        query('drop table if exists "%s"' % table)
     
    15131614        self.assertEqual(s, 0)
    15141615        self.assertRaises(pg.DatabaseError, self.db.get, table, 2, 'n')
     1616        # not existing columns and oid parameter should be ignored
     1617        r.update(m=3, u='z', oid='invalid')
     1618        s = delete(table, r)
     1619        self.assertEqual(s, 0)
     1620
     1621    def testDeleteWithOid(self):
     1622        delete = self.db.delete
     1623        get = self.db.get
     1624        query = self.db.query
     1625        query("drop table if exists test_table")
     1626        self.addCleanup(query, "drop table test_table")
     1627        query("create table test_table (n int) with oids")
     1628        query("insert into test_table values (1)")
     1629        query("insert into test_table values (2)")
     1630        query("insert into test_table values (3)")
     1631        r = dict(n=3)
     1632        self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
     1633        r = get('test_table', 1, 'n')
     1634        self.assertIsInstance(r, dict)
     1635        self.assertEqual(r['n'], 1)
     1636        qoid = 'oid(test_table)'
     1637        self.assertIn(qoid, r)
     1638        oid = r[qoid]
     1639        self.assertIsInstance(oid, int)
     1640        s = delete('test_table', r)
     1641        self.assertEqual(s, 1)
     1642        s = delete('test_table', r)
     1643        self.assertEqual(s, 0)
     1644        r = get('test_table', 2, 'n')
     1645        self.assertIsInstance(r, dict)
     1646        self.assertEqual(r['n'], 2)
     1647        qoid = 'oid(test_table)'
     1648        self.assertIn(qoid, r)
     1649        oid = r[qoid]
     1650        self.assertIsInstance(oid, int)
     1651        r['oid'] = r.pop(qoid)
     1652        self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
     1653        s = delete('test_table', r, oid=oid)
     1654        self.assertEqual(s, 1)
     1655        s = delete('test_table', r)
     1656        self.assertEqual(s, 0)
     1657        s = delete('test_table', r, n=3)
     1658        self.assertEqual(s, 0)
     1659        q = 'select n from test_table order by 1 limit 3'
     1660        r = query(q).getresult()
     1661        self.assertEqual(r, [(3,)])
    15151662
    15161663    def testDeleteWithCompositeKey(self):
     
    18151962            "select * from test_table order by 1").getresult()]
    18161963        self.assertEqual(r, [1, 2, 5, 7, 9])
     1964        self.db.begin(mode='read only')
     1965        self.assertRaises(pg.ProgrammingError,
     1966            query, "insert into test_table values (0)")
     1967        self.db.rollback()
     1968        self.db.start(mode='Read Only')
     1969        self.assertRaises(pg.ProgrammingError,
     1970            query, "insert into test_table values (0)")
     1971        self.db.abort()
     1972
     1973    def testTransactionAliases(self):
     1974        self.assertEqual(self.db.begin, self.db.start)
     1975        self.assertEqual(self.db.commit, self.db.end)
     1976        self.assertEqual(self.db.rollback, self.db.abort)
    18171977
    18181978    def testContextManager(self):
     
    19252085        self.assertIsInstance(r, bytes)
    19262086        self.assertEqual(r, s)
     2087
     2088    def testUpsertBytea(self):
     2089        query = self.db.query
     2090        query('drop table if exists bytea_test')
     2091        self.addCleanup(query, 'drop table bytea_test')
     2092        query('create table bytea_test (n smallint primary key, data bytea)')
     2093        s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
     2094        r = dict(n=7, data=s)
     2095        try:
     2096            r = self.db.upsert('bytea_test', r)
     2097        except pg.ProgrammingError as error:
     2098            if self.db.server_version < 90500:
     2099                self.skipTest('database does not support upsert')
     2100            self.fail(str(error))
     2101        self.assertIsInstance(r, dict)
     2102        self.assertIn('n', r)
     2103        self.assertEqual(r['n'], 7)
     2104        self.assertIn('data', r)
     2105        self.assertIsInstance(r['data'], bytes)
     2106        self.assertEqual(r['data'], s)
     2107        r['data'] = None
     2108        r = self.db.upsert('bytea_test', r)
     2109        self.assertIsInstance(r, dict)
     2110        self.assertIn('n', r)
     2111        self.assertEqual(r['n'], 7)
     2112        self.assertIn('data', r)
     2113        self.assertIsNone(r['data'], bytes)
    19272114
    19282115    def testNotificationHandler(self):
Note: See TracChangeset for help on using the changeset viewer.