Changeset 729 for trunk


Ignore:
Timestamp:
Jan 12, 2016, 4:29:07 PM (4 years ago)
Author:
cito
Message:

Simplify caching and handling of class names

The caches now use the class names as keys as they are passed in.
We do not automatically calculate the qualified name any more,
since this causes too much overhead. Also, we fill the pkey cache
not pro-actively with tables from all possible schemes any more.

Most of the internal auxiliary functions for handling class names
could be discarded by making good use of quote_ident and reglass.

Location:
trunk
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/changelog.rst

    r710 r729  
    2828- The tty parameter and attribute of database connections has been
    2929  removed since it is not supported any more since PostgreSQL 7.4.
     30- The table name that is affixed to the name of the OID column returned
     31  by the get() method of the classic interface will not automatically
     32  be fully qualified any more. This reduces overhead from the interface,
     33  but it means you must always write the table name in the same way when
     34  you call the methods using it and you are using tables with OIDs.
     35  Note that OIDs are considered deprecated anyway, and they are not created
     36  by default any more in PostgreSQL 8.1 and later.
     37- Simplified the internal caching and mechanisms for automatic quoting
     38  of class names in the classic interface, these things should now both
     39  perform better and use less memory.
     40
    3041
    3142Version 4.2
  • trunk/docs/contents/pg/db_wrapper.rst

    r723 r729  
    6464    :returns: Name of the field which is the primary key of the table
    6565    :rtype: str
     66    :raises KeyError: the table does not have a primary key
    6667
    6768This method returns the primary key of a table. For composite primary
    68 keys, the return value will be a frozenset. Note that this raises an
    69 exception if the table does not have a primary key.
     69keys, the return value will be a frozenset. Note that this raises a
     70KeyError if the table does not have a primary key.
    7071
    7172get_databases -- get list of databases in the system
     
    162163The OID is also put into the dictionary if the table has one, but in
    163164order to allow the caller to work with multiple tables, it is munged
    164 as ``oid(schema.table)``.
     165as ``oid(table)``.
    165166
    166167insert -- insert a row into a database table
  • trunk/pg.py

    r727 r729  
    5050# Auxiliary functions that are independent from a DB connection:
    5151
    52 def _is_quoted(s):
    53     """Check whether this string is a quoted identifier."""
    54     s = s.replace('_', 'a')
    55     return not s.isalnum() or s[:1].isdigit() or s != s.lower()
    56 
    57 
    58 def _is_unquoted(s):
    59     """Check whether this string is an unquoted identifier."""
    60     s = s.replace('_', 'a')
    61     return s.isalnum() and not s[:1].isdigit()
    62 
    63 
    64 def _split_first_part(s):
    65     """Split the first part of a dot separated string."""
    66     s = s.lstrip()
    67     if s[:1] == '"':
    68         p = []
    69         s = s.split('"', 3)[1:]
    70         p.append(s[0])
    71         while len(s) == 3 and s[1] == '':
    72             p.append('"')
    73             s = s[2].split('"', 2)
    74             p.append(s[0])
    75         p = [''.join(p)]
    76         s = '"'.join(s[1:]).lstrip()
    77         if s:
    78             if s[:0] == '.':
    79                 p.append(s[1:])
    80             else:
    81                 s = _split_first_part(s)
    82                 p[0] += s[0]
    83                 if len(s) > 1:
    84                     p.append(s[1])
    85     else:
    86         p = s.split('.', 1)
    87         s = p[0].rstrip()
    88         if _is_unquoted(s):
    89             s = s.lower()
    90         p[0] = s
    91     return p
    92 
    93 
    94 def _split_parts(s):
    95     """Split all parts of a dot separated string."""
    96     q = []
    97     while s:
    98         s = _split_first_part(s)
    99         q.append(s[0])
    100         if len(s) < 2:
    101             break
    102         s = s[1]
    103     return q
    104 
    105 
    106 def _join_parts(s):
    107     """Join all parts of a dot separated string."""
    108     return '.'.join(['"%s"' % p if _is_quoted(p) else p for p in s])
    109 
    110 
    111 def _oid_key(qcl):
     52def _quote_class_name(cl):
     53    """Quote a class name.
     54
     55    Class names are always quoted unless they contain a dot.
     56    In this ambiguous case quotes must be added manually.
     57
     58    """
     59    if '.' not in cl:
     60        cl = '"%s"' % cl
     61    return cl
     62
     63
     64def _quote_class_param(cl, param):
     65    """Quote parameter representing a class name.
     66
     67    The parameter is automatically quoted unless the class name contains a dot.
     68    In this ambiguous case quotes must be added manually.
     69
     70    """
     71    if isinstance(param, int):
     72        param = "$%d" % param
     73    if '.' not in cl:
     74        param = 'quote_ident(%s)' % (param,)
     75    return param
     76
     77
     78def _oid_key(cl):
    11279    """Build oid key from qualified class name."""
    113     return 'oid(%s)' % qcl
     80    return 'oid(%s)' % cl
     81
     82
     83def _simpletype(typ):
     84    """Determine a simplified name a pg_type name."""
     85    if typ.startswith('bool'):
     86        return 'bool'
     87    if typ.startswith(('abstime', 'date', 'interval', 'timestamp')):
     88        return 'date'
     89    if typ.startswith(('cid', 'oid', 'int', 'xid')):
     90        return 'int'
     91    if typ.startswith('float'):
     92        return 'float'
     93    if typ.startswith('numeric'):
     94        return 'num'
     95    if typ.startswith('money'):
     96        return 'money'
     97    if typ.startswith('bytea'):
     98        return 'bytea'
     99    return 'text'
    114100
    115101
     
    414400        return quote_func(self, d)
    415401
    416     def _split_schema(self, cl):
    417         """Return schema and name of object separately.
    418 
    419         This auxiliary function splits off the namespace (schema)
    420         belonging to the class with the name cl. If the class name
    421         is not qualified, the function is able to determine the schema
    422         of the class, taking into account the current search path.
    423 
    424         """
    425         s = _split_parts(cl)
    426         if len(s) > 1:  # name already qualified?
    427             # should be database.schema.table or schema.table
    428             if len(s) > 3:
    429                 raise _prg_error('Too many dots in class name %s' % cl)
    430             schema, cl = s[-2:]
    431         else:
    432             cl = s[0]
    433             # determine search path
    434             q = 'SELECT current_schemas(TRUE)'
    435             schemas = self.db.query(q).getresult()[0][0][1:-1].split(',')
    436             if schemas:  # non-empty path
    437                 # search schema for this object in the current search path
    438                 # (we could also use unnest with ordinality here to spare
    439                 # one query, but this is only possible since PostgreSQL 9.4)
    440                 q = ' UNION '.join(
    441                     ["SELECT %d::integer AS n, '%s'::name AS nspname"
    442                         % s for s in enumerate(schemas)])
    443                 q = ("SELECT nspname FROM pg_class r"
    444                     " JOIN pg_namespace s ON r.relnamespace = s.oid"
    445                     " JOIN (%s) AS p USING (nspname)"
    446                     " WHERE r.relname = $1 ORDER BY n LIMIT 1" % q)
    447                 schema = self.db.query(q, (cl,)).getresult()
    448                 if schema:  # schema found
    449                     schema = schema[0][0]
    450                 else:  # object not found in current search path
    451                     schema = 'public'
    452             else:  # empty path
    453                 schema = 'public'
    454         return schema, cl
    455 
    456     def _add_schema(self, cl):
    457         """Ensure that the class name is prefixed with a schema name."""
    458         return _join_parts(self._split_schema(cl))
    459 
    460402    # Public methods
    461403
     
    559501        return self.db.query(qstr, args)
    560502
    561     def pkey(self, cl, newpkey=None):
     503    def pkey(self, cl, flush=False):
    562504        """This method gets or sets the primary key of a class.
    563505
    564506        Composite primary keys are represented as frozensets. Note that
    565         this raises an exception if the table does not have a primary key.
    566 
    567         If newpkey is set and is not a dictionary then set that
    568         value as the primary key of the class.  If it is a dictionary
    569         then replace the internal cache for primary keys with a copy of it.
    570 
    571         """
    572         add_schema = self._add_schema
    573 
    574         # First see if the caller is supplying a dictionary
    575         if isinstance(newpkey, dict):
    576             # make sure that all classes have a namespace
    577             self._pkeys = dict((add_schema(cl), pkey)
    578                 for cl, pkey in newpkey.items())
    579             return self._pkeys
    580 
    581         qcl = add_schema(cl)  # build fully qualified class name
    582         # Check if the caller is supplying a new primary key for the class
    583         if newpkey:
    584             self._pkeys[qcl] = newpkey
    585             return newpkey
    586 
    587         # Get all the primary keys at once
    588         if qcl not in self._pkeys:
    589             # if not found, check again in case it was added after we started
    590             q = ("SELECT s.nspname, r.relname, a.attname"
    591                 " FROM pg_class r"
    592                 " JOIN pg_namespace s ON s.oid = r.relnamespace"
    593                 " AND s.nspname NOT SIMILAR"
    594                 " TO 'pg/_%|information/_schema' ESCAPE '/'"
    595                 " JOIN pg_attribute a ON a.attrelid = r.oid"
     507        this raises a KeyError if the table does not have a primary key.
     508
     509        If flush is set then the internal cache for primary keys will
     510        be flushed. This may be necessary after the database schema or
     511        the search path has been changed.
     512
     513        """
     514        pkeys = self._pkeys
     515        if flush:
     516            pkeys.clear()
     517            self._do_debug('pkey cache has been flushed')
     518        try:  # cache lookup
     519            pkey = pkeys[cl]
     520        except KeyError:  # cache miss, check the database
     521            q = ("SELECT a.attname FROM pg_index i"
     522                " JOIN pg_attribute a ON a.attrelid = i.indrelid"
     523                " AND a.attnum = ANY(i.indkey)"
    596524                " AND NOT a.attisdropped"
    597                 " JOIN pg_index i ON i.indrelid = r.oid"
    598                 " AND i.indisprimary AND a.attnum = ANY (i.indkey)"
    599                 " ORDER BY 1,2")
    600             rows = self.db.query(q).getresult()
    601             pkeys = {}
    602             for cl, group in groupby(rows, lambda row: row[:2]):
    603                 cl = _join_parts(cl)
    604                 pkey = [row[2] for row in group]
    605                 pkeys[cl] = frozenset(pkey) if len(pkey) > 1 else pkey[0]
    606             self._do_debug(pkeys)
    607             self._pkeys = pkeys
    608 
    609         # will raise an exception if primary key doesn't exist
    610         return self._pkeys[qcl]
     525                " WHERE i.indrelid=%s::regclass"
     526                " AND i.indisprimary" % _quote_class_param(cl, 1))
     527            pkey = self.db.query(q, (cl,)).getresult()
     528            if not pkey:
     529                raise KeyError('Class %s has no primary key' % cl)
     530            if len(pkey) > 1:
     531                pkey = frozenset([k[0] for k in pkey])
     532            else:
     533                pkey = pkey[0][0]
     534            pkeys[cl] = pkey  # cache it
     535        return pkey
    611536
    612537    def get_databases(self):
     
    625550        where = " AND r.relkind IN (%s)" % ','.join(
    626551            ["'%s'" % k for k in kinds]) if kinds else ''
    627         q = ("SELECT s.nspname, r.relname"
     552        q = ("SELECT quote_ident(s.nspname)||'.'||quote_ident(r.relname)"
    628553            " FROM pg_class r"
    629554            " JOIN pg_namespace s ON s.oid = r.relnamespace"
    630555            " WHERE s.nspname NOT SIMILAR"
    631556            " TO 'pg/_%%|information/_schema' ESCAPE '/' %s"
    632             " ORDER BY 1, 2") % where
    633         return [_join_parts(r) for r in self.db.query(q).getresult()]
     557            " ORDER BY s.nspname, r.relname") % where
     558        return [r[0] for r in self.db.query(q).getresult()]
    634559
    635560    def get_tables(self):
     
    637562        return self.get_relations('r')
    638563
    639     def get_attnames(self, cl, newattnames=None):
     564    def get_attnames(self, cl, flush=False):
    640565        """Given the name of a table, digs out the set of attribute names.
    641566
    642567        Returns a dictionary of attribute names (the names are the keys,
    643568        the values are the names of the attributes' types).
     569
    644570        If the optional newattnames exists, it must be a dictionary and
    645571        will become the new attribute names dictionary.
     
    649575
    650576        """
    651         if isinstance(newattnames, dict):
    652             self._attnames = newattnames
    653             return
    654         elif newattnames:
    655             raise _prg_error('If supplied, newattnames must be a dictionary')
    656         cl = self._split_schema(cl)  # split into schema and class
    657         qcl = _join_parts(cl)  # build fully qualified name
    658         # May as well cache them:
    659         if qcl in self._attnames:
    660             return self._attnames[qcl]
    661         if qcl not in self.get_relations('rv'):
    662             raise _prg_error('Class %s does not exist' % qcl)
    663 
    664         q = ("SELECT a.attname, t.typname%s"
    665             " FROM pg_class r"
    666             " JOIN pg_namespace s ON r.relnamespace = s.oid"
    667             " JOIN pg_attribute a ON a.attrelid = r.oid"
    668             " JOIN pg_type t ON t.oid = a.atttypid"
    669             " WHERE s.nspname = $1 AND r.relname = $2"
    670             " AND (a.attnum > 0 OR a.attname = 'oid')"
    671             " AND NOT a.attisdropped") % (
    672                 '::regtype' if self._regtypes else '',)
    673         q = self.db.query(q, cl).getresult()
    674 
    675         if self._regtypes:
    676             t = dict(q)
    677         else:
    678             t = {}
    679             for att, typ in q:
    680                 if typ.startswith('bool'):
    681                     typ = 'bool'
    682                 elif typ.startswith('abstime'):
    683                     typ = 'date'
    684                 elif typ.startswith('date'):
    685                     typ = 'date'
    686                 elif typ.startswith('interval'):
    687                     typ = 'date'
    688                 elif typ.startswith('timestamp'):
    689                     typ = 'date'
    690                 elif typ.startswith('oid'):
    691                     typ = 'int'
    692                 elif typ.startswith('int'):
    693                     typ = 'int'
    694                 elif typ.startswith('float'):
    695                     typ = 'float'
    696                 elif typ.startswith('numeric'):
    697                     typ = 'num'
    698                 elif typ.startswith('money'):
    699                     typ = 'money'
    700                 elif typ.startswith('bytea'):
    701                     typ = 'bytea'
    702                 else:
    703                     typ = 'text'
    704                 t[att] = typ
    705 
    706         self._attnames[qcl] = t  # cache it
    707         return self._attnames[qcl]
     577        attnames = self._attnames
     578        if flush:
     579            attnames.clear()
     580            self._do_debug('pkey cache has been flushed')
     581
     582        try:  # cache lookup
     583            names = attnames[cl]
     584        except KeyError:  # cache miss, check the database
     585            q = ("SELECT a.attname, t.typname%s"
     586                " FROM pg_attribute a"
     587                " JOIN pg_type t ON t.oid = a.atttypid"
     588                " WHERE a.attrelid = %s::regclass"
     589                " AND (a.attnum > 0 OR a.attname = 'oid')"
     590                " AND NOT a.attisdropped") % (
     591                    '::regtype' if self._regtypes else '',
     592                    _quote_class_param(cl, 1))
     593            names = self.db.query(q, (cl,)).getresult()
     594            if not names:
     595                raise KeyError('Class %s does not exist' % cl)
     596            if self._regtypes:
     597                names = dict(names)
     598            else:
     599                names = dict((name, _simpletype(typ)) for name, typ in names)
     600            attnames[cl] = names  # cache it
     601        return names
    708602
    709603    def use_regtypes(self, regtypes=None):
     
    720614    def has_table_privilege(self, cl, privilege='select'):
    721615        """Check whether current user has specified table privilege."""
    722         qcl = self._add_schema(cl)
    723616        privilege = privilege.lower()
    724         try:
    725             return self._privileges[(qcl, privilege)]
    726         except KeyError:
    727             q = "SELECT has_table_privilege($1, $2)"
    728             q = self.db.query(q, (qcl, privilege))
     617        try:  # ask cache
     618            return self._privileges[(cl, privilege)]
     619        except KeyError:  # cache miss, ask the database
     620            q = "SELECT has_table_privilege(%s, $2)" % (
     621                _quote_class_param(cl, 1),)
     622            q = self.db.query(q, (cl, privilege))
    729623            ret = q.getresult()[0][0] == self._make_bool(True)
    730             self._privileges[(qcl, privilege)] = ret
     624            self._privileges[(cl, privilege)] = ret  # cache it
    731625            return ret
    732626
     
    742636        The OID is also put into the dictionary if the table has one, but
    743637        in order to allow the caller to work with multiple tables, it is
    744         munged as oid(schema.table).
     638        munged as "oid(cl)".
    745639
    746640        """
     
    748642            cl = cl[:-1].rstrip()  # need parent table name
    749643        # build qualified class name
    750         qcl = self._add_schema(cl)
    751644        # To allow users to work with multiple tables,
    752         # we munge the name of the "oid" the key
    753         qoid = _oid_key(qcl)
     645        # we munge the name of the "oid" key
     646        qoid = _oid_key(cl)
    754647        if not keyname:
    755648            # use the primary key by default
    756649            try:
    757                 keyname = self.pkey(qcl)
     650                keyname = self.pkey(cl)
    758651            except KeyError:
    759                 raise _prg_error('Class %s has no primary key' % qcl)
    760         attnames = self.get_attnames(qcl)
     652                raise _prg_error('Class %s has no primary key' % cl)
     653        attnames = self.get_attnames(cl)
    761654        # We want the oid for later updates if that isn't the key
    762655        if keyname == 'oid':
     
    778671            where = ' AND '.join(['%s = %s'
    779672                % (k, self._quote(arg[k], attnames[k])) for k in keyname])
    780         q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (what, qcl, where)
     673        q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (
     674            what, _quote_class_name(cl), where)
    781675        self._do_debug(q)
    782676        res = self.db.query(q).dictresult()
    783677        if not res:
    784             raise _db_error('No such record in %s where %s' % (qcl, where))
     678            raise _db_error('No such record in %s where %s' % (cl, where))
    785679        for n, value in res[0].items():
    786680            if n == 'oid':
     
    808702
    809703        """
    810         qcl = self._add_schema(cl)
    811         qoid = _oid_key(qcl)
     704        qoid = _oid_key(cl)
    812705        if d is None:
    813706            d = {}
    814707        d.update(kw)
    815         attnames = self.get_attnames(qcl)
     708        attnames = self.get_attnames(cl)
    816709        names, values = [], []
    817710        for n in attnames:
     
    820713                values.append(self._quote(d[n], attnames[n]))
    821714        names, values = ', '.join(names), ', '.join(values)
    822         selectable = self.has_table_privilege(qcl)
     715        selectable = self.has_table_privilege(cl)
    823716        if selectable:
    824717            ret = ' RETURNING %s*' % ('oid, ' if 'oid' in attnames else '')
    825718        else:
    826719            ret = ''
    827         q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (qcl, names, values, ret)
     720        q = 'INSERT INTO %s (%s) VALUES (%s)%s' % (
     721            _quote_class_name(cl), names, values, ret)
    828722        self._do_debug(q)
    829723        res = self.db.query(q)
     
    839733            d[qoid] = res
    840734            if selectable:
    841                 self.get(qcl, d, 'oid')
     735                self.get(cl, d, 'oid')
    842736        elif selectable:
    843737            if qoid in d:
    844                 self.get(qcl, d, 'oid')
     738                self.get(cl, d, 'oid')
    845739            else:
    846740                try:
    847                     self.get(qcl, d)
     741                    self.get(cl, d)
    848742                except ProgrammingError:
    849743                    pass  # table has no primary key
     
    863757        # otherwise use the primary key.  Fail if neither.
    864758        # Note that we only accept oid key from named args for safety
    865         qcl = self._add_schema(cl)
    866         qoid = _oid_key(qcl)
     759        qoid = _oid_key(cl)
    867760        if 'oid' in kw:
    868761            kw[qoid] = kw['oid']
     
    871764            d = {}
    872765        d.update(kw)
    873         attnames = self.get_attnames(qcl)
     766        attnames = self.get_attnames(cl)
    874767        if qoid in d:
    875768            where = 'oid = %s' % d[qoid]
     
    877770        else:
    878771            try:
    879                 keyname = self.pkey(qcl)
     772                keyname = self.pkey(cl)
    880773            except KeyError:
    881                 raise _prg_error('Class %s has no primary key' % qcl)
     774                raise _prg_error('Class %s has no primary key' % cl)
    882775            if isinstance(keyname, basestring):
    883776                keyname = (keyname,)
     
    894787            return d
    895788        values = ', '.join(values)
    896         selectable = self.has_table_privilege(qcl)
     789        selectable = self.has_table_privilege(cl)
    897790        if selectable:
    898791            ret = ' RETURNING %s*' % ('oid, ' if 'oid' in attnames else '')
    899792        else:
    900793            ret = ''
    901         q = 'UPDATE %s SET %s WHERE %s%s' % (qcl, values, where, ret)
     794        q = 'UPDATE %s SET %s WHERE %s%s' % (
     795            _quote_class_name(cl), values, where, ret)
    902796        self._do_debug(q)
    903797        res = self.db.query(q)
     
    913807            if selectable:
    914808                if qoid in d:
    915                     self.get(qcl, d, 'oid')
     809                    self.get(cl, d, 'oid')
    916810                else:
    917                     self.get(qcl, d)
     811                    self.get(cl, d)
    918812        return d
    919813
     
    928822        """
    929823        # At some point we will need a way to get defaults from a table.
    930         qcl = self._add_schema(cl)
    931824        if a is None:
    932825            a = {}  # empty if argument is not present
    933         attnames = self.get_attnames(qcl)
     826        attnames = self.get_attnames(cl)
    934827        for n, t in attnames.items():
    935828            if n == 'oid':
     
    958851        # isn't referenced somewhere (or else PostgreSQL will).
    959852        # Note that we only accept oid key from named args for safety
    960         qcl = self._add_schema(cl)
    961         qoid = _oid_key(qcl)
     853        qoid = _oid_key(cl)
    962854        if 'oid' in kw:
    963855            kw[qoid] = kw['oid']
     
    970862        else:
    971863            try:
    972                 keyname = self.pkey(qcl)
     864                keyname = self.pkey(cl)
    973865            except KeyError:
    974                 raise _prg_error('Class %s has no primary key' % qcl)
     866                raise _prg_error('Class %s has no primary key' % cl)
    975867            if isinstance(keyname, basestring):
    976868                keyname = (keyname,)
    977             attnames = self.get_attnames(qcl)
     869            attnames = self.get_attnames(cl)
    978870            try:
    979871                where = ' AND '.join(['%s = %s'
     
    981873            except KeyError:
    982874                raise _prg_error('Delete needs primary key or oid.')
    983         q = 'DELETE FROM %s WHERE %s' % (qcl, where)
     875        q = 'DELETE FROM %s WHERE %s' % (_quote_class_name(cl), where)
    984876        self._do_debug(q)
    985877        return int(self.db.query(q))
  • trunk/tests/test_classic.py

    r648 r729  
    7676        try:
    7777            db.query("CREATE VIEW _test_vschema AS "
    78                 "SELECT _test, 'abc'::text AS _test2  FROM _test_schema")
     78                "SELECT _test, 'abc'::text AS _test2 FROM _test_schema")
    7979        except Error:
    8080            pass
     
    111111        self.assertEqual(db.pkey('public._test_schema'), '_test')
    112112        self.assertEqual(db.pkey('_test1._test_schema'), '_test1')
    113 
    114         self.assertEqual(db.pkey('_test_schema',
    115                 {'test1': 'a', 'test2.test3': 'b'}),
    116                 {'public.test1': 'a', 'test2.test3': 'b'})
    117         self.assertEqual(db.pkey('test1'), 'a')
    118         self.assertEqual(db.pkey('public.test1'), 'a')
     113        self.assertEqual(db.pkey('_test2._test_schema'), '_test2')
     114        self.assertRaises(KeyError, db.pkey, '_test_vschema')
    119115
    120116    def test_get(self):
  • trunk/tests/test_classic_dbwrapper.py

    r725 r729  
    595595            self.assertEqual(r, frozenset([
    596596                'a_very_long_column_name', 'with space', '42']))
    597             self.assertEqual(pkey('%s0' % t, 'none'), 'none')
    598             self.assertEqual(pkey('%s0' % t), 'none')
    599             pkey(None, {'%s0' % t: 'a', 'public."%s1"' % t: 'b'})
     597            # a newly added primary key will be detected
     598            query('alter table "%s0" add primary key (a)' % t)
    600599            self.assertEqual(pkey('%s0' % t), 'a')
     600            # a changed primary key will not be detected,
     601            # indicating that the internal cache is operating
     602            query('alter table "%s1" rename column b to x' % t)
    601603            self.assertEqual(pkey('%s1' % t), 'b')
    602             pkey(None, {})
    603             self.assertRaises(KeyError, pkey, '%s0' % t)
     604            # we get the changed primary key when the cache is flushed
     605            self.assertEqual(pkey('%s1' % t, flush=True), 'x')
    604606            for n in range(7):
    605607                query('drop table "%s%d"' % (t, n))
     
    717719            self.assertRaises(pg.ProgrammingError, get, table, 2)
    718720            r = get(table, 2, 'n')
    719             oid_table = table
    720             if ' ' in table:
    721                 oid_table = '"%s"' % oid_table
    722             oid_table = 'oid(public.%s)' % oid_table
     721            oid_table = 'oid(%s)' % table
    723722            self.assertIn(oid_table, r)
    724723            oid = r[oid_table]
     
    798797                " v4 varchar(4), c4 char(4), t text,"
    799798                " b boolean, ts timestamp) with oids" % table)
    800             oid_table = table
    801             if ' ' in table:
    802                 oid_table = '"%s"' % oid_table
    803             oid_table = 'oid(public.%s)' % oid_table
     799            oid_table = 'oid(%s)' % table
    804800            tests = [dict(i2=None, i4=None, i8=None),
    805801                (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
     
    13241320        self.assertEqual(get("s4.t4", 1, 'n')['d'], 4)
    13251321
    1326     def testMangling(self):
     1322    def testMunging(self):
    13271323        get = self.db.get
    13281324        query = self.db.query
    13291325        r = get("t", 1, 'n')
    1330         self.assertIn('oid(public.t)', r)
     1326        self.assertIn('oid(t)', r)
    13311327        query("set search_path to s2")
    13321328        r = get("t2", 1, 'n')
    1333         self.assertIn('oid(s2.t2)', r)
     1329        self.assertIn('oid(t2)', r)
    13341330        query("set search_path to s3")
    13351331        r = get("t", 1, 'n')
    1336         self.assertIn('oid(s3.t)', r)
     1332        self.assertIn('oid(t)', r)
    13371333
    13381334
  • trunk/tests/test_classic_functions.py

    r680 r729  
    3131except NameError:  # Python >= 3.0
    3232    unicode = str
    33 
    34 
    35 class TestAuxiliaryFunctions(unittest.TestCase):
    36     """Test the auxiliary functions external to the connection class."""
    37 
    38     def testIsQuoted(self):
    39         f = pg._is_quoted
    40         self.assertTrue(f('A'))
    41         self.assertTrue(f('0'))
    42         self.assertTrue(f('#'))
    43         self.assertTrue(f('*'))
    44         self.assertTrue(f('.'))
    45         self.assertTrue(f(' '))
    46         self.assertTrue(f('a b'))
    47         self.assertTrue(f('a+b'))
    48         self.assertTrue(f('a*b'))
    49         self.assertTrue(f('a.b'))
    50         self.assertTrue(f('0ab'))
    51         self.assertTrue(f('aBc'))
    52         self.assertTrue(f('ABC'))
    53         self.assertTrue(f('"a"'))
    54         self.assertTrue(not f('a'))
    55         self.assertTrue(not f('a0'))
    56         self.assertTrue(not f('_'))
    57         self.assertTrue(not f('_a'))
    58         self.assertTrue(not f('_0'))
    59         self.assertTrue(not f('_a_0_'))
    60         self.assertTrue(not f('ab'))
    61         self.assertTrue(not f('ab0'))
    62         self.assertTrue(not f('abc'))
    63         self.assertTrue(not f('abc'))
    64         if 'À'.isalpha():
    65             self.assertTrue(not f('À'))
    66             self.assertTrue(f('Ä'))
    67             self.assertTrue(not f('kÀse'))
    68             self.assertTrue(f('KÀse'))
    69             self.assertTrue(not f('emmentaler_kÀse'))
    70             self.assertTrue(f('emmentaler kÀse'))
    71             self.assertTrue(f('EmmentalerKÀse'))
    72             self.assertTrue(f('Emmentaler KÀse'))
    73 
    74     def testIsUnquoted(self):
    75         f = pg._is_unquoted
    76         self.assertTrue(f('A'))
    77         self.assertTrue(not f('0'))
    78         self.assertTrue(not f('#'))
    79         self.assertTrue(not f('*'))
    80         self.assertTrue(not f('.'))
    81         self.assertTrue(not f(' '))
    82         self.assertTrue(not f('a b'))
    83         self.assertTrue(not f('a+b'))
    84         self.assertTrue(not f('a*b'))
    85         self.assertTrue(not f('a.b'))
    86         self.assertTrue(not f('0ab'))
    87         self.assertTrue(f('aBc'))
    88         self.assertTrue(f('ABC'))
    89         self.assertTrue(not f('"a"'))
    90         self.assertTrue(f('a0'))
    91         self.assertTrue(f('_'))
    92         self.assertTrue(f('_a'))
    93         self.assertTrue(f('_0'))
    94         self.assertTrue(f('_a_0_'))
    95         self.assertTrue(f('ab'))
    96         self.assertTrue(f('ab0'))
    97         self.assertTrue(f('abc'))
    98         if 'À'.isalpha():
    99             self.assertTrue(f('À'))
    100             self.assertTrue(f('Ä'))
    101             self.assertTrue(f('kÀse'))
    102             self.assertTrue(f('KÀse'))
    103             self.assertTrue(f('emmentaler_kÀse'))
    104             self.assertTrue(not f('emmentaler kÀse'))
    105             self.assertTrue(f('EmmentalerKÀse'))
    106             self.assertTrue(not f('Emmentaler KÀse'))
    107 
    108     def testSplitFirstPart(self):
    109         f = pg._split_first_part
    110         self.assertEqual(f('a.b'), ['a', 'b'])
    111         self.assertEqual(f('a.b.c'), ['a', 'b.c'])
    112         self.assertEqual(f('"a.b".c'), ['a.b', 'c'])
    113         self.assertEqual(f('a."b.c"'), ['a', '"b.c"'])
    114         self.assertEqual(f('A.b.c'), ['a', 'b.c'])
    115         self.assertEqual(f('Ab.c'), ['ab', 'c'])
    116         self.assertEqual(f('aB.c'), ['ab', 'c'])
    117         self.assertEqual(f('AB.c'), ['ab', 'c'])
    118         self.assertEqual(f('A b.c'), ['A b', 'c'])
    119         self.assertEqual(f('a B.c'), ['a B', 'c'])
    120         self.assertEqual(f('"A".b.c'), ['A', 'b.c'])
    121         self.assertEqual(f('"A""B".c'), ['A"B', 'c'])
    122         self.assertEqual(f('a.b.c.d.e.f.g'), ['a', 'b.c.d.e.f.g'])
    123         self.assertEqual(f('"a.b.c.d.e.f".g'), ['a.b.c.d.e.f', 'g'])
    124         self.assertEqual(f('a.B.c.D.e.F.g'), ['a', 'B.c.D.e.F.g'])
    125         self.assertEqual(f('A.b.C.d.E.f.G'), ['a', 'b.C.d.E.f.G'])
    126 
    127     def testSplitParts(self):
    128         f = pg._split_parts
    129         self.assertEqual(f('a.b'), ['a', 'b'])
    130         self.assertEqual(f('a.b.c'), ['a', 'b', 'c'])
    131         self.assertEqual(f('"a.b".c'), ['a.b', 'c'])
    132         self.assertEqual(f('a."b.c"'), ['a', 'b.c'])
    133         self.assertEqual(f('A.b.c'), ['a', 'b', 'c'])
    134         self.assertEqual(f('Ab.c'), ['ab', 'c'])
    135         self.assertEqual(f('aB.c'), ['ab', 'c'])
    136         self.assertEqual(f('AB.c'), ['ab', 'c'])
    137         self.assertEqual(f('A b.c'), ['A b', 'c'])
    138         self.assertEqual(f('a B.c'), ['a B', 'c'])
    139         self.assertEqual(f('"A".b.c'), ['A', 'b', 'c'])
    140         self.assertEqual(f('"A""B".c'), ['A"B', 'c'])
    141         self.assertEqual(f('a.b.c.d.e.f.g'),
    142             ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
    143         self.assertEqual(f('"a.b.c.d.e.f".g'),
    144             ['a.b.c.d.e.f', 'g'])
    145         self.assertEqual(f('a.B.c.D.e.F.g'),
    146             ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
    147         self.assertEqual(f('A.b.C.d.E.f.G'),
    148             ['a', 'b', 'c', 'd', 'e', 'f', 'g'])
    149 
    150     def testJoinParts(self):
    151         f = pg._join_parts
    152         self.assertEqual(f(('a',)), 'a')
    153         self.assertEqual(f(('a', 'b')), 'a.b')
    154         self.assertEqual(f(('a', 'b', 'c')), 'a.b.c')
    155         self.assertEqual(f(('a', 'b', 'c', 'd', 'e', 'f', 'g')),
    156             'a.b.c.d.e.f.g')
    157         self.assertEqual(f(('A', 'b')), '"A".b')
    158         self.assertEqual(f(('a', 'B')), 'a."B"')
    159         self.assertEqual(f(('a b', 'c')), '"a b".c')
    160         self.assertEqual(f(('a', 'b c')), 'a."b c"')
    161         self.assertEqual(f(('a_b', 'c')), 'a_b.c')
    162         self.assertEqual(f(('a', 'b_c')), 'a.b_c')
    163         self.assertEqual(f(('0', 'a')), '"0".a')
    164         self.assertEqual(f(('0_', 'a')), '"0_".a')
    165         self.assertEqual(f(('_0', 'a')), '_0.a')
    166         self.assertEqual(f(('_a', 'b')), '_a.b')
    167         self.assertEqual(f(('a', 'B', '0', 'c0', 'C0',
    168             'd e', 'f_g', 'h.i', 'jklm', 'nopq')),
    169             'a."B"."0".c0."C0"."d e".f_g."h.i".jklm.nopq')
    170 
    171     def testOidKey(self):
    172         f = pg._oid_key
    173         self.assertEqual(f('a'), 'oid(a)')
    174         self.assertEqual(f('a.b'), 'oid(a.b)')
    17533
    17634
Note: See TracChangeset for help on using the changeset viewer.