Changeset 748 for branches/4.x


Ignore:
Timestamp:
Jan 15, 2016, 9:25:31 AM (4 years ago)
Author:
cito
Message:

Add method truncate() to DB wrapper class

This methods can be used to quickly truncate tables.

Since this is pretty useful and will not break anything, I have
also back ported this addition to the 4.x branch.

Everything is well documented and tested, of course.

Location:
branches/4.x
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • branches/4.x/docs/contents/changelog.rst

    r745 r748  
    1010- New methods get_parameters() and set_parameters() in the classic interface
    1111  which can be used to get or set run-time parameters.
     12- New method truncate() in the classic interface that can be used to quickly
     13  empty a table or a set of tables.
    1214- Fix decimal point handling.
    1315- Add option to return boolean values as bool objects.
  • branches/4.x/docs/contents/pg/db_wrapper.rst

    r747 r748  
    416416exist and 1 if the row was deleted).
    417417
     418truncate -- Quickly empty database tables
     419-----------------------------------------
     420
     421.. method:: DB.truncate(self, table, [restart], [cascade], [only]):
     422
     423    Empty a table or set of tables
     424
     425    :param table: the name of the table(s)
     426    :type table: str, list or set
     427    :param bool restart: whether table sequences should be restarted
     428    :param bool cascade: whether referenced tables should also be truncated
     429    :param only: whether only parent tables should be truncated
     430    :type only: bool or list
     431
     432This method quickly removes all rows from the given table or set
     433of tables.  It has the same effect as an unqualified DELETE on each
     434table, but since it does not actually scan the tables it is faster.
     435Furthermore, it reclaims disk space immediately, rather than requiring
     436a subsequent VACUUM operation. This is most useful on large tables.
     437
     438If *restart* is set to `True`, sequences owned by columns of the truncated
     439table(s) are automatically restarted.  If *cascade* is set to `True`, it
     440also truncates all tables that have foreign-key references to any of
     441the named tables.  If the parameter *only* is not set to `True`, all the
     442descendant tables (if any) will also be truncated. Optionally, a ``*``
     443can be specified after the table name to explicitly indicate that
     444descendant tables are included.  If the parameter *table* is a list,
     445the parameter *only* can also be a list of corresponding boolean values.
     446
    418447escape_literal -- escape a literal string for use within SQL
    419448------------------------------------------------------------
  • branches/4.x/pg.py

    r747 r748  
    623623        elif isinstance(parameter, dict):
    624624            if value is not None:
    625                 raise ValueError(
    626                     'A value must not be set when parameter is a dictionary')
     625                raise ValueError('A value must not be specified'
     626                    ' when parameter is a dictionary')
    627627        else:
    628628            raise TypeError(
     
    640640            if param == 'all':
    641641                if value is not None:
    642                     raise ValueError(
    643                         "A value must ot be set when parameter is 'all'")
     642                    raise ValueError('A value must ot be specified'
     643                        " when parameter is 'all'")
    644644                params = {'all': None}
    645645                break
     
    11001100        return int(self.db.query(q))
    11011101
     1102    def truncate(self, table, restart=False, cascade=False, only=False):
     1103        """Empty a table or set of tables.
     1104
     1105        This method quickly removes all rows from the given table or set
     1106        of tables.  It has the same effect as an unqualified DELETE on each
     1107        table, but since it does not actually scan the tables it is faster.
     1108        Furthermore, it reclaims disk space immediately, rather than requiring
     1109        a subsequent VACUUM operation. This is most useful on large tables.
     1110
     1111        If restart is set to True, sequences owned by columns of the truncated
     1112        table(s) are automatically restarted.  If cascade is set to True, it
     1113        also truncates all tables that have foreign-key references to any of
     1114        the named tables.  If the parameter only is not set to True, all the
     1115        descendant tables (if any) will also be truncated. Optionally, a '*'
     1116        can be specified after the table name to explicitly indicate that
     1117        descendant tables are included.
     1118        """
     1119        if isinstance(table, basestring):
     1120            only = {table: only}
     1121            table = [table]
     1122        elif isinstance(table, (list, tuple)):
     1123            if isinstance(only, (list, tuple)):
     1124                only = dict(zip(table, only))
     1125            else:
     1126                only = dict.fromkeys(table, only)
     1127        elif isinstance(table, (set, frozenset)):
     1128            only = dict.fromkeys(table, only)
     1129        else:
     1130            raise TypeError('The table must be a string, list or set')
     1131        if not (restart is None or isinstance(restart, (bool, int))):
     1132            raise TypeError('Invalid type for the restart option')
     1133        if not (cascade is None or isinstance(cascade, (bool, int))):
     1134            raise TypeError('Invalid type for the cascade option')
     1135        tables = []
     1136        for t in table:
     1137            u = only.get(t)
     1138            if not (u is None or isinstance(u, (bool, int))):
     1139                raise TypeError('Invalid type for the only option')
     1140            if t.endswith('*'):
     1141                if u:
     1142                    raise ValueError(
     1143                        'Contradictory table name and only options')
     1144                t = t[:-1].rstrip()
     1145            t = self._add_schema(t)
     1146            if u:
     1147                t = 'ONLY %s' % t
     1148            tables.append(t)
     1149        q = ['TRUNCATE', ', '.join(tables)]
     1150        if restart:
     1151            q.append('RESTART IDENTITY')
     1152        if cascade:
     1153            q.append('CASCADE')
     1154        q = ' '.join(q)
     1155        self._do_debug(q)
     1156        return self.query(q)
     1157
    11021158    def notification_handler(self, event, callback, arg_dict={}, timeout=None):
    11031159        """Get notification handler that will run the given callback."""
  • branches/4.x/tests/test_classic_dbwrapper.py

    r747 r748  
    9595            'set_notice_receiver', 'set_parameter',
    9696            'source', 'start', 'status',
    97             'transaction', 'tty',
     97            'transaction', 'truncate', 'tty',
    9898            'unescape_bytea', 'update',
    9999            'use_regtypes', 'user',
     
    11491149        query("drop table %s" % table)
    11501150
     1151    def testTruncate(self):
     1152        truncate = self.db.truncate
     1153        self.assertRaises(TypeError, truncate, None)
     1154        self.assertRaises(TypeError, truncate, 42)
     1155        self.assertRaises(TypeError, truncate, dict(test_table=None))
     1156        query = self.db.query
     1157        query("drop table if exists test_table")
     1158        query("create table test_table (n smallint)")
     1159        for i in range(3):
     1160            query("insert into test_table values (1)")
     1161        q = "select count(*) from test_table"
     1162        r = query(q).getresult()[0][0]
     1163        self.assertEqual(r, 3)
     1164        truncate('test_table')
     1165        r = query(q).getresult()[0][0]
     1166        self.assertEqual(r, 0)
     1167        for i in range(3):
     1168            query("insert into test_table values (1)")
     1169        r = query(q).getresult()[0][0]
     1170        self.assertEqual(r, 3)
     1171        truncate('public.test_table')
     1172        r = query(q).getresult()[0][0]
     1173        self.assertEqual(r, 0)
     1174        query("drop table if exists test_table_2")
     1175        query('create table test_table_2 (n smallint)')
     1176        for t in (list, tuple, set):
     1177            for i in range(3):
     1178                query("insert into test_table values (1)")
     1179                query("insert into test_table_2 values (2)")
     1180            q = ("select (select count(*) from test_table),"
     1181                " (select count(*) from test_table_2)")
     1182            r = query(q).getresult()[0]
     1183            self.assertEqual(r, (3, 3))
     1184            truncate(t(['test_table', 'test_table_2']))
     1185            r = query(q).getresult()[0]
     1186            self.assertEqual(r, (0, 0))
     1187        query("drop table test_table_2")
     1188        query("drop table test_table")
     1189
     1190    def testTruncateRestart(self):
     1191        truncate = self.db.truncate
     1192        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
     1193        query = self.db.query
     1194        query("drop table if exists test_table")
     1195        query("create table test_table (n serial, t text)")
     1196        for n in range(3):
     1197            query("insert into test_table (t) values ('test')")
     1198        q = "select count(n), min(n), max(n) from test_table"
     1199        r = query(q).getresult()[0]
     1200        self.assertEqual(r, (3, 1, 3))
     1201        truncate('test_table')
     1202        r = query(q).getresult()[0]
     1203        self.assertEqual(r, (0, None, None))
     1204        for n in range(3):
     1205            query("insert into test_table (t) values ('test')")
     1206        r = query(q).getresult()[0]
     1207        self.assertEqual(r, (3, 4, 6))
     1208        truncate('test_table', restart=True)
     1209        r = query(q).getresult()[0]
     1210        self.assertEqual(r, (0, None, None))
     1211        for n in range(3):
     1212            query("insert into test_table (t) values ('test')")
     1213        r = query(q).getresult()[0]
     1214        self.assertEqual(r, (3, 1, 3))
     1215        query("drop table test_table")
     1216
     1217    def testTruncateCascade(self):
     1218        truncate = self.db.truncate
     1219        self.assertRaises(TypeError, truncate, 'test_table', cascade='invalid')
     1220        query = self.db.query
     1221        query("drop table if exists test_child")
     1222        query("drop table if exists test_parent")
     1223        query("create table test_parent (n smallint primary key)")
     1224        query("create table test_child ("
     1225            " n smallint primary key references test_parent (n))")
     1226        for n in range(3):
     1227            query("insert into test_parent (n) values (%d)" % n)
     1228            query("insert into test_child (n) values (%d)" % n)
     1229        q = ("select (select count(*) from test_parent),"
     1230            " (select count(*) from test_child)")
     1231        r = query(q).getresult()[0]
     1232        self.assertEqual(r, (3, 3))
     1233        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1234        truncate(['test_parent', 'test_child'])
     1235        r = query(q).getresult()[0]
     1236        self.assertEqual(r, (0, 0))
     1237        for n in range(3):
     1238            query("insert into test_parent (n) values (%d)" % n)
     1239            query("insert into test_child (n) values (%d)" % n)
     1240        r = query(q).getresult()[0]
     1241        self.assertEqual(r, (3, 3))
     1242        truncate('test_parent', cascade=True)
     1243        r = query(q).getresult()[0]
     1244        self.assertEqual(r, (0, 0))
     1245        for n in range(3):
     1246            query("insert into test_parent (n) values (%d)" % n)
     1247            query("insert into test_child (n) values (%d)" % n)
     1248        r = query(q).getresult()[0]
     1249        self.assertEqual(r, (3, 3))
     1250        truncate('test_child')
     1251        r = query(q).getresult()[0]
     1252        self.assertEqual(r, (3, 0))
     1253        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1254        truncate('test_parent', cascade=True)
     1255        r = query(q).getresult()[0]
     1256        self.assertEqual(r, (0, 0))
     1257        query("drop table test_child")
     1258        query("drop table test_parent")
     1259
     1260    def testTruncateOnly(self):
     1261        truncate = self.db.truncate
     1262        self.assertRaises(TypeError, truncate, 'test_table', only='invalid')
     1263        query = self.db.query
     1264        query("drop table if exists test_child")
     1265        query("drop table if exists test_parent")
     1266        query("create table test_parent (n smallint)")
     1267        query("create table test_child ("
     1268            " m smallint) inherits (test_parent)")
     1269        for n in range(3):
     1270            query("insert into test_parent (n) values (1)")
     1271            query("insert into test_child (n, m) values (2, 3)")
     1272        q = ("select (select count(*) from test_parent),"
     1273            " (select count(*) from test_child)")
     1274        r = query(q).getresult()[0]
     1275        self.assertEqual(r, (6, 3))
     1276        truncate('test_parent')
     1277        r = query(q).getresult()[0]
     1278        self.assertEqual(r, (0, 0))
     1279        for n in range(3):
     1280            query("insert into test_parent (n) values (1)")
     1281            query("insert into test_child (n, m) values (2, 3)")
     1282        r = query(q).getresult()[0]
     1283        self.assertEqual(r, (6, 3))
     1284        truncate('test_parent*')
     1285        r = query(q).getresult()[0]
     1286        self.assertEqual(r, (0, 0))
     1287        for n in range(3):
     1288            query("insert into test_parent (n) values (1)")
     1289            query("insert into test_child (n, m) values (2, 3)")
     1290        r = query(q).getresult()[0]
     1291        self.assertEqual(r, (6, 3))
     1292        truncate('test_parent', only=True)
     1293        r = query(q).getresult()[0]
     1294        self.assertEqual(r, (3, 3))
     1295        truncate('test_parent', only=False)
     1296        r = query(q).getresult()[0]
     1297        self.assertEqual(r, (0, 0))
     1298        self.assertRaises(ValueError, truncate, 'test_parent*', only=True)
     1299        truncate('test_parent*', only=False)
     1300        query("drop table if exists test_parent_2")
     1301        query("create table test_parent_2 (n smallint)")
     1302        query("drop table if exists test_child_2")
     1303        query("create table test_child_2 ("
     1304            " m smallint) inherits (test_parent_2)")
     1305        for n in range(3):
     1306            query("insert into test_parent (n) values (1)")
     1307            query("insert into test_child (n, m) values (2, 3)")
     1308            query("insert into test_parent_2 (n) values (1)")
     1309            query("insert into test_child_2 (n, m) values (2, 3)")
     1310        q = ("select (select count(*) from test_parent),"
     1311            " (select count(*) from test_child),"
     1312            " (select count(*) from test_parent_2),"
     1313            " (select count(*) from test_child_2)")
     1314        r = query(q).getresult()[0]
     1315        self.assertEqual(r, (6, 3, 6, 3))
     1316        truncate(['test_parent', 'test_parent_2'], only=[False, True])
     1317        r = query(q).getresult()[0]
     1318        self.assertEqual(r, (0, 0, 3, 3))
     1319        truncate(['test_parent', 'test_parent_2'], only=False)
     1320        r = query(q).getresult()[0]
     1321        self.assertEqual(r, (0, 0, 0, 0))
     1322        self.assertRaises(ValueError, truncate,
     1323            ['test_parent*', 'test_child'], only=[True, False])
     1324        truncate(['test_parent*', 'test_child'], only=[False, True])
     1325        query("drop table test_child_2")
     1326        query("drop table test_parent_2")
     1327        query("drop table test_child")
     1328        query("drop table test_parent")
     1329
     1330    def testTruncateQuoted(self):
     1331        truncate = self.db.truncate
     1332        query = self.db.query
     1333        table = "test table for truncate()"
     1334        query('drop table if exists "%s"' % table)
     1335        query('create table "%s" (n smallint)' % table)
     1336        for i in range(3):
     1337            query('insert into "%s" values (1)' % table)
     1338        q = 'select count(*) from "%s"' % table
     1339        r = query(q).getresult()[0][0]
     1340        self.assertEqual(r, 3)
     1341        truncate(table)
     1342        r = query(q).getresult()[0][0]
     1343        self.assertEqual(r, 0)
     1344        for i in range(3):
     1345            query('insert into "%s" values (1)' % table)
     1346        r = query(q).getresult()[0][0]
     1347        self.assertEqual(r, 3)
     1348        truncate('public."%s"' % table)
     1349        r = query(q).getresult()[0][0]
     1350        self.assertEqual(r, 0)
     1351        query('drop table "%s"' % table)
     1352
    11511353    def testTransaction(self):
    11521354        query = self.db.query
Note: See TracChangeset for help on using the changeset viewer.