Changeset 739 for trunk


Ignore:
Timestamp:
Jan 13, 2016, 7:30:50 PM (4 years ago)
Author:
cito
Message:

Return ordered dict for attributes is possible

Sometimes it's important to know the order of the columns in a table.
By returning an OrderedDict? instead of a dict in get_attnames, we can
deliver that information en passant, while staying backward compatible.

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/pg/db_wrapper.rst

    r738 r739  
    125125
    126126Given the name of a table, digs out the set of attribute names.
     127
     128Returns a dictionary of attribute names (the names are the keys,
     129the values are the names of the attributes' types).
     130
     131If your Python version supports this, the dictionary will be an
     132OrderedDictionary with the column names in the right order.
     133
     134By default, only a limited number of simple types will be returned.
     135You can get the regular types after enabling this by calling the
     136:meth:`DB.use_regtypes` method.
     137
    127138
    128139has_table_privilege -- check table privilege
  • trunk/pg.py

    r738 r739  
    3939from collections import namedtuple
    4040from functools import partial
     41
     42try:
     43    from collections import OrderedDict
     44except ImportError:  # Python 2.6 or 3.0
     45    OrderedDict = dict
    4146
    4247try:
     
    554559        the values are the names of the attributes' types).
    555560
    556         If the optional newattnames exists, it must be a dictionary and
    557         will become the new attribute names dictionary.
     561        If your Python version supports this, the dictionary will be an
     562        OrderedDictionary with the column names in the right order.
     563
     564        If flush is set then the internal cache for attribute names will
     565        be flushed. This may be necessary after the database schema or
     566        the search path has been changed.
    558567
    559568        By default, only a limited number of simple types will be returned.
     
    573582                " WHERE a.attrelid = %s::regclass"
    574583                " AND (a.attnum > 0 OR a.attname = 'oid')"
    575                 " AND NOT a.attisdropped") % (
     584                " AND NOT a.attisdropped ORDER BY a.attnum") % (
    576585                    '::regtype' if self._regtypes else '',
    577586                    self._prepare_qualified_param(table, 1))
     
    579588            if not names:
    580589                raise KeyError('Table %s does not exist' % table)
    581             if self._regtypes:
    582                 names = dict(names)
    583             else:
    584                 names = dict((name, _simpletype(typ)) for name, typ in names)
     590            if not self._regtypes:
     591                names = ((name, _simpletype(typ)) for name, typ in names)
     592            names = OrderedDict(names)
    585593            attnames[table] = names  # cache it
    586594        return names
  • trunk/tests/test_classic_dbwrapper.py

    r735 r739  
    4848except NameError:  # Python >= 3.0
    4949    unicode = str
     50
     51try:
     52    from collections import OrderedDict
     53except ImportError:  # Python 2.6 or 3.0
     54    OrderedDict = dict
    5055
    5156windows = os.name == 'nt'
     
    407412        self.assertEqual(f(r'\\x4f007073ff21'), b'\\x4f007073ff21')
    408413
     414    def testGetAttnames(self):
     415        get_attnames = self.db.get_attnames
     416        query = self.db.query
     417        query("drop table if exists test_table")
     418        query("create table test_table("
     419            " n int, alpha smallint, beta bool,"
     420            " gamma char(5), tau text, v varchar(3))")
     421        r = get_attnames("test_table")
     422        self.assertIsInstance(r, dict)
     423        self.assertEquals(r, dict(
     424            n='int', alpha='int', beta='bool',
     425            gamma='text', tau='text', v='text'))
     426        query("drop table test_table")
     427
     428    def testGetAttnamesWithQuotes(self):
     429        get_attnames = self.db.get_attnames
     430        query = self.db.query
     431        table = 'test table for get_attnames()'
     432        query('drop table if exists "%s"' % table)
     433        query('create table "%s"('
     434            '"Prime!" smallint,'
     435            '"much space" integer, "Questions?" text)' % table)
     436        r = get_attnames(table)
     437        self.assertIsInstance(r, dict)
     438        self.assertEquals(r, {
     439            'Prime!': 'int', 'much space': 'int', 'Questions?': 'text'})
     440        query('drop table "%s"' % table)
     441
     442    def testGetAttnamesWithRegtypes(self):
     443        get_attnames = self.db.get_attnames
     444        query = self.db.query
     445        query("drop table if exists test_table")
     446        query("create table test_table("
     447            " n int, alpha smallint, beta bool,"
     448            " gamma char(5), tau text, v varchar(3))")
     449        self.db.use_regtypes(True)
     450        try:
     451            r = get_attnames("test_table")
     452            self.assertIsInstance(r, dict)
     453        finally:
     454            self.db.use_regtypes(False)
     455        self.assertEquals(r, dict(
     456            n='integer', alpha='smallint', beta='boolean',
     457            gamma='character', tau='text', v='character varying'))
     458        query("drop table test_table")
     459
     460    def testGetAttnamesIsCached(self):
     461        get_attnames = self.db.get_attnames
     462        query = self.db.query
     463        query("drop table if exists test_table")
     464        query("create table test_table(col int)")
     465        r = get_attnames("test_table")
     466        self.assertIsInstance(r, dict)
     467        self.assertEquals(r, dict(col='int'))
     468        query("drop table test_table")
     469        query("create table test_table(col text)")
     470        r = get_attnames("test_table")
     471        self.assertEquals(r, dict(col='int'))
     472        r = get_attnames("test_table", flush=True)
     473        self.assertEquals(r, dict(col='text'))
     474        query("drop table test_table")
     475        r = get_attnames("test_table")
     476        self.assertEquals(r, dict(col='text'))
     477        self.assertRaises(pg.ProgrammingError,
     478            get_attnames, "test_table", flush=True)
     479
     480    def testGetAttnamesIsOrdered(self):
     481        get_attnames = self.db.get_attnames
     482        query = self.db.query
     483        query("drop table if exists test_table")
     484        query("create table test_table("
     485            " n int, alpha smallint, v varchar(3),"
     486            " gamma char(5), tau text, beta bool)")
     487        r = get_attnames("test_table")
     488        self.assertIsInstance(r, OrderedDict)
     489        self.assertEquals(r, OrderedDict([
     490            ('n', 'int'), ('alpha', 'int'), ('v', 'text'),
     491            ('gamma', 'text'), ('tau', 'text'), ('beta', 'bool')]))
     492        query("drop table test_table")
     493        if OrderedDict is dict:
     494            self.skipTest('OrderedDict is not supported')
     495        r = ' '.join(list(r.keys()))
     496        self.assertEquals(r, 'n alpha v gamma tau beta')
     497
    409498    def testQuery(self):
    410499        query = self.db.query
     
    697786        self.assertRaises(pg.ProgrammingError, get, table, 2)
    698787        self.assertEqual(get(table, dict(n=2, m=2))['t'], 'd')
    699         self.assertEqual(get(table, dict(n=1, m=2),
    700                              ('n', 'm'))['t'], 'b')
    701         self.assertEqual(get(table, dict(n=3, m=2),
    702                              frozenset(['n', 'm']))['t'], 'f')
     788        r = get(table, dict(n=1, m=2), ('n', 'm'))
     789        self.assertEqual(r['t'], 'b')
     790        r = get(table, dict(n=3, m=2), frozenset(['n', 'm']))
     791        self.assertEqual(r['t'], 'f')
    703792        query('drop table "%s"' % table)
    704793
Note: See TracChangeset for help on using the changeset viewer.