Changeset 770 for trunk


Ignore:
Timestamp:
Jan 20, 2016, 1:19:45 PM (4 years ago)
Author:
cito
Message:

Add methods for getting a table as a list or dict

Also added documentation and 100% test coverage.

The get_attnames() method now always returns a read-only ordered dictionary,
even under Python 2.6 or 3.0. So you can sure the columns will be returned
in the right order if you iterate over it, and that you don't accidentally
modify the dictionary (since it is cached).

Location:
trunk
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/changelog.rst

    r765 r770  
    4343  DB wrapper methods has been reduced and security has been improved by
    4444  passing the values to libpq separately as parameters instead of inline.
     45- The classic interface got two new methods get_as_list() and get_as_dict()
     46  returning a database table as a Python list or dict. The amount of data
     47  returned can be controlled with various parameters.
    4548
    4649Version 4.2
  • trunk/docs/contents/pg/db_wrapper.rst

    r769 r770  
    120120
    121121    :param str table: name of table
    122     :returns: a dictionary mapping attribute names to type names
     122    :returns: an ordered dictionary mapping attribute names to type names
    123123
    124124Given the name of a table, digs out the set of attribute names.
    125125
    126 Returns a dictionary of attribute names (the names are the keys,
    127 the values are the names of the attributes' types).
    128 
    129 If your Python version supports this, the dictionary will be an
    130 OrderedDictionary with the column names in the right order.
     126Returns a read-only dictionary of attribute names (the names are the keys,
     127the values are the names of the attributes' types) with the column names
     128in the proper order if you iterate over it.
    131129
    132130By default, only a limited number of simple types will be returned.
     
    162160    :rtype: str, list or dict
    163161    :raises TypeError: Invalid parameter type(s)
    164     :raises ProgrammingError: Invalid parameter name(s)
     162    :raises pg.ProgrammingError: Invalid parameter name(s)
    165163
    166164If the parameter is a string, the return value will also be a string
     
    180178.. versionadded:: 4.2
    181179
    182 .. method:: DB.set_parameter(self, parameter, [value], [local])
     180.. method:: DB.set_parameter(parameter, [value], [local])
    183181
    184182    Set the value of run-time parameters
     
    190188    :raises TypeError: Invalid parameter type(s)
    191189    :raises ValueError: Invalid value argument(s)
    192     :raises ProgrammingError: Invalid parameter name(s) or values
     190    :raises pg.ProgrammingError: Invalid parameter name(s) or values
    193191
    194192If the parameter and the value are strings, the run-time parameter
     
    295293    :returns: A dictionary - the keys are the attribute names,
    296294      the values are the row values.
    297     :raises ProgrammingError: table has no primary key or missing privilege
     295    :raises pg.ProgrammingError: table has no primary key or missing privilege
    298296    :raises KeyError: missing key value for the row
    299297
     
    325323    :returns: the inserted values in the database
    326324    :rtype: dict
    327     :raises ProgrammingError: missing privilege or conflict
     325    :raises pg.ProgrammingError: missing privilege or conflict
    328326
    329327This method inserts a row into a table.  If the optional dictionary is
     
    347345    :returns: the new row in the database
    348346    :rtype: dict
    349     :raises ProgrammingError: table has no primary key or missing privilege
     347    :raises pg.ProgrammingError: table has no primary key or missing privilege
    350348    :raises KeyError: missing key value for the row
    351349
     
    374372    :returns: the new row in the database
    375373    :rtype: dict
    376     :raises ProgrammingError: table has no primary key or missing privilege
     374    :raises pg.ProgrammingError: table has no primary key or missing privilege
    377375
    378376This method inserts a row into a table, but instead of raising a
     
    480478    :param col: optional keyword arguments for updating the dictionary
    481479    :rtype: None
    482     :raises ProgrammingError: table has no primary key,
     480    :raises pg.ProgrammingError: table has no primary key,
    483481        row is still referenced or missing privilege
    484482    :raises KeyError: missing key value for the row
     
    494492by another table, this method will raise a ProgrammingError.
    495493
    496 truncate -- Quickly empty database tables
     494truncate -- quickly empty database tables
    497495-----------------------------------------
    498496
    499 .. method:: DB.truncate(self, table, [restart], [cascade], [only]):
     497.. method:: DB.truncate(table, [restart], [cascade], [only])
    500498
    501499    Empty a table or set of tables
     
    525523.. versionadded:: 4.2
    526524
     525get_as_list/dict -- read a table as a list or dictionary
     526--------------------------------------------------------
     527
     528.. method:: DB.get_as_list(table, [what], [where], [order], [limit], [offset], [scalar])
     529
     530    Get a table as a list
     531
     532    :param str table: the name of the table (the FROM clause)
     533    :param what: column(s) to be returned (the SELECT clause)
     534    :type what: str, list, tuple or None
     535    :param where: conditions(s) to be fulfilled (the WHERE clause)
     536    :type where: str, list, tuple or None
     537    :param order: column(s) to sort by (the ORDER BY clause)
     538    :type order: str, list, tuple, False or None
     539    :param int limit: maximum number of rows returned (the LIMIT clause)
     540    :param int offset: number of rows to be skipped (the OFFSET clause)
     541    :param bool scalar: whether only the first column shall be returned
     542    :returns: the content of the table as a list
     543    :rtype: list
     544    :raises TypeError: the table name has not been specified
     545
     546This gets a convenient representation of the table as a list of named tuples
     547in Python.  You only need to pass the name of the table (or any other SQL
     548expression returning rows).  Note that by default this will return the full
     549content of the table which can be huge and overflow your memory.  However, you
     550can control the amount of data returned using the other optional parameters.
     551
     552The parameter *what* can restrict the query to only return a subset of the
     553table columns.  The parameter *where* can restrict the query to only return a
     554subset of the table rows.  The specified SQL expressions all need to be
     555fulfilled for a row to get into the result.  The parameter *order* specifies
     556the ordering of the rows.  If no ordering is specified, the result will be
     557ordered by the primary key(s) or all columns if no primary key exists.
     558You can set *order* to *False* if you don't care about the ordering.
     559The parameters *limit* and *offset* specify the maximum number of rows
     560returned and a number of rows skipped over.
     561
     562If you set the *scalar* option to *True*, then instead of the named tuples
     563you will get the first items of these tuples.  This is useful if the result
     564has only one column anyway.
     565
     566.. method:: DB.get_as_dict(table, [keyname], [what], [where], [order], [limit], [offset], [scalar])
     567
     568    Get a table as a dictionary
     569
     570    :param str table: the name of the table (the FROM clause)
     571    :param keyname: column(s) to be used as key(s) of the dictionary
     572    :type keyname: str, list, tuple or None
     573    :param what: column(s) to be returned (the SELECT clause)
     574    :type what: str, list, tuple or None
     575    :param where: conditions(s) to be fulfilled (the WHERE clause)
     576    :type where: str, list, tuple or None
     577    :param order: column(s) to sort by (the ORDER BY clause)
     578    :type order: str, list, tuple, False or None
     579    :param int limit: maximum number of rows returned (the LIMIT clause)
     580    :param int offset: number of rows to be skipped (the OFFSET clause)
     581    :param bool scalar: whether only the first column shall be returned
     582    :returns: the content of the table as a list
     583    :rtype: dict or OrderedDict
     584    :raises TypeError: the table name has not been specified
     585    :raises KeyError: keyname(s) are invalid or not part of the result
     586    :raises pg.ProgrammingError: no keyname(s) and table has no primary key
     587
     588This method is similar to :meth:`DB.get_as_list`, but returns the table as
     589a Python dict instead of a Python list, which can be even more convenient.
     590The primary key column(s) of the table will be used as the keys of the
     591dictionary, while the other column(s) will be the corresponding values.
     592The keys will be named tuples if the table has a composite primary key.
     593The rows will be also named tuples unless the *scalar* option has been set
     594to *True*.  With the optional parameter *keyname* you can specify a different
     595set of columns to be used as the keys of the dictionary.
     596
     597If the Python version supports it, the dictionary will be an *OrderedDict*
     598using the order specified with the *order* parameter or the key column(s)
     599if not specified.  You can set *order* to *False* if you don't care about the
     600ordering.  In this case the returned dictionary will be an ordinary one.
     601
    527602escape_literal -- escape a literal string for use within SQL
    528603------------------------------------------------------------
  • trunk/pg.py

    r769 r770  
    3838from collections import namedtuple
    3939from functools import partial
     40from operator import itemgetter
     41
     42try:
     43    basestring
     44except NameError:  # Python >= 3.0
     45    basestring = (str, bytes)
     46
     47set_decimal(Decimal)
    4048
    4149try:
     
    4452    OrderedDict = dict
    4553
    46 try:
    47     basestring
    48 except NameError:  # Python >= 3.0
    49     basestring = (str, bytes)
    50 
    51 set_decimal(Decimal)
     54
     55    class AttrDict(dict):
     56        """Simple read-only ordered dictionary for storing attribute names."""
     57
     58        def __init__(self, *args, **kw):
     59            if len(args) > 1 or kw:
     60                raise TypeError
     61            items = args[0] if args else []
     62            if isinstance(items, dict):
     63                raise TypeError
     64            items = list(items)
     65            self._keys = [item[0] for item in items]
     66            dict.__init__(self, items)
     67            self._read_only = True
     68            error = self._read_only_error
     69            self.clear = self.update = error
     70            self.pop = self.setdefault = self.popitem = error
     71
     72        def __setitem__(self, key, value):
     73            if self._read_only:
     74                self._read_only_error()
     75            dict.__setitem__(self, key, value)
     76
     77        def __delitem__(self, key):
     78            if self._read_only:
     79                self._read_only_error()
     80            dict.__delitem__(self, key)
     81
     82        def __iter__(self):
     83            return iter(self._keys)
     84
     85        def keys(self):
     86            return list(self._keys)
     87
     88        def values(self):
     89            return [self[key] for key in self]
     90
     91        def items(self):
     92            return [(key, self[key]) for key in self]
     93
     94        def iterkeys(self):
     95            return self.__iter__()
     96
     97        def itervalues(self):
     98            return iter(self.values())
     99
     100        def iteritems(self):
     101            return iter(self.items())
     102
     103        @staticmethod
     104        def _read_only_error(*args, **kw):
     105            raise TypeError('This object is read-only')
     106
     107else:
     108
     109     class AttrDict(OrderedDict):
     110        """Simple read-only ordered dictionary for storing attribute names."""
     111
     112        def __init__(self, *args, **kw):
     113            self._read_only = False
     114            OrderedDict.__init__(self, *args, **kw)
     115            self._read_only = True
     116            error = self._read_only_error
     117            self.clear = self.update = error
     118            self.pop = self.setdefault = self.popitem = error
     119
     120        def __setitem__(self, key, value):
     121            if self._read_only:
     122                self._read_only_error()
     123            OrderedDict.__setitem__(self, key, value)
     124
     125        def __delitem__(self, key):
     126            if self._read_only:
     127                self._read_only_error()
     128            OrderedDict.__delitem__(self, key)
     129
     130        @staticmethod
     131        def _read_only_error(*args, **kw):
     132            raise TypeError('This object is read-only')
     133
    52134
    53135
     
    84166
    85167set_namedresult(_namedresult)
     168
     169
     170class _MemoryQuery:
     171    """Class that embodies a given query result."""
     172
     173    def __init__(self, result, fields):
     174        """Create query from given result rows and field names."""
     175        self.result = result
     176        self.fields = fields
     177
     178    def listfields(self):
     179        """Return the stored field names of this query."""
     180        return self.fields
     181
     182    def getresult(self):
     183        """Return the stored result of this query."""
     184        return self.result
    86185
    87186
     
    704803        """Given the name of a table, dig out the set of attribute names.
    705804
    706         Returns a dictionary of attribute names (the names are the keys,
    707         the values are the names of the attributes' types).
    708 
    709         If your Python version supports this, the dictionary will be an
    710         OrderedDictionary with the column names in the right order.
    711 
    712         If flush is set then the internal cache for attribute names will
     805        Returns a read-only dictionary of attribute names (the names are
     806        the keys, the values are the names of the attributes' types)
     807        with the column names in the proper order if you iterate over it.
     808
     809        If flush is set, then the internal cache for attribute names will
    713810        be flushed. This may be necessary after the database schema or
    714811        the search path has been changed.
     
    735832            if not self._regtypes:
    736833                names = ((name, _simpletype(typ)) for name, typ in names)
    737             names = OrderedDict(names)
     834            names = AttrDict(names)
    738835            attnames[table] = names  # cache it
    739836        return names
     
    11921289        q = ' '.join(q)
    11931290        self._do_debug(q)
    1194         return self.query(q)
     1291        return self.db.query(q)
     1292
     1293    def get_as_list(self, table, what=None, where=None,
     1294            order=None, limit=None, offset=None, scalar=False):
     1295        """Get a table as a list.
     1296
     1297        This gets a convenient representation of the table as a list
     1298        of named tuples in Python.  You only need to pass the name of
     1299        the table (or any other SQL expression returning rows).  Note that
     1300        by default this will return the full content of the table which
     1301        can be huge and overflow your memory.  However, you can control
     1302        the amount of data returned using the other optional parameters.
     1303
     1304        The parameter 'what' can restrict the query to only return a
     1305        subset of the table columns.  It can be a string, list or a tuple.
     1306        The parameter 'where' can restrict the query to only return a
     1307        subset of the table rows.  It can be a string, list or a tuple
     1308        of SQL expressions that all need to be fulfilled.  The parameter
     1309        'order' specifies the ordering of the rows.  It can also be a
     1310        other string, list or a tuple.  If no ordering is specified,
     1311        the result will be ordered by the primary key(s) or all columns
     1312        if no primary key exists.  You can set 'order' to False if you
     1313        don't care about the ordering.  The parameters 'limit' and 'offset'
     1314        can be integers specifying the maximum number of rows returned
     1315        and a number of rows skipped over.
     1316
     1317        If you set the 'scalar' option to True, then instead of the
     1318        named tuples you will get the first items of these tuples.
     1319        This is useful if the result has only one column anyway.
     1320        """
     1321        if not table:
     1322            raise TypeError('The table name is missing')
     1323        if what:
     1324            if isinstance(what, (list, tuple)):
     1325                what = ', '.join(map(str, what))
     1326            if order is None:
     1327                order = what
     1328        else:
     1329            what = '*'
     1330        q = ['SELECT', what, 'FROM', table]
     1331        if where:
     1332            if isinstance(where, (list, tuple)):
     1333                where = ' AND '.join(map(str, where))
     1334            q.extend(['WHERE', where])
     1335        if order is None:
     1336            try:
     1337                order = self.pkey(table, True)
     1338            except (KeyError, ProgrammingError):
     1339                try:
     1340                    order = list(self.get_attnames(table))
     1341                except (KeyError, ProgrammingError):
     1342                    pass
     1343        if order:
     1344            if isinstance(order, (list, tuple)):
     1345                order = ', '.join(map(str, order))
     1346            q.extend(['ORDER BY', order])
     1347        if limit:
     1348            q.append('LIMIT %d' % limit)
     1349        if offset:
     1350            q.append('OFFSET %d' % offset)
     1351        q = ' '.join(q)
     1352        self._do_debug(q)
     1353        q = self.db.query(q)
     1354        res = q.namedresult()
     1355        if res and scalar:
     1356            res = [row[0] for row in res]
     1357        return res
     1358
     1359    def get_as_dict(self, table, keyname=None, what=None, where=None,
     1360            order=None, limit=None, offset=None, scalar=False):
     1361        """Get a table as a dictionary.
     1362
     1363        This method is similar to get_as_list(), but returns the table
     1364        as a Python dict instead of a Python list, which can be even
     1365        more convenient. The primary key column(s) of the table will
     1366        be used as the keys of the dictionary, while the other column(s)
     1367        will be the corresponding values.  The keys will be named tuples
     1368        if the table has a composite primary key.  The rows will be also
     1369        named tuples unless the 'scalar' option has been set to True.
     1370        With the optional parameter 'keyname' you can specify an alternative
     1371        set of columns to be used as the keys of the dictionary.  It must
     1372        be set as a string, list or a tuple.
     1373
     1374        If the Python version supports it, the dictionary will be an
     1375        OrderedDict using the order specified with the 'order' parameter
     1376        or the key column(s) if not specified.  You can set 'order' to False
     1377        if you don't care about the ordering.  In this case the returned
     1378        dictionary will be an ordinary one.
     1379        """
     1380        if not table:
     1381            raise TypeError('The table name is missing')
     1382        if not keyname:
     1383            try:
     1384                keyname = self.pkey(table, True)
     1385            except (KeyError, ProgrammingError):
     1386                raise _prg_error('Table %s has no primary key' % table)
     1387        if isinstance(keyname, basestring):
     1388            keyname = [keyname]
     1389        elif not isinstance(keyname, (list, tuple)):
     1390            raise KeyError('The keyname must be a string, list or tuple')
     1391        if what:
     1392            if isinstance(what, (list, tuple)):
     1393                what = ', '.join(map(str, what))
     1394            if order is None:
     1395                order = what
     1396        else:
     1397            what = '*'
     1398        q = ['SELECT', what, 'FROM', table]
     1399        if where:
     1400            if isinstance(where, (list, tuple)):
     1401                where = ' AND '.join(map(str, where))
     1402            q.extend(['WHERE', where])
     1403        if order is None:
     1404            order = keyname
     1405        if order:
     1406            if isinstance(order, (list, tuple)):
     1407                order = ', '.join(map(str, order))
     1408            q.extend(['ORDER BY', order])
     1409        if limit:
     1410            q.append('LIMIT %d' % limit)
     1411        if offset:
     1412            q.append('OFFSET %d' % offset)
     1413        q = ' '.join(q)
     1414        self._do_debug(q)
     1415        q = self.db.query(q)
     1416        res = q.getresult()
     1417        cls = OrderedDict if order else dict
     1418        if not res:
     1419            return cls()
     1420        keyset = set(keyname)
     1421        fields = q.listfields()
     1422        if not keyset.issubset(fields):
     1423            raise KeyError('Missing keyname in row')
     1424        keyind, rowind = [], []
     1425        for i, f in enumerate(fields):
     1426            (keyind if f in keyset else rowind).append(i)
     1427        keytuple = len(keyind) > 1
     1428        getkey = itemgetter(*keyind)
     1429        keys = map(getkey, res)
     1430        if scalar:
     1431            rowind = rowind[:1]
     1432            rowtuple = False
     1433        else:
     1434            rowtuple = len(rowind) > 1
     1435        if scalar or rowtuple:
     1436            getrow = itemgetter(*rowind)
     1437        else:
     1438            rowind = rowind[0]
     1439            getrow = lambda row: (row[rowind],)
     1440            rowtuple = True
     1441        rows = map(getrow, res)
     1442        if keytuple or rowtuple:
     1443            namedresult = get_namedresult()
     1444            if keytuple:
     1445                keys = namedresult(_MemoryQuery(keys, keyname))
     1446            if rowtuple:
     1447                fields = [f for f in fields if f not in keyset]
     1448                rows = namedresult(_MemoryQuery(rows, fields))
     1449        return cls(zip(keys, rows))
    11951450
    11961451    def notification_handler(self,
  • trunk/tests/__init__.py

    r644 r770  
    22
    33You can specify your local database settings in LOCAL_PyGreSQL.py.
    4 
    54"""
    65
     
    109    import unittest
    1110
     11if not (hasattr(unittest, 'skip')
     12        and hasattr(unittest.TestCase, 'setUpClass')
     13        and hasattr(unittest.TestCase, 'skipTest')
     14        and hasattr(unittest.TestCase, 'assertIn')):
     15    raise ImportError('Please install a newer version of unittest')
     16
    1217
    1318def discover():
  • trunk/tests/test_classic_connection.py

    r744 r770  
    99
    1010These tests need a database to test against.
    11 
    1211"""
    1312
     
    871870    """Test inserttable method."""
    872871
     872    cls_set_up = False
     873
    873874    @classmethod
    874875    def setUpClass(cls):
     
    885886            "select length('À') - length('a')").getresult()[0][0] == 0
    886887        c.close()
     888        cls.cls_set_up = True
    887889
    888890    @classmethod
     
    893895
    894896    def setUp(self):
     897        self.assertTrue(self.cls_set_up)
    895898        self.c = connect()
    896899        self.c.query("set client_encoding=utf8")
     
    11021105    """Test copy command with direct socket access."""
    11031106
     1107    cls_set_up = False
     1108
    11041109    @classmethod
    11051110    def setUpClass(cls):
     
    11081113        c.query("create table test (i int, v varchar(16))")
    11091114        c.close()
     1115        cls.cls_set_up = True
    11101116
    11111117    @classmethod
     
    11161122
    11171123    def setUp(self):
     1124        self.assertTrue(self.cls_set_up)
    11181125        self.c = connect()
    11191126        self.c.query("set client_encoding=utf8")
     
    12831290    To test the effect of most of these functions, we need a database
    12841291    connection.  That's why they are covered in this test module.
    1285 
    12861292    """
    12871293
     
    16061612    in order to ensure that the tests always run under the same conditions.
    16071613    That's why these tests are included in this test module.
    1608 
    16091614    """
     1615
     1616    cls_set_up = False
    16101617
    16111618    @classmethod
     
    16151622        query('set standard_conforming_strings=off')
    16161623        query('set bytea_output=escape')
     1624        cls.cls_set_up = True
    16171625
    16181626    def testEscapeString(self):
     1627        self.assertTrue(self.cls_set_up)
    16191628        f = pg.escape_string
    16201629        r = f(b'plain')
     
    16341643
    16351644    def testEscapeBytea(self):
     1645        self.assertTrue(self.cls_set_up)
    16361646        f = pg.escape_bytea
    16371647        r = f(b'plain')
  • trunk/tests/test_classic_dbwrapper.py

    r769 r770  
    99
    1010These tests need a database to test against.
    11 
    1211"""
     12
    1313try:
    1414    import unittest2 as unittest  # for Python < 2.7
     
    2323
    2424from decimal import Decimal
     25from operator import itemgetter
    2526
    2627# We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
     
    7677    db.query("set client_min_messages=warning")
    7778    return db
     79
     80
     81class TestAttrDict(unittest.TestCase):
     82    """Test the simple ordered dictionary for attribute names."""
     83
     84    cls = pg.AttrDict
     85    base = OrderedDict
     86
     87    def testInit(self):
     88        a = self.cls()
     89        self.assertIsInstance(a, self.base)
     90        self.assertEqual(a, self.base())
     91        items = [('id', 'int'), ('name', 'text')]
     92        a = self.cls(items)
     93        self.assertIsInstance(a, self.base)
     94        self.assertEqual(a, self.base(items))
     95        iteritems = iter(items)
     96        a = self.cls(iteritems)
     97        self.assertIsInstance(a, self.base)
     98        self.assertEqual(a, self.base(items))
     99
     100    def testIter(self):
     101        a = self.cls()
     102        self.assertEqual(list(a), [])
     103        keys = ['id', 'name', 'age']
     104        items = [(key, None) for key in keys]
     105        a = self.cls(items)
     106        self.assertEqual(list(a), keys)
     107
     108    def testKeys(self):
     109        a = self.cls()
     110        self.assertEqual(list(a.keys()), [])
     111        keys = ['id', 'name', 'age']
     112        items = [(key, None) for key in keys]
     113        a = self.cls(items)
     114        self.assertEqual(list(a.keys()), keys)
     115
     116    def testValues(self):
     117        a = self.cls()
     118        self.assertEqual(list(a.values()), [])
     119        items = [('id', 'int'), ('name', 'text')]
     120        values = [item[1] for item in items]
     121        a = self.cls(items)
     122        self.assertEqual(list(a.values()), values)
     123
     124    def testItems(self):
     125        a = self.cls()
     126        self.assertEqual(list(a.items()), [])
     127        items = [('id', 'int'), ('name', 'text')]
     128        a = self.cls(items)
     129        self.assertEqual(list(a.items()), items)
     130
     131    def testGet(self):
     132        a = self.cls([('id', 1)])
     133        try:
     134            self.assertEqual(a['id'], 1)
     135        except KeyError:
     136            self.fail('AttrDict should be readable')
     137
     138    def testSet(self):
     139        a = self.cls()
     140        try:
     141            a['id'] = 1
     142        except TypeError:
     143            pass
     144        else:
     145            self.fail('AttrDict should be read-only')
     146
     147    def testDel(self):
     148        a = self.cls([('id', 1)])
     149        try:
     150            del a['id']
     151        except TypeError:
     152            pass
     153        else:
     154            self.fail('AttrDict should be read-only')
     155
     156    def testWriteMethods(self):
     157        a = self.cls([('id', 1)])
     158        self.assertEqual(a['id'], 1)
     159        for method in 'clear', 'update', 'pop', 'setdefault', 'popitem':
     160            method = getattr(a, method)
     161            self.assertRaises(TypeError, method, a)
    78162
    79163
     
    100184            'escape_literal', 'escape_string',
    101185            'fileno',
    102             'get', 'get_attnames', 'get_databases',
     186            'get', 'get_as_dict', 'get_as_list',
     187            'get_attnames', 'get_databases',
    103188            'get_notice_receiver', 'get_parameter',
    104189            'get_relations', 'get_tables',
     
    284369    """Test the methods of the DB class wrapped pg connection."""
    285370
     371    cls_set_up = False
     372
    286373    @classmethod
    287374    def setUpClass(cls):
     
    295382            " select i4, v4 from test")
    296383        db.close()
     384        cls.cls_set_up = True
    297385
    298386    @classmethod
     
    303391
    304392    def setUp(self):
     393        self.assertTrue(self.cls_set_up)
    305394        self.db = DB()
    306395        query = self.db.query
     
    9381027    def testGetAttnamesIsOrdered(self):
    9391028        get_attnames = self.db.get_attnames
    940         query = self.db.query
    941         self.createTable('test_table',
     1029        r = get_attnames('test', flush=True)
     1030        self.assertIsInstance(r, OrderedDict)
     1031        self.assertEqual(r, OrderedDict([
     1032            ('i2', 'int'), ('i4', 'int'), ('i8', 'int'),
     1033            ('d', 'num'), ('f4', 'float'), ('f8', 'float'), ('m', 'money'),
     1034            ('v4', 'text'), ('c4', 'text'), ('t', 'text')]))
     1035        if OrderedDict is not dict:
     1036            r = ' '.join(list(r.keys()))
     1037            self.assertEqual(r, 'i2 i4 i8 d f4 f8 m v4 c4 t')
     1038        table = 'test table for get_attnames'
     1039        self.createTable(table,
    9421040            ' n int, alpha smallint, v varchar(3),'
    9431041            ' gamma char(5), tau text, beta bool')
    944         r = get_attnames("test_table")
     1042        r = get_attnames(table)
    9451043        self.assertIsInstance(r, OrderedDict)
    9461044        self.assertEqual(r, OrderedDict([
    9471045            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
    9481046            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
    949         if OrderedDict is dict:
     1047        if OrderedDict is not dict:
     1048            r = ' '.join(list(r.keys()))
     1049            self.assertEqual(r, 'n alpha v gamma tau beta')
     1050        else:
    9501051            self.skipTest('OrderedDict is not supported')
     1052
     1053    def testGetAttnamesIsAttrDict(self):
     1054        AttrDict = pg.AttrDict
     1055        get_attnames = self.db.get_attnames
     1056        r = get_attnames('test', flush=True)
     1057        self.assertIsInstance(r, AttrDict)
     1058        self.assertEqual(r, AttrDict([
     1059            ('i2', 'int'), ('i4', 'int'), ('i8', 'int'),
     1060            ('d', 'num'), ('f4', 'float'), ('f8', 'float'), ('m', 'money'),
     1061            ('v4', 'text'), ('c4', 'text'), ('t', 'text')]))
     1062        r = ' '.join(list(r.keys()))
     1063        self.assertEqual(r, 'i2 i4 i8 d f4 f8 m v4 c4 t')
     1064        table = 'test table for get_attnames'
     1065        self.createTable(table,
     1066            ' n int, alpha smallint, v varchar(3),'
     1067            ' gamma char(5), tau text, beta bool')
     1068        r = get_attnames(table)
     1069        self.assertIsInstance(r, AttrDict)
     1070        self.assertEqual(r, AttrDict([
     1071            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
     1072            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
    9511073        r = ' '.join(list(r.keys()))
    9521074        self.assertEqual(r, 'n alpha v gamma tau beta')
     
    18291951    def testClearWithQuotedNames(self):
    18301952        clear = self.db.clear
    1831         query = self.db.query
    18321953        table = 'test table for clear()'
    18331954        self.createTable(table, '"Prime!" smallint primary key,'
     
    22222343        self.assertEqual(r, 0)
    22232344
     2345    def testGetAsList(self):
     2346        get_as_list = self.db.get_as_list
     2347        self.assertRaises(TypeError, get_as_list)
     2348        self.assertRaises(TypeError, get_as_list, None)
     2349        query = self.db.query
     2350        table = 'test_aslist'
     2351        r = query('select 1 as colname').namedresult()[0]
     2352        self.assertIsInstance(r, tuple)
     2353        named = hasattr(r, 'colname')
     2354        names = [(1, 'Homer'), (2, 'Marge'),
     2355                (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
     2356        self.createTable(table,
     2357            'id smallint primary key, name varchar', values=names)
     2358        r = get_as_list(table)
     2359        self.assertIsInstance(r, list)
     2360        self.assertEqual(r, names)
     2361        for t, n in zip(r, names):
     2362            self.assertIsInstance(t, tuple)
     2363            self.assertEqual(t, n)
     2364            if named:
     2365                self.assertEqual(t.id, n[0])
     2366                self.assertEqual(t.name, n[1])
     2367                self.assertEqual(t._asdict(), dict(id=n[0], name=n[1]))
     2368        r = get_as_list(table, what='name')
     2369        self.assertIsInstance(r, list)
     2370        expected = sorted((row[1],) for row in names)
     2371        self.assertEqual(r, expected)
     2372        r = get_as_list(table, what='name, id')
     2373        self.assertIsInstance(r, list)
     2374        expected = sorted(tuple(reversed(row)) for row in names)
     2375        self.assertEqual(r, expected)
     2376        r = get_as_list(table, what=['name', 'id'])
     2377        self.assertIsInstance(r, list)
     2378        self.assertEqual(r, expected)
     2379        r = get_as_list(table, where="name like 'Ba%'")
     2380        self.assertIsInstance(r, list)
     2381        self.assertEqual(r, names[2:3])
     2382        r = get_as_list(table, what='name', where="name like 'Ma%'")
     2383        self.assertIsInstance(r, list)
     2384        self.assertEqual(r, [('Maggie',), ('Marge',)])
     2385        r = get_as_list(table, what='name',
     2386                        where=["name like 'Ma%'", "name like '%r%'"])
     2387        self.assertIsInstance(r, list)
     2388        self.assertEqual(r, [('Marge',)])
     2389        r = get_as_list(table, what='name', order='id')
     2390        self.assertIsInstance(r, list)
     2391        expected = [(row[1],) for row in names]
     2392        self.assertEqual(r, expected)
     2393        r = get_as_list(table, what=['name'], order=['id'])
     2394        self.assertIsInstance(r, list)
     2395        self.assertEqual(r, expected)
     2396        r = get_as_list(table, what=['id', 'name'], order=['id', 'name'])
     2397        self.assertIsInstance(r, list)
     2398        self.assertEqual(r, names)
     2399        r = get_as_list(table, what='id * 2 as num', order='id desc')
     2400        self.assertIsInstance(r, list)
     2401        expected = [(n,) for n in range(10, 0, -2)]
     2402        self.assertEqual(r, expected)
     2403        r = get_as_list(table, limit=2)
     2404        self.assertIsInstance(r, list)
     2405        self.assertEqual(r, names[:2])
     2406        r = get_as_list(table, offset=3)
     2407        self.assertIsInstance(r, list)
     2408        self.assertEqual(r, names[3:])
     2409        r = get_as_list(table, limit=1, offset=2)
     2410        self.assertIsInstance(r, list)
     2411        self.assertEqual(r, names[2:3])
     2412        r = get_as_list(table, scalar=True)
     2413        self.assertIsInstance(r, list)
     2414        self.assertEqual(r, list(range(1, 6)))
     2415        r = get_as_list(table, what='name', scalar=True)
     2416        self.assertIsInstance(r, list)
     2417        expected = sorted(row[1] for row in names)
     2418        self.assertEqual(r, expected)
     2419        r = get_as_list(table, what='name', limit=1, scalar=True)
     2420        self.assertIsInstance(r, list)
     2421        self.assertEqual(r, expected[:1])
     2422        query('alter table "%s" drop constraint "%s_pkey"' % (table, table))
     2423        self.assertRaises(KeyError, self.db.pkey, table, flush=True)
     2424        names.insert(1, (1, 'Snowball'))
     2425        query('insert into "%s" values ($1, $2)' % table, (1, 'Snowball'))
     2426        r = get_as_list(table)
     2427        self.assertIsInstance(r, list)
     2428        self.assertEqual(r, names)
     2429        r = get_as_list(table, what='name', where='id=1', scalar=True)
     2430        self.assertIsInstance(r, list)
     2431        self.assertEqual(r, ['Homer', 'Snowball'])
     2432        # test with unordered query
     2433        r = get_as_list(table, order=False)
     2434        self.assertIsInstance(r, list)
     2435        self.assertEqual(set(r), set(names))
     2436        # test with arbitrary from clause
     2437        from_table = '(select lower(name) as n2 from "%s") as t2' % table
     2438        r = get_as_list(from_table)
     2439        self.assertIsInstance(r, list)
     2440        r = set(row[0] for row in r)
     2441        expected = set(row[1].lower() for row in names)
     2442        self.assertEqual(r, expected)
     2443        r = get_as_list(from_table, order='n2', scalar=True)
     2444        self.assertIsInstance(r, list)
     2445        self.assertEqual(r, sorted(expected))
     2446        r = get_as_list(from_table, order='n2', limit=1)
     2447        self.assertIsInstance(r, list)
     2448        self.assertEqual(len(r), 1)
     2449        t = r[0]
     2450        self.assertIsInstance(t, tuple)
     2451        if named:
     2452            self.assertEqual(t.n2, 'bart')
     2453            self.assertEqual(t._asdict(), dict(n2='bart'))
     2454        else:
     2455            self.assertEqual(t, ('bart',))
     2456
     2457    def testGetAsDict(self):
     2458        get_as_dict = self.db.get_as_dict
     2459        self.assertRaises(TypeError, get_as_dict)
     2460        self.assertRaises(TypeError, get_as_dict, None)
     2461        # the test table has no primary key
     2462        self.assertRaises(pg.ProgrammingError, get_as_dict, 'test')
     2463        query = self.db.query
     2464        table = 'test_asdict'
     2465        r = query('select 1 as colname').namedresult()[0]
     2466        self.assertIsInstance(r, tuple)
     2467        named = hasattr(r, 'colname')
     2468        colors = [(1, '#7cb9e8', 'Aero'), (2, '#b5a642', 'Brass'),
     2469                  (3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
     2470        self.createTable(table,
     2471            'id smallint primary key, rgb char(7), name varchar',
     2472            values=colors)
     2473        # keyname must be string, list or tuple
     2474        self.assertRaises(KeyError, get_as_dict, table, 3)
     2475        self.assertRaises(KeyError, get_as_dict, table, dict(id=None))
     2476        # missing keyname in row
     2477        self.assertRaises(KeyError, get_as_dict, table,
     2478                          keyname='rgb', what='name')
     2479        r = get_as_dict(table)
     2480        self.assertIsInstance(r, OrderedDict)
     2481        expected = OrderedDict((row[0], row[1:]) for row in colors)
     2482        self.assertEqual(r, expected)
     2483        for key in r:
     2484            self.assertIsInstance(key, int)
     2485            self.assertIn(key, expected)
     2486            row = r[key]
     2487            self.assertIsInstance(row, tuple)
     2488            t = expected[key]
     2489            self.assertEqual(row, t)
     2490            if named:
     2491                self.assertEqual(row.rgb, t[0])
     2492                self.assertEqual(row.name, t[1])
     2493                self.assertEqual(row._asdict(), dict(rgb=t[0], name=t[1]))
     2494        if OrderedDict is not dict:  # Python > 2.6
     2495            self.assertEqual(r.keys(), expected.keys())
     2496        r = get_as_dict(table, keyname='rgb')
     2497        self.assertIsInstance(r, OrderedDict)
     2498        expected = OrderedDict((row[1], (row[0], row[2]))
     2499            for row in sorted(colors, key=itemgetter(1)))
     2500        self.assertEqual(r, expected)
     2501        for key in r:
     2502            self.assertIsInstance(key, str)
     2503            self.assertIn(key, expected)
     2504            row = r[key]
     2505            self.assertIsInstance(row, tuple)
     2506            t = expected[key]
     2507            self.assertEqual(row, t)
     2508            if named:
     2509                self.assertEqual(row.id, t[0])
     2510                self.assertEqual(row.name, t[1])
     2511                self.assertEqual(row._asdict(), dict(id=t[0], name=t[1]))
     2512        if OrderedDict is not dict:  # Python > 2.6
     2513            self.assertEqual(r.keys(), expected.keys())
     2514        r = get_as_dict(table, keyname=['id', 'rgb'])
     2515        self.assertIsInstance(r, OrderedDict)
     2516        expected = OrderedDict((row[:2], row[2:]) for row in colors)
     2517        self.assertEqual(r, expected)
     2518        for key in r:
     2519            self.assertIsInstance(key, tuple)
     2520            self.assertIsInstance(key[0], int)
     2521            self.assertIsInstance(key[1], str)
     2522            if named:
     2523                self.assertEqual(key, (key.id, key.rgb))
     2524                self.assertEqual(key._fields, ('id', 'rgb'))
     2525            row = r[key]
     2526            self.assertIsInstance(row, tuple)
     2527            self.assertIsInstance(row[0], str)
     2528            t = expected[key]
     2529            self.assertEqual(row, t)
     2530            if named:
     2531                self.assertEqual(row.name, t[0])
     2532                self.assertEqual(row._asdict(), dict(name=t[0]))
     2533        if OrderedDict is not dict:  # Python > 2.6
     2534            self.assertEqual(r.keys(), expected.keys())
     2535        r = get_as_dict(table, keyname=['id', 'rgb'], scalar=True)
     2536        self.assertIsInstance(r, OrderedDict)
     2537        expected = OrderedDict((row[:2], row[2]) for row in colors)
     2538        self.assertEqual(r, expected)
     2539        for key in r:
     2540            self.assertIsInstance(key, tuple)
     2541            row = r[key]
     2542            self.assertIsInstance(row, str)
     2543            t = expected[key]
     2544            self.assertEqual(row, t)
     2545        if OrderedDict is not dict:  # Python > 2.6
     2546            self.assertEqual(r.keys(), expected.keys())
     2547        r = get_as_dict(table, keyname='rgb', what=['rgb', 'name'], scalar=True)
     2548        self.assertIsInstance(r, OrderedDict)
     2549        expected = OrderedDict((row[1], row[2])
     2550            for row in sorted(colors, key=itemgetter(1)))
     2551        self.assertEqual(r, expected)
     2552        for key in r:
     2553            self.assertIsInstance(key, str)
     2554            row = r[key]
     2555            self.assertIsInstance(row, str)
     2556            t = expected[key]
     2557            self.assertEqual(row, t)
     2558        if OrderedDict is not dict:  # Python > 2.6
     2559            self.assertEqual(r.keys(), expected.keys())
     2560        r = get_as_dict(table, what='id, name',
     2561                        where="rgb like '#b%'", scalar=True)
     2562        self.assertIsInstance(r, OrderedDict)
     2563        expected = OrderedDict((row[0], row[2]) for row in colors[1:3])
     2564        self.assertEqual(r, expected)
     2565        for key in r:
     2566            self.assertIsInstance(key, int)
     2567            row = r[key]
     2568            self.assertIsInstance(row, str)
     2569            t = expected[key]
     2570            self.assertEqual(row, t)
     2571        if OrderedDict is not dict:  # Python > 2.6
     2572            self.assertEqual(r.keys(), expected.keys())
     2573        expected = r
     2574        r = get_as_dict(table, what=['name', 'id'],
     2575                        where=['id > 1', 'id < 4', "rgb like '#b%'",
     2576                   "name not like 'A%'", "name not like '%t'"], scalar=True)
     2577        self.assertEqual(r, expected)
     2578        r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
     2579        self.assertEqual(r, expected)
     2580        r = get_as_dict(table, keyname=('id',), what=('name', 'id'),
     2581                        where=('id > 1', 'id < 4'), order=('id',), scalar=True)
     2582        self.assertEqual(r, expected)
     2583        r = get_as_dict(table, limit=1)
     2584        self.assertEqual(len(r), 1)
     2585        self.assertEqual(r[1][1], 'Aero')
     2586        r = get_as_dict(table, offset=3)
     2587        self.assertEqual(len(r), 1)
     2588        self.assertEqual(r[4][1], 'Desert')
     2589        r = get_as_dict(table, order='id desc')
     2590        expected = OrderedDict((row[0], row[1:]) for row in reversed(colors))
     2591        self.assertEqual(r, expected)
     2592        r = get_as_dict(table, where='id > 5')
     2593        self.assertIsInstance(r, OrderedDict)
     2594        self.assertEqual(len(r), 0)
     2595        # test with unordered query
     2596        expected = dict((row[0], row[1:]) for row in colors)
     2597        r = get_as_dict(table, order=False)
     2598        self.assertIsInstance(r, dict)
     2599        self.assertEqual(r, expected)
     2600        if dict is not OrderedDict:  # Python > 2.6
     2601            self.assertNotIsInstance(self, OrderedDict)
     2602        # test with arbitrary from clause
     2603        from_table = '(select id, lower(name) as n2 from "%s") as t2' % table
     2604        # primary key must be passed explicitly in this case
     2605        self.assertRaises(pg.ProgrammingError, get_as_dict, from_table)
     2606        r = get_as_dict(from_table, 'id')
     2607        self.assertIsInstance(r, OrderedDict)
     2608        expected = OrderedDict((row[0], (row[2].lower(),)) for row in colors)
     2609        self.assertEqual(r, expected)
     2610        # test without a primary key
     2611        query('alter table "%s" drop constraint "%s_pkey"' % (table, table))
     2612        self.assertRaises(KeyError, self.db.pkey, table, flush=True)
     2613        self.assertRaises(pg.ProgrammingError, get_as_dict, table)
     2614        r = get_as_dict(table, keyname='id')
     2615        expected = OrderedDict((row[0], row[1:]) for row in colors)
     2616        self.assertIsInstance(r, dict)
     2617        self.assertEqual(r, expected)
     2618        r = (1, '#007fff', 'Azure')
     2619        query('insert into "%s" values ($1, $2, $3)' % table, r)
     2620        # the last entry will win
     2621        expected[1] = r[1:]
     2622        r = get_as_dict(table, keyname='id')
     2623        self.assertEqual(r, expected)
     2624
    22242625    def testTransaction(self):
    22252626        query = self.db.query
     
    25062907    """Test correct handling of schemas (namespaces)."""
    25072908
     2909    cls_set_up = False
     2910
    25082911    @classmethod
    25092912    def setUpClass(cls):
     
    25292932                  % (schema, num_schema, num_schema))
    25302933        db.close()
     2934        cls.cls_set_up = True
    25312935
    25322936    @classmethod
     
    25452949
    25462950    def setUp(self):
     2951        self.assertTrue(self.cls_set_up)
    25472952        self.db = DB()
    25482953
  • trunk/tests/test_classic_functions.py

    r729 r770  
    99
    1010These tests do not need a database to test against.
    11 
    1211"""
    13 
    1412
    1513try:
  • trunk/tests/test_classic_largeobj.py

    r648 r770  
    99
    1010These tests need a database to test against.
    11 
    1211"""
    1312
  • trunk/tests/test_dbapi20_copy.py

    r697 r770  
    99
    1010These tests need a database to test against.
    11 
    1211"""
    1312
     
    124123class TestCopy(unittest.TestCase):
    125124
     125    cls_set_up = False
     126
    126127    @staticmethod
    127128    def connect():
     
    140141        con.commit()
    141142        con.close()
     143        cls.cls_set_up = True
    142144
    143145    @classmethod
     
    151153
    152154    def setUp(self):
     155        self.assertTrue(self.cls_set_up)
    153156        self.con = self.connect()
    154157        self.cursor = self.con.cursor()
Note: See TracChangeset for help on using the changeset viewer.