Changeset 748


Ignore:
Timestamp:
Jan 15, 2016, 9:25:31 AM (4 years ago)
Author:
cito
Message:

Add method truncate() to DB wrapper class

This methods can be used to quickly truncate tables.

Since this is pretty useful and will not break anything, I have
also back ported this addition to the 4.x branch.

Everything is well documented and tested, of course.

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • branches/4.x/docs/contents/changelog.rst

    r745 r748  
    1010- New methods get_parameters() and set_parameters() in the classic interface
    1111  which can be used to get or set run-time parameters.
     12- New method truncate() in the classic interface that can be used to quickly
     13  empty a table or a set of tables.
    1214- Fix decimal point handling.
    1315- Add option to return boolean values as bool objects.
  • branches/4.x/docs/contents/pg/db_wrapper.rst

    r747 r748  
    416416exist and 1 if the row was deleted).
    417417
     418truncate -- Quickly empty database tables
     419-----------------------------------------
     420
     421.. method:: DB.truncate(self, table, [restart], [cascade], [only]):
     422
     423    Empty a table or set of tables
     424
     425    :param table: the name of the table(s)
     426    :type table: str, list or set
     427    :param bool restart: whether table sequences should be restarted
     428    :param bool cascade: whether referenced tables should also be truncated
     429    :param only: whether only parent tables should be truncated
     430    :type only: bool or list
     431
     432This method quickly removes all rows from the given table or set
     433of tables.  It has the same effect as an unqualified DELETE on each
     434table, but since it does not actually scan the tables it is faster.
     435Furthermore, it reclaims disk space immediately, rather than requiring
     436a subsequent VACUUM operation. This is most useful on large tables.
     437
     438If *restart* is set to `True`, sequences owned by columns of the truncated
     439table(s) are automatically restarted.  If *cascade* is set to `True`, it
     440also truncates all tables that have foreign-key references to any of
     441the named tables.  If the parameter *only* is not set to `True`, all the
     442descendant tables (if any) will also be truncated. Optionally, a ``*``
     443can be specified after the table name to explicitly indicate that
     444descendant tables are included.  If the parameter *table* is a list,
     445the parameter *only* can also be a list of corresponding boolean values.
     446
    418447escape_literal -- escape a literal string for use within SQL
    419448------------------------------------------------------------
  • branches/4.x/pg.py

    r747 r748  
    623623        elif isinstance(parameter, dict):
    624624            if value is not None:
    625                 raise ValueError(
    626                     'A value must not be set when parameter is a dictionary')
     625                raise ValueError('A value must not be specified'
     626                    ' when parameter is a dictionary')
    627627        else:
    628628            raise TypeError(
     
    640640            if param == 'all':
    641641                if value is not None:
    642                     raise ValueError(
    643                         "A value must ot be set when parameter is 'all'")
     642                    raise ValueError('A value must ot be specified'
     643                        " when parameter is 'all'")
    644644                params = {'all': None}
    645645                break
     
    11001100        return int(self.db.query(q))
    11011101
     1102    def truncate(self, table, restart=False, cascade=False, only=False):
     1103        """Empty a table or set of tables.
     1104
     1105        This method quickly removes all rows from the given table or set
     1106        of tables.  It has the same effect as an unqualified DELETE on each
     1107        table, but since it does not actually scan the tables it is faster.
     1108        Furthermore, it reclaims disk space immediately, rather than requiring
     1109        a subsequent VACUUM operation. This is most useful on large tables.
     1110
     1111        If restart is set to True, sequences owned by columns of the truncated
     1112        table(s) are automatically restarted.  If cascade is set to True, it
     1113        also truncates all tables that have foreign-key references to any of
     1114        the named tables.  If the parameter only is not set to True, all the
     1115        descendant tables (if any) will also be truncated. Optionally, a '*'
     1116        can be specified after the table name to explicitly indicate that
     1117        descendant tables are included.
     1118        """
     1119        if isinstance(table, basestring):
     1120            only = {table: only}
     1121            table = [table]
     1122        elif isinstance(table, (list, tuple)):
     1123            if isinstance(only, (list, tuple)):
     1124                only = dict(zip(table, only))
     1125            else:
     1126                only = dict.fromkeys(table, only)
     1127        elif isinstance(table, (set, frozenset)):
     1128            only = dict.fromkeys(table, only)
     1129        else:
     1130            raise TypeError('The table must be a string, list or set')
     1131        if not (restart is None or isinstance(restart, (bool, int))):
     1132            raise TypeError('Invalid type for the restart option')
     1133        if not (cascade is None or isinstance(cascade, (bool, int))):
     1134            raise TypeError('Invalid type for the cascade option')
     1135        tables = []
     1136        for t in table:
     1137            u = only.get(t)
     1138            if not (u is None or isinstance(u, (bool, int))):
     1139                raise TypeError('Invalid type for the only option')
     1140            if t.endswith('*'):
     1141                if u:
     1142                    raise ValueError(
     1143                        'Contradictory table name and only options')
     1144                t = t[:-1].rstrip()
     1145            t = self._add_schema(t)
     1146            if u:
     1147                t = 'ONLY %s' % t
     1148            tables.append(t)
     1149        q = ['TRUNCATE', ', '.join(tables)]
     1150        if restart:
     1151            q.append('RESTART IDENTITY')
     1152        if cascade:
     1153            q.append('CASCADE')
     1154        q = ' '.join(q)
     1155        self._do_debug(q)
     1156        return self.query(q)
     1157
    11021158    def notification_handler(self, event, callback, arg_dict={}, timeout=None):
    11031159        """Get notification handler that will run the given callback."""
  • branches/4.x/tests/test_classic_dbwrapper.py

    r747 r748  
    9595            'set_notice_receiver', 'set_parameter',
    9696            'source', 'start', 'status',
    97             'transaction', 'tty',
     97            'transaction', 'truncate', 'tty',
    9898            'unescape_bytea', 'update',
    9999            'use_regtypes', 'user',
     
    11491149        query("drop table %s" % table)
    11501150
     1151    def testTruncate(self):
     1152        truncate = self.db.truncate
     1153        self.assertRaises(TypeError, truncate, None)
     1154        self.assertRaises(TypeError, truncate, 42)
     1155        self.assertRaises(TypeError, truncate, dict(test_table=None))
     1156        query = self.db.query
     1157        query("drop table if exists test_table")
     1158        query("create table test_table (n smallint)")
     1159        for i in range(3):
     1160            query("insert into test_table values (1)")
     1161        q = "select count(*) from test_table"
     1162        r = query(q).getresult()[0][0]
     1163        self.assertEqual(r, 3)
     1164        truncate('test_table')
     1165        r = query(q).getresult()[0][0]
     1166        self.assertEqual(r, 0)
     1167        for i in range(3):
     1168            query("insert into test_table values (1)")
     1169        r = query(q).getresult()[0][0]
     1170        self.assertEqual(r, 3)
     1171        truncate('public.test_table')
     1172        r = query(q).getresult()[0][0]
     1173        self.assertEqual(r, 0)
     1174        query("drop table if exists test_table_2")
     1175        query('create table test_table_2 (n smallint)')
     1176        for t in (list, tuple, set):
     1177            for i in range(3):
     1178                query("insert into test_table values (1)")
     1179                query("insert into test_table_2 values (2)")
     1180            q = ("select (select count(*) from test_table),"
     1181                " (select count(*) from test_table_2)")
     1182            r = query(q).getresult()[0]
     1183            self.assertEqual(r, (3, 3))
     1184            truncate(t(['test_table', 'test_table_2']))
     1185            r = query(q).getresult()[0]
     1186            self.assertEqual(r, (0, 0))
     1187        query("drop table test_table_2")
     1188        query("drop table test_table")
     1189
     1190    def testTruncateRestart(self):
     1191        truncate = self.db.truncate
     1192        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
     1193        query = self.db.query
     1194        query("drop table if exists test_table")
     1195        query("create table test_table (n serial, t text)")
     1196        for n in range(3):
     1197            query("insert into test_table (t) values ('test')")
     1198        q = "select count(n), min(n), max(n) from test_table"
     1199        r = query(q).getresult()[0]
     1200        self.assertEqual(r, (3, 1, 3))
     1201        truncate('test_table')
     1202        r = query(q).getresult()[0]
     1203        self.assertEqual(r, (0, None, None))
     1204        for n in range(3):
     1205            query("insert into test_table (t) values ('test')")
     1206        r = query(q).getresult()[0]
     1207        self.assertEqual(r, (3, 4, 6))
     1208        truncate('test_table', restart=True)
     1209        r = query(q).getresult()[0]
     1210        self.assertEqual(r, (0, None, None))
     1211        for n in range(3):
     1212            query("insert into test_table (t) values ('test')")
     1213        r = query(q).getresult()[0]
     1214        self.assertEqual(r, (3, 1, 3))
     1215        query("drop table test_table")
     1216
     1217    def testTruncateCascade(self):
     1218        truncate = self.db.truncate
     1219        self.assertRaises(TypeError, truncate, 'test_table', cascade='invalid')
     1220        query = self.db.query
     1221        query("drop table if exists test_child")
     1222        query("drop table if exists test_parent")
     1223        query("create table test_parent (n smallint primary key)")
     1224        query("create table test_child ("
     1225            " n smallint primary key references test_parent (n))")
     1226        for n in range(3):
     1227            query("insert into test_parent (n) values (%d)" % n)
     1228            query("insert into test_child (n) values (%d)" % n)
     1229        q = ("select (select count(*) from test_parent),"
     1230            " (select count(*) from test_child)")
     1231        r = query(q).getresult()[0]
     1232        self.assertEqual(r, (3, 3))
     1233        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1234        truncate(['test_parent', 'test_child'])
     1235        r = query(q).getresult()[0]
     1236        self.assertEqual(r, (0, 0))
     1237        for n in range(3):
     1238            query("insert into test_parent (n) values (%d)" % n)
     1239            query("insert into test_child (n) values (%d)" % n)
     1240        r = query(q).getresult()[0]
     1241        self.assertEqual(r, (3, 3))
     1242        truncate('test_parent', cascade=True)
     1243        r = query(q).getresult()[0]
     1244        self.assertEqual(r, (0, 0))
     1245        for n in range(3):
     1246            query("insert into test_parent (n) values (%d)" % n)
     1247            query("insert into test_child (n) values (%d)" % n)
     1248        r = query(q).getresult()[0]
     1249        self.assertEqual(r, (3, 3))
     1250        truncate('test_child')
     1251        r = query(q).getresult()[0]
     1252        self.assertEqual(r, (3, 0))
     1253        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1254        truncate('test_parent', cascade=True)
     1255        r = query(q).getresult()[0]
     1256        self.assertEqual(r, (0, 0))
     1257        query("drop table test_child")
     1258        query("drop table test_parent")
     1259
     1260    def testTruncateOnly(self):
     1261        truncate = self.db.truncate
     1262        self.assertRaises(TypeError, truncate, 'test_table', only='invalid')
     1263        query = self.db.query
     1264        query("drop table if exists test_child")
     1265        query("drop table if exists test_parent")
     1266        query("create table test_parent (n smallint)")
     1267        query("create table test_child ("
     1268            " m smallint) inherits (test_parent)")
     1269        for n in range(3):
     1270            query("insert into test_parent (n) values (1)")
     1271            query("insert into test_child (n, m) values (2, 3)")
     1272        q = ("select (select count(*) from test_parent),"
     1273            " (select count(*) from test_child)")
     1274        r = query(q).getresult()[0]
     1275        self.assertEqual(r, (6, 3))
     1276        truncate('test_parent')
     1277        r = query(q).getresult()[0]
     1278        self.assertEqual(r, (0, 0))
     1279        for n in range(3):
     1280            query("insert into test_parent (n) values (1)")
     1281            query("insert into test_child (n, m) values (2, 3)")
     1282        r = query(q).getresult()[0]
     1283        self.assertEqual(r, (6, 3))
     1284        truncate('test_parent*')
     1285        r = query(q).getresult()[0]
     1286        self.assertEqual(r, (0, 0))
     1287        for n in range(3):
     1288            query("insert into test_parent (n) values (1)")
     1289            query("insert into test_child (n, m) values (2, 3)")
     1290        r = query(q).getresult()[0]
     1291        self.assertEqual(r, (6, 3))
     1292        truncate('test_parent', only=True)
     1293        r = query(q).getresult()[0]
     1294        self.assertEqual(r, (3, 3))
     1295        truncate('test_parent', only=False)
     1296        r = query(q).getresult()[0]
     1297        self.assertEqual(r, (0, 0))
     1298        self.assertRaises(ValueError, truncate, 'test_parent*', only=True)
     1299        truncate('test_parent*', only=False)
     1300        query("drop table if exists test_parent_2")
     1301        query("create table test_parent_2 (n smallint)")
     1302        query("drop table if exists test_child_2")
     1303        query("create table test_child_2 ("
     1304            " m smallint) inherits (test_parent_2)")
     1305        for n in range(3):
     1306            query("insert into test_parent (n) values (1)")
     1307            query("insert into test_child (n, m) values (2, 3)")
     1308            query("insert into test_parent_2 (n) values (1)")
     1309            query("insert into test_child_2 (n, m) values (2, 3)")
     1310        q = ("select (select count(*) from test_parent),"
     1311            " (select count(*) from test_child),"
     1312            " (select count(*) from test_parent_2),"
     1313            " (select count(*) from test_child_2)")
     1314        r = query(q).getresult()[0]
     1315        self.assertEqual(r, (6, 3, 6, 3))
     1316        truncate(['test_parent', 'test_parent_2'], only=[False, True])
     1317        r = query(q).getresult()[0]
     1318        self.assertEqual(r, (0, 0, 3, 3))
     1319        truncate(['test_parent', 'test_parent_2'], only=False)
     1320        r = query(q).getresult()[0]
     1321        self.assertEqual(r, (0, 0, 0, 0))
     1322        self.assertRaises(ValueError, truncate,
     1323            ['test_parent*', 'test_child'], only=[True, False])
     1324        truncate(['test_parent*', 'test_child'], only=[False, True])
     1325        query("drop table test_child_2")
     1326        query("drop table test_parent_2")
     1327        query("drop table test_child")
     1328        query("drop table test_parent")
     1329
     1330    def testTruncateQuoted(self):
     1331        truncate = self.db.truncate
     1332        query = self.db.query
     1333        table = "test table for truncate()"
     1334        query('drop table if exists "%s"' % table)
     1335        query('create table "%s" (n smallint)' % table)
     1336        for i in range(3):
     1337            query('insert into "%s" values (1)' % table)
     1338        q = 'select count(*) from "%s"' % table
     1339        r = query(q).getresult()[0][0]
     1340        self.assertEqual(r, 3)
     1341        truncate(table)
     1342        r = query(q).getresult()[0][0]
     1343        self.assertEqual(r, 0)
     1344        for i in range(3):
     1345            query('insert into "%s" values (1)' % table)
     1346        r = query(q).getresult()[0][0]
     1347        self.assertEqual(r, 3)
     1348        truncate('public."%s"' % table)
     1349        r = query(q).getresult()[0][0]
     1350        self.assertEqual(r, 0)
     1351        query('drop table "%s"' % table)
     1352
    11511353    def testTransaction(self):
    11521354        query = self.db.query
  • trunk/docs/contents/changelog.rst

    r745 r748  
    4949- New methods get_parameters() and set_parameters() in the classic interface
    5050  which can be used to get or set run-time parameters.
     51- New method truncate() in the classic interface that can be used to quickly
     52  empty a table or a set of tables.
    5153- Fix decimal point handling.
    5254- Add option to return boolean values as bool objects.
  • trunk/docs/contents/pg/db_wrapper.rst

    r747 r748  
    477477exist and 1 if the row was deleted).
    478478
     479truncate -- Quickly empty database tables
     480-----------------------------------------
     481
     482.. method:: DB.truncate(self, table, [restart], [cascade], [only]):
     483
     484    Empty a table or set of tables
     485
     486    :param table: the name of the table(s)
     487    :type table: str, list or set
     488    :param bool restart: whether table sequences should be restarted
     489    :param bool cascade: whether referenced tables should also be truncated
     490    :param only: whether only parent tables should be truncated
     491    :type only: bool or list
     492
     493This method quickly removes all rows from the given table or set
     494of tables.  It has the same effect as an unqualified DELETE on each
     495table, but since it does not actually scan the tables it is faster.
     496Furthermore, it reclaims disk space immediately, rather than requiring
     497a subsequent VACUUM operation. This is most useful on large tables.
     498
     499If *restart* is set to `True`, sequences owned by columns of the truncated
     500table(s) are automatically restarted.  If *cascade* is set to `True`, it
     501also truncates all tables that have foreign-key references to any of
     502the named tables.  If the parameter *only* is not set to `True`, all the
     503descendant tables (if any) will also be truncated. Optionally, a ``*``
     504can be specified after the table name to explicitly indicate that
     505descendant tables are included.  If the parameter *table* is a list,
     506the parameter *only* can also be a list of corresponding boolean values.
     507
    479508escape_literal -- escape a literal string for use within SQL
    480509------------------------------------------------------------
  • trunk/pg.py

    r747 r748  
    578578            if param == 'all':
    579579                if value is not None:
    580                     raise ValueError(
    581                         "A value must ot be set when parameter is 'all'")
     580                    raise ValueError('A value must ot be specified'
     581                        " when parameter is 'all'")
    582582                params = {'all': None}
    583583                break
     
    10881088        return int(res)
    10891089
     1090    def truncate(self, table, restart=False, cascade=False, only=False):
     1091        """Empty a table or set of tables.
     1092
     1093        This method quickly removes all rows from the given table or set
     1094        of tables.  It has the same effect as an unqualified DELETE on each
     1095        table, but since it does not actually scan the tables it is faster.
     1096        Furthermore, it reclaims disk space immediately, rather than requiring
     1097        a subsequent VACUUM operation. This is most useful on large tables.
     1098
     1099        If restart is set to True, sequences owned by columns of the truncated
     1100        table(s) are automatically restarted.  If cascade is set to True, it
     1101        also truncates all tables that have foreign-key references to any of
     1102        the named tables.  If the parameter only is not set to True, all the
     1103        descendant tables (if any) will also be truncated. Optionally, a '*'
     1104        can be specified after the table name to explicitly indicate that
     1105        descendant tables are included.
     1106        """
     1107        if isinstance(table, basestring):
     1108            only = {table: only}
     1109            table = [table]
     1110        elif isinstance(table, (list, tuple)):
     1111            if isinstance(only, (list, tuple)):
     1112                only = dict(zip(table, only))
     1113            else:
     1114                only = dict.fromkeys(table, only)
     1115        elif isinstance(table, (set, frozenset)):
     1116            only = dict.fromkeys(table, only)
     1117        else:
     1118            raise TypeError('The table must be a string, list or set')
     1119        if not (restart is None or isinstance(restart, (bool, int))):
     1120            raise TypeError('Invalid type for the restart option')
     1121        if not (cascade is None or isinstance(cascade, (bool, int))):
     1122            raise TypeError('Invalid type for the cascade option')
     1123        tables = []
     1124        for t in table:
     1125            u = only.get(t)
     1126            if not (u is None or isinstance(u, (bool, int))):
     1127                raise TypeError('Invalid type for the only option')
     1128            if t.endswith('*'):
     1129                if u:
     1130                    raise ValueError(
     1131                        'Contradictory table name and only options')
     1132                t = t[:-1].rstrip()
     1133            t = self._escape_qualified_name(t)
     1134            if u:
     1135                t = 'ONLY %s' % t
     1136            tables.append(t)
     1137        q = ['TRUNCATE', ', '.join(tables)]
     1138        if restart:
     1139            q.append('RESTART IDENTITY')
     1140        if cascade:
     1141            q.append('CASCADE')
     1142        q = ' '.join(q)
     1143        self._do_debug(q)
     1144        return self.query(q)
     1145
    10901146    def notification_handler(self, event, callback, arg_dict={}, timeout=None):
    10911147        """Get notification handler that will run the given callback."""
  • trunk/tests/test_classic_dbwrapper.py

    r747 r748  
    108108            'set_notice_receiver', 'set_parameter',
    109109            'source', 'start', 'status',
    110             'transaction',
     110            'transaction', 'truncate',
    111111            'unescape_bytea', 'update', 'upsert',
    112112            'use_regtypes', 'user',
     
    15151515        self.assertEqual(r[0][0], 0)
    15161516
     1517    def testTruncate(self):
     1518        truncate = self.db.truncate
     1519        self.assertRaises(TypeError, truncate, None)
     1520        self.assertRaises(TypeError, truncate, 42)
     1521        self.assertRaises(TypeError, truncate, dict(test_table=None))
     1522        query = self.db.query
     1523        query("drop table if exists test_table")
     1524        self.addCleanup(query, "drop table test_table")
     1525        query("create table test_table (n smallint)")
     1526        for i in range(3):
     1527            query("insert into test_table values (1)")
     1528        q = "select count(*) from test_table"
     1529        r = query(q).getresult()[0][0]
     1530        self.assertEqual(r, 3)
     1531        truncate('test_table')
     1532        r = query(q).getresult()[0][0]
     1533        self.assertEqual(r, 0)
     1534        for i in range(3):
     1535            query("insert into test_table values (1)")
     1536        r = query(q).getresult()[0][0]
     1537        self.assertEqual(r, 3)
     1538        truncate('public.test_table')
     1539        r = query(q).getresult()[0][0]
     1540        self.assertEqual(r, 0)
     1541        query("drop table if exists test_table_2")
     1542        self.addCleanup(query, "drop table test_table_2")
     1543        query('create table test_table_2 (n smallint)')
     1544        for t in (list, tuple, set):
     1545            for i in range(3):
     1546                query("insert into test_table values (1)")
     1547                query("insert into test_table_2 values (2)")
     1548            q = ("select (select count(*) from test_table),"
     1549                " (select count(*) from test_table_2)")
     1550            r = query(q).getresult()[0]
     1551            self.assertEqual(r, (3, 3))
     1552            truncate(t(['test_table', 'test_table_2']))
     1553            r = query(q).getresult()[0]
     1554            self.assertEqual(r, (0, 0))
     1555
     1556    def testTruncateRestart(self):
     1557        truncate = self.db.truncate
     1558        self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
     1559        query = self.db.query
     1560        query("drop table if exists test_table")
     1561        self.addCleanup(query, "drop table test_table")
     1562        query("create table test_table (n serial, t text)")
     1563        for n in range(3):
     1564            query("insert into test_table (t) values ('test')")
     1565        q = "select count(n), min(n), max(n) from test_table"
     1566        r = query(q).getresult()[0]
     1567        self.assertEqual(r, (3, 1, 3))
     1568        truncate('test_table')
     1569        r = query(q).getresult()[0]
     1570        self.assertEqual(r, (0, None, None))
     1571        for n in range(3):
     1572            query("insert into test_table (t) values ('test')")
     1573        r = query(q).getresult()[0]
     1574        self.assertEqual(r, (3, 4, 6))
     1575        truncate('test_table', restart=True)
     1576        r = query(q).getresult()[0]
     1577        self.assertEqual(r, (0, None, None))
     1578        for n in range(3):
     1579            query("insert into test_table (t) values ('test')")
     1580        r = query(q).getresult()[0]
     1581        self.assertEqual(r, (3, 1, 3))
     1582
     1583    def testTruncateCascade(self):
     1584        truncate = self.db.truncate
     1585        self.assertRaises(TypeError, truncate, 'test_table', cascade='invalid')
     1586        query = self.db.query
     1587        query("drop table if exists test_child")
     1588        query("drop table if exists test_parent")
     1589        self.addCleanup(query, "drop table test_parent")
     1590        query("create table test_parent (n smallint primary key)")
     1591        self.addCleanup(query, "drop table test_child")
     1592        query("create table test_child ("
     1593            " n smallint primary key references test_parent (n))")
     1594        for n in range(3):
     1595            query("insert into test_parent (n) values (%d)" % n)
     1596            query("insert into test_child (n) values (%d)" % n)
     1597        q = ("select (select count(*) from test_parent),"
     1598            " (select count(*) from test_child)")
     1599        r = query(q).getresult()[0]
     1600        self.assertEqual(r, (3, 3))
     1601        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1602        truncate(['test_parent', 'test_child'])
     1603        r = query(q).getresult()[0]
     1604        self.assertEqual(r, (0, 0))
     1605        for n in range(3):
     1606            query("insert into test_parent (n) values (%d)" % n)
     1607            query("insert into test_child (n) values (%d)" % n)
     1608        r = query(q).getresult()[0]
     1609        self.assertEqual(r, (3, 3))
     1610        truncate('test_parent', cascade=True)
     1611        r = query(q).getresult()[0]
     1612        self.assertEqual(r, (0, 0))
     1613        for n in range(3):
     1614            query("insert into test_parent (n) values (%d)" % n)
     1615            query("insert into test_child (n) values (%d)" % n)
     1616        r = query(q).getresult()[0]
     1617        self.assertEqual(r, (3, 3))
     1618        truncate('test_child')
     1619        r = query(q).getresult()[0]
     1620        self.assertEqual(r, (3, 0))
     1621        self.assertRaises(pg.ProgrammingError, truncate, 'test_parent')
     1622        truncate('test_parent', cascade=True)
     1623        r = query(q).getresult()[0]
     1624        self.assertEqual(r, (0, 0))
     1625
     1626    def testTruncateOnly(self):
     1627        truncate = self.db.truncate
     1628        self.assertRaises(TypeError, truncate, 'test_table', only='invalid')
     1629        query = self.db.query
     1630        query("drop table if exists test_child")
     1631        query("drop table if exists test_parent")
     1632        self.addCleanup(query, "drop table test_parent")
     1633        query("create table test_parent (n smallint)")
     1634        self.addCleanup(query, "drop table test_child")
     1635        query("create table test_child ("
     1636            " m smallint) inherits (test_parent)")
     1637        for n in range(3):
     1638            query("insert into test_parent (n) values (1)")
     1639            query("insert into test_child (n, m) values (2, 3)")
     1640        q = ("select (select count(*) from test_parent),"
     1641            " (select count(*) from test_child)")
     1642        r = query(q).getresult()[0]
     1643        self.assertEqual(r, (6, 3))
     1644        truncate('test_parent')
     1645        r = query(q).getresult()[0]
     1646        self.assertEqual(r, (0, 0))
     1647        for n in range(3):
     1648            query("insert into test_parent (n) values (1)")
     1649            query("insert into test_child (n, m) values (2, 3)")
     1650        r = query(q).getresult()[0]
     1651        self.assertEqual(r, (6, 3))
     1652        truncate('test_parent*')
     1653        r = query(q).getresult()[0]
     1654        self.assertEqual(r, (0, 0))
     1655        for n in range(3):
     1656            query("insert into test_parent (n) values (1)")
     1657            query("insert into test_child (n, m) values (2, 3)")
     1658        r = query(q).getresult()[0]
     1659        self.assertEqual(r, (6, 3))
     1660        truncate('test_parent', only=True)
     1661        r = query(q).getresult()[0]
     1662        self.assertEqual(r, (3, 3))
     1663        truncate('test_parent', only=False)
     1664        r = query(q).getresult()[0]
     1665        self.assertEqual(r, (0, 0))
     1666        self.assertRaises(ValueError, truncate, 'test_parent*', only=True)
     1667        truncate('test_parent*', only=False)
     1668        query("drop table if exists test_parent_2")
     1669        self.addCleanup(query, "drop table test_parent_2")
     1670        query("create table test_parent_2 (n smallint)")
     1671        query("drop table if exists test_child_2")
     1672        self.addCleanup(query, "drop table test_child_2")
     1673        query("create table test_child_2 ("
     1674            " m smallint) inherits (test_parent_2)")
     1675        for n in range(3):
     1676            query("insert into test_parent (n) values (1)")
     1677            query("insert into test_child (n, m) values (2, 3)")
     1678            query("insert into test_parent_2 (n) values (1)")
     1679            query("insert into test_child_2 (n, m) values (2, 3)")
     1680        q = ("select (select count(*) from test_parent),"
     1681            " (select count(*) from test_child),"
     1682            " (select count(*) from test_parent_2),"
     1683            " (select count(*) from test_child_2)")
     1684        r = query(q).getresult()[0]
     1685        self.assertEqual(r, (6, 3, 6, 3))
     1686        truncate(['test_parent', 'test_parent_2'], only=[False, True])
     1687        r = query(q).getresult()[0]
     1688        self.assertEqual(r, (0, 0, 3, 3))
     1689        truncate(['test_parent', 'test_parent_2'], only=False)
     1690        r = query(q).getresult()[0]
     1691        self.assertEqual(r, (0, 0, 0, 0))
     1692        self.assertRaises(ValueError, truncate,
     1693            ['test_parent*', 'test_child'], only=[True, False])
     1694        truncate(['test_parent*', 'test_child'], only=[False, True])
     1695
     1696    def testTruncateQuoted(self):
     1697        truncate = self.db.truncate
     1698        query = self.db.query
     1699        table = "test table for truncate()"
     1700        query('drop table if exists "%s"' % table)
     1701        self.addCleanup(query, 'drop table "%s"' % table)
     1702        query('create table "%s" (n smallint)' % table)
     1703        for i in range(3):
     1704            query('insert into "%s" values (1)' % table)
     1705        q = 'select count(*) from "%s"' % table
     1706        r = query(q).getresult()[0][0]
     1707        self.assertEqual(r, 3)
     1708        truncate(table)
     1709        r = query(q).getresult()[0][0]
     1710        self.assertEqual(r, 0)
     1711        for i in range(3):
     1712            query('insert into "%s" values (1)' % table)
     1713        r = query(q).getresult()[0][0]
     1714        self.assertEqual(r, 3)
     1715        truncate('public."%s"' % table)
     1716        r = query(q).getresult()[0][0]
     1717        self.assertEqual(r, 0)
     1718
    15171719    def testTransaction(self):
    15181720        query = self.db.query
Note: See TracChangeset for help on using the changeset viewer.