Changeset 725 for trunk


Ignore:
Timestamp:
Jan 12, 2016, 10:32:32 AM (4 years ago)
Author:
cito
Message:

Improve implementation and test for pkey()

Location:
trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/pg.py

    r723 r725  
    3838from decimal import Decimal
    3939from collections import namedtuple
     40from itertools import groupby
    4041
    4142try:
     
    566567        If newpkey is set and is not a dictionary then set that
    567568        value as the primary key of the class.  If it is a dictionary
    568         then replace the _pkeys dictionary with a copy of it.
    569 
    570         """
     569        then replace the internal cache for primary keys with a copy of it.
     570
     571        """
     572        add_schema = self._add_schema
     573
    571574        # First see if the caller is supplying a dictionary
    572575        if isinstance(newpkey, dict):
    573576            # make sure that all classes have a namespace
    574             self._pkeys = dict([
    575                 (cl if '.' in cl else 'public.' + cl, pkey)
    576                 for cl, pkey in newpkey.items()])
     577            self._pkeys = dict((add_schema(cl), pkey)
     578                for cl, pkey in newpkey.items())
    577579            return self._pkeys
    578580
    579         qcl = self._add_schema(cl)  # build fully qualified class name
     581        qcl = add_schema(cl)  # build fully qualified class name
    580582        # Check if the caller is supplying a new primary key for the class
    581583        if newpkey:
     
    586588        if qcl not in self._pkeys:
    587589            # if not found, check again in case it was added after we started
    588             self._pkeys = {}
    589590            q = ("SELECT s.nspname, r.relname, a.attname"
    590591                " FROM pg_class r"
     
    595596                " AND NOT a.attisdropped"
    596597                " JOIN pg_index i ON i.indrelid = r.oid"
    597                 " AND i.indisprimary AND a.attnum = ANY (i.indkey)")
    598             for r in self.db.query(q).getresult():
    599                 cl, pkey = _join_parts(r[:2]), r[2]
    600                 self._pkeys.setdefault(cl, []).append(pkey)
    601             # (only) for composite primary keys, the values will be frozensets
    602             for cl, pkey in self._pkeys.items():
    603                 self._pkeys[cl] = frozenset(pkey) if len(pkey) > 1 else pkey[0]
    604             self._do_debug(self._pkeys)
     598                " AND i.indisprimary AND a.attnum = ANY (i.indkey)"
     599                " ORDER BY 1,2")
     600            rows = self.db.query(q).getresult()
     601            pkeys = {}
     602            for cl, group in groupby(rows, lambda row: row[:2]):
     603                cl = _join_parts(cl)
     604                pkey = [row[2] for row in group]
     605                pkeys[cl] = frozenset(pkey) if len(pkey) > 1 else pkey[0]
     606            self._do_debug(pkeys)
     607            self._pkeys = pkeys
    605608
    606609        # will raise an exception if primary key doesn't exist
  • trunk/tests/test_classic_dbwrapper.py

    r709 r725  
    559559    def testPkey(self):
    560560        query = self.db.query
    561         for n in range(4):
    562             query("drop table if exists pkeytest%d" % n)
    563         query("create table pkeytest0 ("
    564             "a smallint)")
    565         query("create table pkeytest1 ("
    566             "b smallint primary key)")
    567         query("create table pkeytest2 ("
    568             "c smallint, d smallint primary key)")
    569         query("create table pkeytest3 ("
    570             "e smallint, f smallint, g smallint, "
    571             "h smallint, i smallint, "
    572             "primary key (f,h))")
    573561        pkey = self.db.pkey
    574         self.assertRaises(KeyError, pkey, 'pkeytest0')
    575         self.assertEqual(pkey('pkeytest1'), 'b')
    576         self.assertEqual(pkey('pkeytest2'), 'd')
    577         self.assertEqual(pkey('pkeytest3'), frozenset('fh'))
    578         self.assertEqual(pkey('pkeytest0', 'none'), 'none')
    579         self.assertEqual(pkey('pkeytest0'), 'none')
    580         pkey(None, {'t': 'a', 'n.t': 'b'})
    581         self.assertEqual(pkey('t'), 'a')
    582         self.assertEqual(pkey('n.t'), 'b')
    583         self.assertRaises(KeyError, pkey, 'pkeytest0')
    584         for n in range(4):
    585             query("drop table pkeytest%d" % n)
     562        for t in ('pkeytest', 'primary key test'):
     563            for n in range(7):
     564                query('drop table if exists "%s%d"' % (t, n))
     565            query('create table "%s0" ('
     566                "a smallint)" % t)
     567            query('create table "%s1" ('
     568                "b smallint primary key)" % t)
     569            query('create table "%s2" ('
     570                "c smallint, d smallint primary key)" % t)
     571            query('create table "%s3" ('
     572                "e smallint, f smallint, g smallint, "
     573                "h smallint, i smallint, "
     574                "primary key (f, h))" % t)
     575            query('create table "%s4" ('
     576                "more_than_one_letter varchar primary key)" % t)
     577            query('create table "%s5" ('
     578                '"with space" date primary key)' % t)
     579            query('create table "%s6" ('
     580                'a_very_long_column_name varchar, '
     581                '"with space" date, '
     582                '"42" int, '
     583                "primary key (a_very_long_column_name, "
     584                '"with space", "42"))' % t)
     585            self.assertRaises(KeyError, pkey, '%s0' % t)
     586            self.assertEqual(pkey('%s1' % t), 'b')
     587            self.assertEqual(pkey('%s2' % t), 'd')
     588            r = pkey('%s3' % t)
     589            self.assertIsInstance(r, frozenset)
     590            self.assertEqual(r, frozenset('fh'))
     591            self.assertEqual(pkey('%s4' % t), 'more_than_one_letter')
     592            self.assertEqual(pkey('%s5' % t), 'with space')
     593            r = pkey('%s6' % t)
     594            self.assertIsInstance(r, frozenset)
     595            self.assertEqual(r, frozenset([
     596                'a_very_long_column_name', 'with space', '42']))
     597            self.assertEqual(pkey('%s0' % t, 'none'), 'none')
     598            self.assertEqual(pkey('%s0' % t), 'none')
     599            pkey(None, {'%s0' % t: 'a', 'public."%s1"' % t: 'b'})
     600            self.assertEqual(pkey('%s0' % t), 'a')
     601            self.assertEqual(pkey('%s1' % t), 'b')
     602            pkey(None, {})
     603            self.assertRaises(KeyError, pkey, '%s0' % t)
     604            for n in range(7):
     605                query('drop table "%s%d"' % (t, n))
    586606
    587607    def testGetDatabases(self):
Note: See TracChangeset for help on using the changeset viewer.