Changeset 748 for trunk


Ignore:
Timestamp:
Jan 15, 2016, 9:25:31 AM (4 years ago)
Author:
cito
Message:

Add method truncate() to DB wrapper class

This methods can be used to quickly truncate tables.

Since this is pretty useful and will not break anything, I have
also back ported this addition to the 4.x branch.

Everything is well documented and tested, of course.

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/changelog.rst

    r745 r748  
    4949- New methods get_parameters() and set_parameters() in the classic interface
    5050  which can be used to get or set run-time parameters.
     51- New method truncate() in the classic interface that can be used to quickly
     52  empty a table or a set of tables.
    5153- Fix decimal point handling.
    5254- Add option to return boolean values as bool objects.
  • trunk/docs/contents/pg/db_wrapper.rst

    r747 r748  
    477477exist and 1 if the row was deleted).
    478478
     479truncate -- Quickly empty database tables
     480-----------------------------------------
     481
     482.. method:: DB.truncate(self, table, [restart], [cascade], [only]):
     483
     484    Empty a table or set of tables
     485
     486    :param table: the name of the table(s)
     487    :type table: str, list or set
     488    :param bool restart: whether table sequences should be restarted
     489    :param bool cascade: whether referenced tables should also be truncated
     490    :param only: whether only parent tables should be truncated
     491    :type only: bool or list
     492
     493This method quickly removes all rows from the given table or set
     494of tables.  It has the same effect as an unqualified DELETE on each
     495table, but since it does not actually scan the tables it is faster.
     496Furthermore, it reclaims disk space immediately, rather than requiring
     497a subsequent VACUUM operation. This is most useful on large tables.
     498
     499If *restart* is set to `True`, sequences owned by columns of the truncated
     500table(s) are automatically restarted.  If *cascade* is set to `True`, it
     501also truncates all tables that have foreign-key references to any of
     502the named tables.  If the parameter *only* is not set to `True`, all the
     503descendant tables (if any) will also be truncated. Optionally, a ``*``
     504can be specified after the table name to explicitly indicate that
     505descendant tables are included.  If the parameter *table* is a list,
     506the parameter *only* can also be a list of corresponding boolean values.
     507
    479508escape_literal -- escape a literal string for use within SQL
    480509------------------------------------------------------------
  • trunk/pg.py

    r747 r748  
    578578            if param == 'all':
    579579                if value is not None:
    580                     raise ValueError(
    581                         "A value must ot be set when parameter is 'all'")
     580                    raise ValueError('A value must ot be specified'
     581                        " when parameter is 'all'")
    582582                params = {'all': None}
    583583                break
     
    10881088        return int(res)
    10891089
     1090    def truncate(self, table, restart=False, cascade=False, only=False):
     1091        """Empty a table or set of tables.
     1092
     1093        This method quickly removes all rows from the given table or set
     1094        of tables.  It has the same effect as an unqualified DELETE on each
     1095        table, but since it does not actually scan the tables it is faster.
     1096        Furthermore, it reclaims disk space immediately, rather than requiring
     1097        a subsequent VACUUM operation. This is most useful on large tables.
     1098
     1099        If restart is set to True, sequences owned by columns of the truncated
     1100        table(s) are automatically restarted.  If cascade is set to True, it
     1101        also truncates all tables that have foreign-key references to any of
     1102        the named tables.  If the parameter only is not set to True, all the
     1103        descendant tables (if any) will also be truncated. Optionally, a '*'
     1104        can be specified after the table name to explicitly indicate that
     1105        descendant tables are included.
     1106        """
     1107        if isinstance(table, basestring):
     1108            only = {table: only}
     1109            table = [table]
     1110        elif isinstance(table, (list, tuple)):
     1111            if isinstance(only, (list, tuple)):
     1112                only = dict(zip(table, only))
     1113            else:
     1114                only = dict.fromkeys(table, only)
     1115        elif isinstance(table, (set, frozenset)):
     1116            only = dict.fromkeys(table, only)
     1117        else:
     1118            raise TypeError('The table must be a string, list or set')
     1119        if not (restart is None or isinstance(restart, (bool, int))):
     1120            raise TypeError('Invalid type for the restart option')
     1121        if not (cascade is None or isinstance(cascade, (bool, int))):
     1122            raise TypeError('Invalid type for the cascade option')
     1123        tables = []
     1124        for t in table:
     1125            u = only.get(t)
     1126            if not (u is None or isinstance(u, (bool, int))):
     1127                raise TypeError('Invalid type for the only option')
     1128            if t.endswith('*'):
     1129                if u:
     1130                    raise ValueError(
     1131                        'Contradictory table name and only options')
     1132                t = t[:-1].rstrip()
     1133            t = self._escape_qualified_name(t)
     1134            if u:
     1135                t = 'ONLY %s' % t
     1136            tables.append(t)
     1137        q = ['TRUNCATE', ', '.join(tables)]
     1138        if restart:
     1139            q.append('RESTART IDENTITY')
     1140        if cascade:
     1141            q.append('CASCADE')
     1142        q = ' '.join(q)
     1143        self._do_debug(q)
     1144        return self.query(q)
     1145
    10901146    def notification_handler(self, event, callback, arg_dict={}, timeout=None):
    10911147        """Get notification handler that will run the given callback."""
  • trunk/tests/test_classic_dbwrapper.py

    r747 r748  
    108108            'set_notice_receiver', 'set_parameter',
    109109            'source', 'start', 'status',
    110             'transaction',
     110            'transaction', 'truncate',
    111111            'unescape_bytea', 'update', 'upsert',
    112112            'use_regtypes', 'user',
     
    15151515        self.assertEqual(r[0][0], 0)
    15161516
     1517    def testTruncate(self):
     1518        truncate = self.db.truncate
     1519        self.assertRaises(TypeError, truncate, None)
     1520        self.assertRaises(TypeError, truncate, 42)
     1521        self.assertRaises(TypeError, truncate, dict(test_table=None))
     1522        query = self.db.query
     1523        query("drop table if exists test_table")
     1524        self.addCleanup(query, "drop table test_table")
     1525        query("create table test_table (n smallint)")
     1526        for i in range(3):
     1527            query("insert into test_table values (1)")
     1528        q = "select count(*) from test_table"
     1529        r = query(q).getresult()[0][0]
     1530        self.assertEqual(r, 3)
     1531        truncate('test_table')
     1532        r = query(q).getresult()[0][0]
     1533        self.assertEqual(r, 0)
     1534        for i in range(3):
     1535            query("insert into test_table values (1)")
     1536        r = query(q).getresult()[0][0]
     1537        self.assertEqual(r, 3)
     1538        truncate('public.test_table')
     1539        r = query(q).getresult()[0][0]
     1540        self.assertEqual(r, 0)
     1541        query("drop table if exists test_table_2")
     1542        self.addCleanup(query, "drop table test_table_2")
     1543        query('create table test_table_2 (n smallint)')
     1544        for t in (list, tuple, set):
     1545            for i in range(3):
     1546                query("insert into test_table values (1)")
     1547                query("insert into test_table_2 values (2)")
     1548            q = ("select (select count(*) from test_table),"
     1549                " (select count(*) from test_table_2)")
     1550            r = query(q).getresult()[0]
     1551            self.assertEqual(r, (3, 3))
     1552            truncate(t(['test_table', 'test_table_2']))
     1553            r = query(q).getresult()[0]
     1554            self.assertEqual(r, (0, 0))
     1555
     1556    def testTruncateRestart(self):
     1557        truncate = self.db.truncate
     1558        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
     1559        query = self.db.query
     1560        query("drop table if exists test_table")
     1561        self.addCleanup(query, "drop table test_table")
     1562        query("create table test_table (n serial, t text)")
     1563        for n in range(3):
     1564            query("insert into test_table (t) values ('test')")
     1565        q = "select count(n), min(n), max(n) from test_table"
     1566        r = query(q).getresult()[0]
     1567        self.assertEqual(r, (3, 1, 3))
     1568        truncate('test_table')
     1569        r = query(q).getresult()[0]
     1570        self.assertEqual(r, (0, None, None))
     1571        for n in range(3):
     1572            query("insert into test_table (t) values ('test')")
     1573        r = query(q).getresult()[0]
     1574        self.assertEqual(r, (3, 4, 6))
     1575        truncate('test_table', restart=True)
     1576        r = query(q).getresult()[0]
     1577        self.assertEqual(r, (0, None, None))
     1578        for n in range(3):
     1579            query("insert into test_table (t) values ('test')")
     1580        r = query(q).getresult()[0]
     1581        self.assertEqual(r, (3, 1, 3))
     1582
     1583    def testTruncateCascade(self):
     1584        truncate = self.db.truncate
     1585        self.assertRaises(TypeError, truncate, 'test_table', cascade='invalid')
     1586        query = self.db.query
     1587        query("drop table if exists test_child")
     1588        query("drop table if exists test_parent")
     1589        self.addCleanup(query, "drop table test_parent")
     1590        query("create table test_parent (n smallint primary key)")
     1591        self.addCleanup(query, "drop table test_child")
     1592        query("create table test_child ("
     1593            " n smallint primary key references test_parent (n))")
     1594        for n in range(3):
     1595            query("insert into test_parent (n) values (%d)" % n)
     1596            query("insert into test_child (n) values (%d)" % n)
     1597        q = ("select (select count(*) from test_parent),"
     1598            " (select count(*) from test_child)")
     1599        r = query(q).getresult()[0]
     1600        self.assertEqual(r, (3, 3))
     1601        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1602        truncate(['test_parent', 'test_child'])
     1603        r = query(q).getresult()[0]
     1604        self.assertEqual(r, (0, 0))
     1605        for n in range(3):
     1606            query("insert into test_parent (n) values (%d)" % n)
     1607            query("insert into test_child (n) values (%d)" % n)
     1608        r = query(q).getresult()[0]
     1609        self.assertEqual(r, (3, 3))
     1610        truncate('test_parent', cascade=True)
     1611        r = query(q).getresult()[0]
     1612        self.assertEqual(r, (0, 0))
     1613        for n in range(3):
     1614            query("insert into test_parent (n) values (%d)" % n)
     1615            query("insert into test_child (n) values (%d)" % n)
     1616        r = query(q).getresult()[0]
     1617        self.assertEqual(r, (3, 3))
     1618        truncate('test_child')
     1619        r = query(q).getresult()[0]
     1620        self.assertEqual(r, (3, 0))
     1621        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1622        truncate('test_parent', cascade=True)
     1623        r = query(q).getresult()[0]
     1624        self.assertEqual(r, (0, 0))
     1625
     1626    def testTruncateOnly(self):
     1627        truncate = self.db.truncate
     1628        self.assertRaises(TypeError, truncate, 'test_table', only='invalid')
     1629        query = self.db.query
     1630        query("drop table if exists test_child")
     1631        query("drop table if exists test_parent")
     1632        self.addCleanup(query, "drop table test_parent")
     1633        query("create table test_parent (n smallint)")
     1634        self.addCleanup(query, "drop table test_child")
     1635        query("create table test_child ("
     1636            " m smallint) inherits (test_parent)")
     1637        for n in range(3):
     1638            query("insert into test_parent (n) values (1)")
     1639            query("insert into test_child (n, m) values (2, 3)")
     1640        q = ("select (select count(*) from test_parent),"
     1641            " (select count(*) from test_child)")
     1642        r = query(q).getresult()[0]
     1643        self.assertEqual(r, (6, 3))
     1644        truncate('test_parent')
     1645        r = query(q).getresult()[0]
     1646        self.assertEqual(r, (0, 0))
     1647        for n in range(3):
     1648            query("insert into test_parent (n) values (1)")
     1649            query("insert into test_child (n, m) values (2, 3)")
     1650        r = query(q).getresult()[0]
     1651        self.assertEqual(r, (6, 3))
     1652        truncate('test_parent*')
     1653        r = query(q).getresult()[0]
     1654        self.assertEqual(r, (0, 0))
     1655        for n in range(3):
     1656            query("insert into test_parent (n) values (1)")
     1657            query("insert into test_child (n, m) values (2, 3)")
     1658        r = query(q).getresult()[0]
     1659        self.assertEqual(r, (6, 3))
     1660        truncate('test_parent', only=True)
     1661        r = query(q).getresult()[0]
     1662        self.assertEqual(r, (3, 3))
     1663        truncate('test_parent', only=False)
     1664        r = query(q).getresult()[0]
     1665        self.assertEqual(r, (0, 0))
     1666        self.assertRaises(ValueError, truncate, 'test_parent*', only=True)
     1667        truncate('test_parent*', only=False)
     1668        query("drop table if exists test_parent_2")
     1669        self.addCleanup(query, "drop table test_parent_2")
     1670        query("create table test_parent_2 (n smallint)")
     1671        query("drop table if exists test_child_2")
     1672        self.addCleanup(query, "drop table test_child_2")
     1673        query("create table test_child_2 ("
     1674            " m smallint) inherits (test_parent_2)")
     1675        for n in range(3):
     1676            query("insert into test_parent (n) values (1)")
     1677            query("insert into test_child (n, m) values (2, 3)")
     1678            query("insert into test_parent_2 (n) values (1)")
     1679            query("insert into test_child_2 (n, m) values (2, 3)")
     1680        q = ("select (select count(*) from test_parent),"
     1681            " (select count(*) from test_child),"
     1682            " (select count(*) from test_parent_2),"
     1683            " (select count(*) from test_child_2)")
     1684        r = query(q).getresult()[0]
     1685        self.assertEqual(r, (6, 3, 6, 3))
     1686        truncate(['test_parent', 'test_parent_2'], only=[False, True])
     1687        r = query(q).getresult()[0]
     1688        self.assertEqual(r, (0, 0, 3, 3))
     1689        truncate(['test_parent', 'test_parent_2'], only=False)
     1690        r = query(q).getresult()[0]
     1691        self.assertEqual(r, (0, 0, 0, 0))
     1692        self.assertRaises(ValueError, truncate,
     1693            ['test_parent*', 'test_child'], only=[True, False])
     1694        truncate(['test_parent*', 'test_child'], only=[False, True])
     1695
     1696    def testTruncateQuoted(self):
     1697        truncate = self.db.truncate
     1698        query = self.db.query
     1699        table = "test table for truncate()"
     1700        query('drop table if exists "%s"' % table)
     1701        self.addCleanup(query, 'drop table "%s"' % table)
     1702        query('create table "%s" (n smallint)' % table)
     1703        for i in range(3):
     1704            query('insert into "%s" values (1)' % table)
     1705        q = 'select count(*) from "%s"' % table
     1706        r = query(q).getresult()[0][0]
     1707        self.assertEqual(r, 3)
     1708        truncate(table)
     1709        r = query(q).getresult()[0][0]
     1710        self.assertEqual(r, 0)
     1711        for i in range(3):
     1712            query('insert into "%s" values (1)' % table)
     1713        r = query(q).getresult()[0][0]
     1714        self.assertEqual(r, 3)
     1715        truncate('public."%s"' % table)
     1716        r = query(q).getresult()[0][0]
     1717        self.assertEqual(r, 0)
     1718
    15171719    def testTransaction(self):
    15181720        query = self.db.query
Note: See TracChangeset for help on using the changeset viewer.