Changeset 682


Ignore:
Timestamp:
Jan 1, 2016, 12:32:24 PM (4 years ago)
Author:
cito
Message:

Add cursor classes for returning various row types

Currently, PyGreSQL returns rows as lists in the DB-API 2 module.
It is more common to return tuples (and consistent with how it
is done int the classic module). Even better are named tuples
which are cheap and available in all Py versions PyGreSQL supports.

So I made tuples the default now and added additional Cursor types
for returning rows as lists, named tuples, dicts and ordered dicts.
Also added an attribute for setting the default Cursor type in the
connection and special methods for creating the various cursor types
from the same connection.

However, I think this should be simplified and named tuples should
become the default. Named tuples can be easily converted to ordinary
lists and tuples (although this should never be necessary), and they
can also be easily converted to ordered dicts by calling ._asdict().
So I'm planning to change this before long, but for reference I have
checked in this implementation with all necessary tests anyway.

Location:
trunk/module
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/module/pgdb.py

    r681 r682  
    8989except NameError:  # Python >= 3.0
    9090    basestring = (str, bytes)
     91
     92try:
     93    from collections import OrderedDict
     94except ImportError:  # Python 2.6 or 3.0
     95    try:
     96        from ordereddict import OrderedDict
     97    except Exception:
     98        def OrderedDict(*args):
     99            raise NotSupportedError('OrderedDict is not supported')
    91100
    92101set_decimal(Decimal)
     
    220229    """Cursor object."""
    221230
     231    row_factory = tuple  # the factory for creating result rows
     232
    222233    def __init__(self, dbcnx):
    223234        """Create a cursor object for the database connection."""
     
    226237        self._type_cache = dbcnx._type_cache
    227238        self._src = self._cnx.source()
     239        self.colnames = self.coltypes = None
    228240        self.description = None
    229241        self.rowcount = -1
     
    288300        return string % params
    289301
    290     @staticmethod
    291     def row_factory(row):
    292         """Process rows before they are returned.
    293 
    294         You can overwrite this with a custom row factory,
    295         e.g. a dict factory:
    296 
    297             class DictCursor(pgdb.Cursor):
    298 
    299                 def row_factory(self, row):
    300                     return {desc[0]:value
    301                         for desc, value in zip(self.description, row)}
    302 
    303             cur = DictCursor(con)
    304 
    305         """
    306         return row
    307 
    308302    def close(self):
    309303        """Close the cursor object."""
    310304        self._src.close()
     305        self.colnames = self.coltypes = None
    311306        self.description = None
    312307        self.rowcount = -1
     
    331326            # don't do anything without parameters
    332327            return
     328        self.colnames = self.coltypes = None
    333329        self.description = None
    334330        self.rowcount = -1
     
    365361            self.rowcount = self._src.ntuples
    366362            getdescr = self._type_cache.getdescr
    367             coltypes = self._src.listinfo()
    368             self.description = [CursorDescription(
    369                 typ[1], *getdescr(typ[2])) for typ in coltypes]
     363            description = [CursorDescription(
     364                info[1], *getdescr(info[2])) for info in self._src.listinfo()]
     365            self.colnames = [info[0] for info in description]
     366            self.coltypes = [info[1] for info in description]
     367            self.description = description
    370368            self.lastrowid = None
    371369        else:
    372370            self.rowcount = totrows
    373             self.description = None
    374371            self.lastrowid = self._src.oidstatus()
    375372        # return the cursor object, so you can write statements such as
     
    408405        except Error as err:
    409406            raise _db_error(str(err))
    410         row_factory = self.row_factory
    411407        typecast = self._type_cache.typecast
    412         coltypes = [desc[1] for desc in self.description]
    413         return [row_factory([typecast(*args)
    414             for args in zip(coltypes, row)]) for row in result]
     408        return [self.row_factory([typecast(typ, value)
     409            for typ, value in zip(self.coltypes, row)]) for row in result]
    415410
    416411    def __next__(self):
     
    444439    ['name', 'type_code', 'display_size', 'internal_size',
    445440     'precision', 'scale', 'null_ok'])
     441
     442
     443class ListCursor(Cursor):
     444    """Cursor object that returns rows as lists."""
     445
     446    row_factory = list
     447
     448
     449class DictCursor(Cursor):
     450    """Cursor object that returns rows as dictionaries."""
     451
     452    def row_factory(self, row):
     453        """Turn a row from a tuple into a dictionary."""
     454        # not using dict comprehension to stay compatible with Py 2.6
     455        return dict((key, value) for key, value in zip(self.colnames, row))
     456
     457
     458class OrderedDictCursor(Cursor):
     459    """Cursor object that returns rows as ordered dictionaries."""
     460
     461    def row_factory(self, row):
     462        """Turn a row from a tuple into an ordered dictionary."""
     463        return OrderedDict(
     464            (key, value) for key, value in zip(self.colnames, row))
     465
     466
     467class NamedTupleCursor(Cursor):
     468    """Cursor object that returns rows as named tuples."""
     469
     470    @property
     471    def colnames(self):
     472        return self._colnames
     473
     474    @colnames.setter
     475    def colnames(self, value):
     476        self._colnames = value
     477        if value:
     478            try:
     479                try:
     480                    factory = namedtuple('Row', value, rename=True)._make
     481                except TypeError:  # Python 2.6 and 3.0 do not support rename
     482                    value = [v if v.isalnum() else 'column_%d' % n
     483                             for n, v in enumerate(value)]
     484                    factory = namedtuple('Row', value)._make
     485            except ValueError:
     486                value = ['column_%d' % n for n in range(len(value))]
     487                factory = namedtuple('Row', value)._make
     488        else:
     489            factory = tuple
     490        self._colnames, self.row_factory = value, factory
    446491
    447492
     
    468513        self._tnx = False  # transaction state
    469514        self._type_cache = TypeCache(cnx)
     515        self.cursor_type = Cursor
     516        self.row_type = tuple
    470517        try:
    471518            self._cnx.source()
     
    533580            raise _op_error("connection has been closed")
    534581
    535     def cursor(self):
     582    def cursor(self, cls=None):
    536583        """Return a new cursor object using the connection."""
    537584        if self._cnx:
    538585            try:
    539                 return Cursor(self)
     586                return (cls or self.cursor_type)(self)
    540587            except Exception:
    541588                raise _op_error("invalid connection")
    542589        else:
    543590            raise _op_error("connection has been closed")
     591
     592    def list_cursor(self):
     593        """Return a new tuple cursor object using the connection."""
     594        return self.cursor(ListCursor)
     595
     596    def tuple_cursor(self):
     597        """Return a new tuple cursor object using the connection."""
     598        return self.cursor(Cursor)
     599
     600    def named_tuple_cursor(self):
     601        """Return a new named tuple cursor object using the connection."""
     602        return self.cursor(NamedTupleCursor)
     603
     604    def dict_cursor(self):
     605        """Return a new dict cursor object using the connection."""
     606        return self.cursor(DictCursor)
     607
     608    def ordered_dict_cursor(self):
     609        """Return a new ordered dict cursor object using the connection."""
     610        return self.cursor(OrderedDictCursor)
    544611
    545612    if shortcutmethods:  # otherwise do not implement and document this
  • trunk/module/tests/test_dbapi20.py

    r681 r682  
    3434    long = int
    3535
     36try:
     37    from collections import OrderedDict
     38except ImportError:  # Python 2.6 or 3.0
     39    OrderedDict = None
     40
    3641
    3742class test_PyGreSQL(dbapi20.DatabaseAPI20Test):
     
    6267        dbapi20.DatabaseAPI20Test.tearDown(self)
    6368
     69    def test_cursortype(self):
     70        con = self._connect()
     71        self.assertIs(con.cursor_type, pgdb.Cursor)
     72        cur = con.cursor()
     73        self.assertIsInstance(cur, pgdb.Cursor)
     74        self.assertNotIsInstance(cur, pgdb.ListCursor)
     75        cur.close()
     76        con.cursor_type = pgdb.ListCursor
     77        cur = con.cursor()
     78        self.assertIsInstance(cur, pgdb.Cursor)
     79        self.assertIsInstance(cur, pgdb.ListCursor)
     80        cur.close()
     81        cur = con.cursor()
     82        self.assertIsInstance(cur, pgdb.ListCursor)
     83        cur.close()
     84        con.close()
     85        con = self._connect()
     86        self.assertIs(con.cursor_type, pgdb.Cursor)
     87        cur = con.cursor()
     88        self.assertIsInstance(cur, pgdb.Cursor)
     89        self.assertNotIsInstance(cur, pgdb.ListCursor)
     90        cur.close()
     91        con.close()
     92
     93    def test_list_cursor(self):
     94        con = self._connect()
     95        cur = con.list_cursor()
     96        self.assertIsInstance(cur, pgdb.ListCursor)
     97        cur.execute("select 1, 2, 3")
     98        res = cur.fetchone()
     99        cur.close()
     100        con.close()
     101        self.assertIsInstance(res, list)
     102        self.assertEqual(res, [1, 2, 3])
     103
     104    def test_tuple_cursor(self):
     105        con = self._connect()
     106        cur = con.tuple_cursor()
     107        self.assertIsInstance(cur, pgdb.Cursor)
     108        cur.execute("select 1, 2, 3")
     109        res = cur.fetchone()
     110        cur.close()
     111        con.close()
     112        self.assertIsInstance(res, tuple)
     113        self.assertEqual(res, (1, 2, 3))
     114        self.assertRaises(AttributeError, getattr, res, '_fields')
     115
     116    def test_named_tuple_cursor(self):
     117        con = self._connect()
     118        cur = con.named_tuple_cursor()
     119        self.assertIsInstance(cur, pgdb.NamedTupleCursor)
     120        cur.execute("select 1 as abc, 2 as de, 3 as f")
     121        res = cur.fetchone()
     122        cur.close()
     123        con.close()
     124        self.assertIsInstance(res, tuple)
     125        self.assertEqual(res, (1, 2, 3))
     126        self.assertEqual(res._fields, ('abc', 'de', 'f'))
     127        self.assertEqual(res.abc, 1)
     128        self.assertEqual(res.de, 2)
     129        self.assertEqual(res.f, 3)
     130
     131    def test_named_tuple_cursor_with_bad_names(self):
     132        con = self._connect()
     133        cur = con.named_tuple_cursor()
     134        self.assertIsInstance(cur, pgdb.NamedTupleCursor)
     135        cur.execute("select 1, 2, 3")
     136        res = cur.fetchone()
     137        self.assertIsInstance(res, tuple)
     138        self.assertEqual(res, (1, 2, 3))
     139        old_py = OrderedDict is None  # Python 2.6 or 3.0
     140        # old Python versions cannot rename tuple fields with underscore
     141        if old_py:
     142            self.assertEqual(res._fields, ('column_0', 'column_1', 'column_2'))
     143        else:
     144            self.assertEqual(res._fields, ('_0', '_1', '_2'))
     145        cur.execute("select 1 as one, 2, 3 as three")
     146        res = cur.fetchone()
     147        self.assertIsInstance(res, tuple)
     148        self.assertEqual(res, (1, 2, 3))
     149        if old_py:  # cannot auto rename with underscore
     150            self.assertEqual(res._fields, ('one', 'column_1', 'three'))
     151        else:
     152            self.assertEqual(res._fields, ('one', '_1', 'three'))
     153        cur.execute("select 1 as abc, 2 as def")
     154        res = cur.fetchone()
     155        self.assertIsInstance(res, tuple)
     156        self.assertEqual(res, (1, 2))
     157        if old_py:
     158            self.assertEqual(res._fields, ('column_0', 'column_1'))
     159        else:
     160            self.assertEqual(res._fields, ('abc', '_1'))
     161        cur.close()
     162        con.close()
     163
     164    def test_dict_cursor(self):
     165        con = self._connect()
     166        cur = con.dict_cursor()
     167        self.assertIsInstance(cur, pgdb.DictCursor)
     168        cur.execute("select 1 as abc, 2 as de, 3 as f")
     169        res = cur.fetchone()
     170        cur.close()
     171        con.close()
     172        self.assertIsInstance(res, dict)
     173        self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
     174        self.assertRaises(TypeError, res.popitem, last=True)
     175
     176    def test_ordered_dict_cursor(self):
     177        con = self._connect()
     178        cur = con.ordered_dict_cursor()
     179        self.assertIsInstance(cur, pgdb.OrderedDictCursor)
     180        cur.execute("select 1 as abc, 2 as de, 3 as f")
     181        try:
     182            res = cur.fetchone()
     183        except pgdb.NotSupportedError:
     184            if OrderedDict is None:
     185                return
     186            self.fail('OrderedDict supported by Python, but not by pgdb')
     187        finally:
     188            cur.close()
     189            con.close()
     190        self.assertIsInstance(res, dict)
     191        self.assertEqual(res, {'abc': 1, 'de': 2, 'f': 3})
     192        self.assertEqual(res.popitem(last=True), ('f', 3))
     193
    64194    def test_row_factory(self):
    65195
     
    67197
    68198            def row_factory(self, row):
    69                 return {desc[0]:value
    70                     for desc, value in zip(self.description, row)}
     199                # not using dict comprehension to stay compatible with Py 2.6
     200                return dict(('column %s' % desc[0], value)
     201                    for desc, value in zip(self.description, row))
    71202
    72203        con = self._connect()
     
    74205        ret = cur.execute("select 1 as a, 2 as b")
    75206        self.assertTrue(ret is cur, 'execute() should return cursor')
    76         self.assertEqual(cur.fetchone(), {'a': 1, 'b': 2})
     207        self.assertEqual(cur.fetchone(), {'column a': 1, 'column b': 2})
     208
     209    def test_colnames(self):
     210        con = self._connect()
     211        cur = con.cursor()
     212        cur.execute("select 1, 2, 3")
     213        names = cur.colnames
     214        self.assertIsInstance(names, list)
     215        self.assertEqual(names, ['?column?', '?column?', '?column?'])
     216        cur.execute("select 1 as a, 2 as bc, 3 as def, 4 as g")
     217        names = cur.colnames
     218        self.assertIsInstance(names, list)
     219        self.assertEqual(names, ['a', 'bc', 'def', 'g'])
     220
     221    def test_coltypes(self):
     222        con = self._connect()
     223        cur = con.cursor()
     224        cur.execute("select 1::int2, 2::int4, 3::int8")
     225        types = cur.coltypes
     226        self.assertIsInstance(types, list)
     227        self.assertEqual(types, ['int2', 'int4', 'int8'])
    77228
    78229    def test_description_named(self):
     
    102253    def test_fetch_2_rows(self):
    103254        Decimal = pgdb.decimal_type()
    104         values = ['test', pgdb.Binary(b'\xff\x52\xb2'),
     255        values = ('test', pgdb.Binary(b'\xff\x52\xb2'),
    105256            True, 5, 6, 5.7, Decimal('234.234234'), Decimal('75.45'),
    106257            '2011-07-17', '15:47:42', '2008-10-20 15:25:35', '15:31:05',
    107             7897234]
     258            7897234)
    108259        table = self.table_prefix + 'booze'
    109260        con = self._connect()
Note: See TracChangeset for help on using the changeset viewer.