Changeset 784


Ignore:
Timestamp:
Jan 26, 2016, 12:17:23 PM (4 years ago)
Author:
cito
Message:

Make type cache and cursor description more useful

The type cache now stores some more information, e.g. whether a type is a base
type or a composite type and the category of the type. This may be later used
for casting composite types, or exposed to the user.

The cursor description now contains proper information on the size of numeric
types (including precision and scale).

Location:
trunk
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/pgdb.py

    r781 r784  
    157157
    158158
     159TypeInfo = namedtuple('TypeInfo',
     160    ['oid', 'name', 'len', 'type', 'category', 'delim', 'relid'])
     161
     162
    159163class TypeCache(dict):
    160     """Cache for database types."""
     164    """Cache for database types.
     165
     166    This cache maps type OIDs to TypeInfo tuples containing the name
     167    and other important info for the database type with the given OID.
     168    """
    161169
    162170    def __init__(self, cnx):
    163171        """Initialize type cache for connection."""
    164172        super(TypeCache, self).__init__()
     173        self._escape_string = cnx.escape_string
    165174        self._src = cnx.source()
     175
     176    def __missing__(self, key):
     177        q = ("SELECT oid, typname,"
     178             " typlen, typtype, typcategory, typdelim, typrelid"
     179            " FROM pg_type WHERE ")
     180        if isinstance(key, int):
     181            q += "oid = %d" % key
     182        else:
     183            q += "typname = '%s'" % self._escape_string(key)
     184        self._src.execute(q)
     185        res = list(self._src.fetch(1)[0])
     186        res[0] = int(res[0])
     187        res[2] = int(res[2])
     188        res[6] = int(res[6])
     189        res = TypeInfo(*res)
     190        self[res.oid] = self[res.name] = res
     191        return res
    166192
    167193    @staticmethod
     
    182208            return cast(value)
    183209
    184     def getdescr(self, oid):
    185         """Get name of database type with given oid."""
    186         try:
    187             return self[oid]
    188         except KeyError:
    189             self._src.execute(
    190                 "SELECT typname, typlen "
    191                 "FROM pg_type WHERE oid=%s" % oid)
    192             res = self._src.fetch(1)[0]
    193             # The column name is omitted from the return value.
    194             # It will have to be prepended by the caller.
    195             res = (res[0], None, int(res[1]), None, None, None)
    196             self[oid] = res
    197             return res
    198 
    199210
    200211_re_array_escape = regex(r'(["\\])')
    201212_re_array_quote = regex(r'[{},"\\\s]|^[Nn][Uu][Ll][Ll]$')
     213
    202214
    203215class _quotedict(dict):
     
    315327            parameters = tuple(map(self._quote, parameters))
    316328        return string % parameters
     329
     330    def _make_description(self, info):
     331        """Make the description tuple for the given field info."""
     332        name, typ, size, mod = info[1:]
     333        type_info = self._type_cache[typ]
     334        type_code = type_info.name
     335        if mod > 0:
     336            mod -= 4
     337        if type_code == 'numeric':
     338            precision, scale = mod >> 16, mod & 0xffff
     339            size = precision
     340        else:
     341            if not size:
     342                size = type_info.size
     343            if size == -1:
     344                size = mod
     345            precision = scale = None
     346        return CursorDescription(name, type_code,
     347            None, size, precision, scale, None)
    317348
    318349    def close(self):
     
    378409        if self._src.resulttype == RESULT_DQL:
    379410            self.rowcount = self._src.ntuples
    380             getdescr = self._type_cache.getdescr
    381             description = [CursorDescription(
    382                 info[1], *getdescr(info[2])) for info in self._src.listinfo()]
    383             self.colnames = [info[0] for info in description]
    384             self.coltypes = [info[1] for info in description]
     411            description = self._make_description
     412            description = [description(info) for info in self._src.listinfo()]
     413            self.colnames = [d[0] for d in description]
     414            self.coltypes = [d[1] for d in description]
    385415            self.description = description
    386416            self.lastrowid = None
  • trunk/pgmodule.c

    r781 r784  
    36223622
    36233623        /* allocates tuple */
    3624         result = PyTuple_New(3);
     3624        result = PyTuple_New(5);
    36253625        if (!result)
    36263626                return NULL;
     
    36323632        PyTuple_SET_ITEM(result, 2,
    36333633                PyInt_FromLong(PQftype(self->result, num)));
     3634        PyTuple_SET_ITEM(result, 3,
     3635                PyInt_FromLong(PQfsize(self->result, num)));
     3636        PyTuple_SET_ITEM(result, 4,
     3637                PyInt_FromLong(PQfmod(self->result, num)));
    36343638
    36353639        return result;
  • trunk/tests/test_dbapi20.py

    r781 r784  
    263263        con = self._connect()
    264264        cur = con.cursor()
    265         cur.execute("select 123456789::int8 as col")
     265        cur.execute("select 123456789::int8 col0,"
     266            " 123456.789::numeric(41, 13) as col1,"
     267            " 'foobar'::char(39) as col2")
    266268        desc = cur.description
    267269        self.assertIsInstance(desc, list)
    268         self.assertEqual(len(desc), 1)
    269         desc = desc[0]
    270         self.assertIsInstance(desc, tuple)
    271         self.assertEqual(len(desc), 7)
    272         self.assertEqual(desc.name, 'col')
    273         self.assertEqual(desc.type_code, 'int8')
    274         self.assertIsNone(desc.display_size)
    275         self.assertIsInstance(desc.internal_size, int)
    276         self.assertEqual(desc.internal_size, 8)
    277         self.assertIsNone(desc.precision)
    278         self.assertIsNone(desc.scale)
    279         self.assertIsNone(desc.null_ok)
     270        self.assertEqual(len(desc), 3)
     271        cols = [('int8', 8, None), ('numeric', 41, 13), ('bpchar', 39, None)]
     272        for i in range(3):
     273            c, d = cols[i], desc[i]
     274            self.assertIsInstance(d, tuple)
     275            self.assertEqual(len(d), 7)
     276            self.assertIsInstance(d.name, str)
     277            self.assertEqual(d.name, 'col%d' % i)
     278            self.assertIsInstance(d.type_code, str)
     279            self.assertEqual(d.type_code, c[0])
     280            self.assertIsNone(d.display_size)
     281            self.assertIsInstance(d.internal_size, int)
     282            self.assertEqual(d.internal_size, c[1])
     283            if c[2] is not None:
     284                self.assertIsInstance(d.precision, int)
     285                self.assertEqual(d.precision, c[1])
     286                self.assertIsInstance(d.scale, int)
     287                self.assertEqual(d.scale, c[2])
     288            else:
     289                self.assertIsNone(d.precision)
     290                self.assertIsNone(d.scale)
     291            self.assertIsNone(d.null_ok)
     292
     293    def test_type_cache(self):
     294        con = self._connect()
     295        cur = con.cursor()
     296        type_info = cur._type_cache['numeric']
     297        self.assertEqual(type_info.oid, 1700)
     298        self.assertEqual(type_info.name, 'numeric')
     299        self.assertEqual(type_info.type, 'b')  # base
     300        self.assertEqual(type_info.category, 'N')  # numeric
     301        self.assertEqual(type_info.delim, ',')
     302        self.assertIs(cur._type_cache[1700], type_info)
    280303
    281304    def test_cursor_iteration(self):
Note: See TracChangeset for help on using the changeset viewer.