Changeset 765 for trunk


Ignore:
Timestamp:
Jan 18, 2016, 6:21:44 PM (4 years ago)
Author:
cito
Message:

Improve support for access by primary key

Composite primary keys are now returned as tuples instead of frozensets,
where the ordering of the tuple reflects the primary key index.

Primary keys now takes precedence if both OID and primary key are available
(this was solved the other way around in 4.x). Use of OIDs is thus slightly
more discouraged, though it still works as before for tables with OIDs where
no primary key is available.

This changeset also clarifies some docstrings, makes the code a bit clearer,
handles and tests some more edge cases (pg module still has 100% coverage).

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/changelog.rst

    r762 r765  
    2828- The tty parameter and attribute of database connections has been
    2929  removed since it is not supported any more since PostgreSQL 7.4.
     30- The pkey() method of the classic interface now returns tuples instead
     31  of frozenset. The order of the tuples is like in the primary key index.
    3032- The table name that is affixed to the name of the OID column returned
    3133  by the get() method of the classic interface will not automatically
     
    3335  but it means you must always write the table name in the same way when
    3436  you call the methods using it and you are using tables with OIDs.
     37  Also, OIDs are now only used when access via primary key is not possible.
    3538  Note that OIDs are considered deprecated anyway, and they are not created
    3639  by default any more in PostgreSQL 8.1 and later.
  • trunk/docs/contents/pg/db_wrapper.rst

    r763 r765  
    6666    :raises KeyError: the table does not have a primary key
    6767
    68 This method returns the primary key of a table. For composite primary
    69 keys, the return value will be a frozenset. Note that this raises a
    70 KeyError if the table does not have a primary key.
     68This method returns the primary key of a table.  Single primary keys are
     69returned as strings unless you set the composite flag.  Composite primary
     70keys are always represented as tuples.  Note that this raises a KeyError
     71if the table does not have a primary key.
    7172
    7273get_databases -- get list of databases in the system
     
    296297    :returns: A dictionary - the keys are the attribute names,
    297298      the values are the row values.
    298     :raises ProgrammingError: no primary key or missing privilege
    299 
    300 This method is the basic mechanism to get a single row. It assumes
    301 that the key specifies a unique row. If *keyname* is not specified,
    302 then the primary key for the table is used. If *row* is a dictionary
    303 then the value for the key is taken from it and it is modified to
    304 include the new values, replacing existing values where necessary.
    305 For a composite key, *keyname* can also be a sequence of key names.
    306 The OID is also put into the dictionary if the table has one, but in
    307 order to allow the caller to work with multiple tables, it is munged
    308 as ``oid(table)``.
     299    :raises ProgrammingError: table has no primary key or missing privilege
     300    :raises KeyError: missing key value for the row
     301
     302This method is the basic mechanism to get a single row.  It assumes
     303that the *keyname* specifies a unique row.  It must be the name of a
     304single column or a tuple of column names.  If *keyname* is not specified,
     305then the primary key for the table is used.
     306
     307If *row* is a dictionary, then the value for the key is taken from it.
     308Otherwise, the row must be a single value or a tuple of values
     309corresponding to the passed *keyname* or primary key.  The fetched row
     310from the table will be returned as a new dictionary or used to replace
     311the existing values when row was passed as aa dictionary.
     312
     313The OID is also put into the dictionary if the table has one, but
     314in order to allow the caller to work with multiple tables, it is
     315munged as ``oid(table)`` using the actual name of the table.
    309316
    310317insert -- insert a row into a database table
     
    345352    :returns: the new row in the database
    346353    :rtype: dict
    347     :raises ProgrammingError: no primary key or missing privilege
    348 
    349 Similar to insert but updates an existing row.  The update is based on the
    350 OID value as munged by get or passed as keyword, or on the primary key of
    351 the table.  The dictionary is modified to reflect any changes caused by the
     354    :raises ProgrammingError: table has no primary key or missing privilege
     355    :raises KeyError: missing key value for the row
     356
     357Similar to insert but updates an existing row.  The update is based on
     358the primary key of the table or the OID value as munged by :meth:`DB.get`
     359or passed as keyword.
     360
     361The dictionary is then modified to reflect any changes caused by the
    352362update due to triggers, rules, default values, etc.
    353363
     
    355365on the fields in the keywords.  There must be an OID or primary key
    356366either in the dictionary where the OID must be munged, or in the keywords
    357 where it can be simply the string 'oid'.
     367where it can be simply the string ``'oid'``.
    358368
    359369upsert -- insert a row with conflict resolution
     
    369379    :returns: the new row in the database
    370380    :rtype: dict
    371     :raises ProgrammingError: no primary key or missing privilege
     381    :raises ProgrammingError: table has no primary key or missing privilege
    372382
    373383This method inserts a row into a table, but instead of raising a
     
    475485    :param col: optional keyword arguments for updating the dictionary
    476486    :rtype: None
    477 
    478 This method deletes the row from a table.  It deletes based on the OID value
    479 as munged by get or passed as keyword, or on the primary key of the table.
     487    :raises ProgrammingError: table has no primary key,
     488        row is still referenced or missing privilege
     489    :raises KeyError: missing key value for the row
     490
     491This method deletes the row from a table.  It deletes based on the
     492primary key of the table or the OID value as munged by :meth:`DB.get`
     493or passed as keyword.
     494
    480495The return value is the number of deleted rows (i.e. 0 if the row did not
    481496exist and 1 if the row was deleted).
     497
     498Note that if the row cannot be deleted because e.g. it is still referenced
     499by another table, this method will raise a ProgrammingError.
    482500
    483501truncate -- Quickly empty database tables
  • trunk/pg.py

    r763 r765  
    355355            raise ValueError
    356356        return d
     357
     358    _num_types = frozenset('int float num money'
     359        ' int2 int4 int8 float4 float8 numeric money'.split())
    357360
    358361    def _prepare_num(self, d):
     
    608611        """Execute a SQL command string.
    609612
    610         This method simply sends a SQL query to the database. If the query is
     613        This method simply sends a SQL query to the database.  If the query is
    611614        an insert statement that inserted exactly one row into a table that
    612615        has OIDs, the return value is the OID of the newly inserted row.
    613616        If the query is an update or delete statement, or an insert statement
    614617        that did not insert exactly one row in a table with OIDs, then the
    615         number of rows affected is returned as a string. If it is a statement
     618        number of rows affected is returned as a string.  If it is a statement
    616619        that returns rows as a result (usually a select statement, but maybe
    617620        also an "insert/update ... returning" statement), this method returns
    618621        a Query object that can be accessed via getresult() or dictresult()
    619         or simply printed. Otherwise, it returns `None`.
     622        or simply printed.  Otherwise, it returns `None`.
    620623
    621624        The query can contain numbered parameters of the form $1 in place
    622         of any data constant. Arguments given after the query string will
    623         be substituted for the corresponding numbered parameter. Parameter
     625        of any data constant.  Arguments given after the query string will
     626        be substituted for the corresponding numbered parameter.  Parameter
    624627        values can also be given as a single list or tuple argument.
    625628        """
     
    630633        return self.db.query(qstr, args)
    631634
    632     def pkey(self, table, flush=False):
     635    def pkey(self, table, composite=False, flush=False):
    633636        """Get or set the primary key of a table.
    634637
    635         Composite primary keys are represented as frozensets. Note that
    636         this raises a KeyError if the table does not have a primary key.
     638        Single primary keys are returned as strings unless you
     639        set the composite flag.  Composite primary keys are always
     640        represented as tuples.  Note that this raises a KeyError
     641        if the table does not have a primary key.
    637642
    638643        If flush is set then the internal cache for primary keys will
    639         be flushed. This may be necessary after the database schema or
     644        be flushed.  This may be necessary after the database schema or
    640645        the search path has been changed.
    641646        """
     
    647652            pkey = pkeys[table]
    648653        except KeyError:  # cache miss, check the database
    649             q = ("SELECT a.attname FROM pg_index i"
     654            q = ("SELECT a.attname, a.attnum, i.indkey FROM pg_index i"
    650655                " JOIN pg_attribute a ON a.attrelid = i.indrelid"
    651656                " AND a.attnum = ANY(i.indkey)"
    652657                " AND NOT a.attisdropped"
    653658                " WHERE i.indrelid=%s::regclass"
    654                 " AND i.indisprimary") % (
     659                " AND i.indisprimary ORDER BY a.attnum") % (
    655660                    self._prepare_qualified_param(table, 1),)
    656661            pkey = self.db.query(q, (table,)).getresult()
    657662            if not pkey:
    658663                raise KeyError('Table %s has no primary key' % table)
     664            # we want to use the order defined in the primary key index here,
     665            # not the order as defined by the columns in the table
    659666            if len(pkey) > 1:
    660                 pkey = frozenset(k[0] for k in pkey)
     667                indkey = [int(k) for k in pkey[0][2].split()]
     668                pkey = sorted(pkey, key=lambda row: indkey.index(row[1]))
     669                pkey = tuple(row[0] for row in pkey)
    661670            else:
    662671                pkey = pkey[0][0]
    663672            pkeys[table] = pkey  # cache it
     673        if composite and not isinstance(pkey, tuple):
     674            pkey = (pkey,)
    664675        return pkey
    665676
     
    755766        """Get a row from a database table or view.
    756767
    757         This method is the basic mechanism to get a single row.  The keyname
    758         that the key specifies a unique row.  If keyname is not specified
    759         then the primary key for the table is used.  If row is a dictionary
    760         then the value for the key is taken from it and it is modified to
    761         include the new values, replacing existing values where necessary.
    762         For a composite key, keyname can also be a sequence of key names.
     768        This method is the basic mechanism to get a single row.  It assumes
     769        that the keyname specifies a unique row.  It must be the name of a
     770        single column or a tuple of column names.  If the keyname is not
     771        specified, then the primary key for the table is used.
     772
     773        If row is a dictionary, then the value for the key is taken from it.
     774        Otherwise, the row must be a single value or a tuple of values
     775        corresponding to the passed keyname or primary key.  The fetched row
     776        from the table will be returned as a new dictionary or used to replace
     777        the existing values when row was passed as aa dictionary.
     778
    763779        The OID is also put into the dictionary if the table has one, but
    764780        in order to allow the caller to work with multiple tables, it is
    765         munged as "oid(table)".
    766         """
    767         if table.endswith('*'):  # scan descendant tables?
    768             table = table[:-1].rstrip()  # need parent table name
     781        munged as "oid(table)" using the actual name of the table.
     782        """
     783        if table.endswith('*'):  # hint for descendant tables can be ignored
     784            table = table[:-1].rstrip()
     785        attnames = self.get_attnames(table)
     786        qoid = _oid_key(table) if 'oid' in attnames else None
     787        if keyname and isinstance(keyname, basestring):
     788            keyname = (keyname,)
     789        if qoid and isinstance(row, dict) and qoid in row and 'oid' not in row:
     790            row['oid'] = row[qoid]
    769791        if not keyname:
    770             # use the primary key by default
    771             try:
    772                 keyname = self.pkey(table)
    773             except KeyError:
    774                 raise _prg_error('Table %s has no primary key' % table)
    775         attnames = self.get_attnames(table)
     792            try:  # if keyname is not specified, try using the primary key
     793                keyname = self.pkey(table, True)
     794            except KeyError:  # the table has no primary key
     795                # try using the oid instead
     796                if qoid and isinstance(row, dict) and 'oid' in row:
     797                    keyname = ('oid',)
     798                else:
     799                    raise _prg_error('Table %s has no primary key' % table)
     800            else:  # the table has a primary key
     801                # check whether all key columns have values
     802                if isinstance(row, dict) and not set(keyname).issubset(row):
     803                    # try using the oid instead
     804                    if qoid and 'oid' in row:
     805                        keyname = ('oid',)
     806                    else:
     807                        raise KeyError(
     808                            'Missing value in row for specified keyname')
     809        if not isinstance(row, dict):
     810            if not isinstance(row, (tuple, list)):
     811                row = [row]
     812            if len(keyname) != len(row):
     813                raise KeyError(
     814                    'Differing number of items in keyname and row')
     815            row = dict(zip(keyname, row))
    776816        params = []
    777817        param = partial(self._prepare_param, params=params)
    778818        col = self.escape_identifier
    779         # We want the oid for later updates if that isn't the key.
    780         # To allow users to work with multiple tables, we munge
    781         # the name of the "oid" key by adding the name of the table.
    782         qoid = _oid_key(table)
    783         if keyname == 'oid':
    784             if isinstance(row, dict):
    785                 if qoid not in row:
    786                     raise _prg_error('%s not in row' % qoid)
    787             else:
    788                 row = {qoid: row}
    789             what = '*'
    790             where = 'oid = %s' % param(row[qoid], 'int')
    791         else:
    792             keyname = [keyname] if isinstance(
    793                 keyname, basestring) else sorted(keyname)
    794             if not isinstance(row, dict):
    795                 if len(keyname) > 1:
    796                     raise _prg_error('Composite key needs dict as row')
    797                 row = dict((k, row) for k in keyname)
    798             what = ', '.join(col(k) for k in attnames)
    799             where = ' AND '.join('%s = %s' % (
    800                 col(k), param(row[k], attnames[k])) for k in keyname)
     819        what = 'oid, *' if qoid else '*'
     820        where = ' AND '.join('%s = %s' % (
     821            col(k), param(row[k], attnames[k])) for k in keyname)
     822        if 'oid' in row:
     823            if qoid:
     824                row[qoid] = row['oid']
     825            del row['oid']
    801826        q = 'SELECT %s FROM %s WHERE %s LIMIT 1' % (
    802827            what, self._escape_qualified_name(table), where)
     
    808833                table, where, self._list_params(params)))
    809834        for n, value in res[0].items():
    810             if n == 'oid':
     835            if qoid and n == 'oid':
    811836                n = qoid
    812             elif attnames.get(n) == 'bytea':
     837            elif value is not None and attnames.get(n) == 'bytea':
    813838                value = self.unescape_bytea(value)
    814839            row[n] = value
     
    831856        although PostgreSQL does.
    832857        """
    833         if 'oid' in kw:
    834             del kw['oid']
     858        if table.endswith('*'):  # hint for descendant tables can be ignored
     859            table = table[:-1].rstrip()
    835860        if row is None:
    836861            row = {}
    837862        row.update(kw)
     863        if 'oid' in row:
     864            del row['oid']  # do not insert oid
    838865        attnames = self.get_attnames(table)
     866        qoid = _oid_key(table) if 'oid' in attnames else None
    839867        params = []
    840868        param = partial(self._prepare_param, params=params)
     
    846874                values.append(param(row[n], attnames[n]))
    847875        names, values = ', '.join(names), ', '.join(values)
    848         ret = 'oid, *' if 'oid' in attnames else '*'
     876        ret = 'oid, *' if qoid else '*'
    849877        q = 'INSERT INTO %s (%s) VALUES (%s) RETURNING %s' % (
    850878            self._escape_qualified_name(table), names, values, ret)
    851879        self._do_debug(q, params)
    852880        q = self.db.query(q, params)
    853         res = q.dictresult()  # this will always return a row
    854         for n, value in res[0].items():
    855             if n == 'oid':
    856                 n = _oid_key(table)
    857             elif attnames.get(n) == 'bytea' and value is not None:
    858                 value = self.unescape_bytea(value)
    859             row[n] = value
     881        res = q.dictresult()
     882        if res:  # this should always be true
     883            for n, value in res[0].items():
     884                if qoid and n == 'oid':
     885                    n = qoid
     886                elif value is not None and attnames.get(n) == 'bytea':
     887                    value = self.unescape_bytea(value)
     888                row[n] = value
    860889        return row
    861890
     
    864893
    865894        Similar to insert but updates an existing row.  The update is based
    866         on the OID value as munged by get or passed as keyword, or on the
    867         primary key of the table.  The dictionary is modified to reflect
    868         any changes caused by the update due to triggers, rules, default
    869         values, etc.
    870         """
    871         # Update always works on the oid which get() returns if available,
    872         # otherwise use the primary key.  Fail if neither.
    873         # Note that we only accept oid key from named args for safety.
    874         qoid = _oid_key(table)
    875         if 'oid' in kw:
    876             kw[qoid] = kw.pop('oid')
     895        on the primary key of the table or the OID value as munged by get
     896        or passed as keyword.
     897
     898        The dictionary is then modified to reflect any changes caused by the
     899        update due to triggers, rules, default values, etc.
     900        """
     901        if table.endswith('*'):
     902            table = table[:-1].rstrip()  # need parent table name
     903        attnames = self.get_attnames(table)
     904        qoid = _oid_key(table) if 'oid' in attnames else None
    877905        if row is None:
    878906            row = {}
     907        elif 'oid' in row:
     908            del row['oid']  # only accept oid key from named args for safety
    879909        row.update(kw)
    880         attnames = self.get_attnames(table)
     910        if qoid and qoid in row and 'oid' not in row:
     911            row['oid'] = row[qoid]
     912        try:  # try using the primary key
     913            keyname = self.pkey(table, True)
     914        except KeyError:  # the table has no primary key
     915            # try using the oid instead
     916            if qoid and 'oid' in row:
     917                keyname = ('oid',)
     918            else:
     919                raise _prg_error('Table %s has no primary key' % table)
     920        else:  # the table has a primary key
     921            # check whether all key columns have values
     922            if not set(keyname).issubset(row):
     923                # try using the oid instead
     924                if qoid and 'oid' in row:
     925                    keyname = ('oid',)
     926                else:
     927                    raise KeyError('Missing primary key in row')
    881928        params = []
    882929        param = partial(self._prepare_param, params=params)
    883930        col = self.escape_identifier
    884         if qoid in row:
    885             where = 'oid = %s' % param(row[qoid], 'int')
    886             keyname = []
    887         else:
    888             try:
    889                 keyname = self.pkey(table)
    890             except KeyError:
    891                 raise _prg_error('Table %s has no primary key' % table)
    892             keyname = [keyname] if isinstance(
    893                 keyname, basestring) else sorted(keyname)
    894             try:
    895                 where = ' AND '.join('%s = %s' % (
    896                     col(k), param(row[k], attnames[k])) for k in keyname)
    897             except KeyError:
    898                 raise _prg_error('Update operation needs primary key or oid')
     931        where = ' AND '.join('%s = %s' % (
     932            col(k), param(row[k], attnames[k])) for k in keyname)
     933        if 'oid' in row:
     934            if qoid:
     935                row[qoid] = row['oid']
     936            del row['oid']
     937        values = []
    899938        keyname = set(keyname)
    900         keyname.add('oid')
    901         values = []
    902939        for n in attnames:
    903940            if n in row and n not in keyname:
     
    906943            return row
    907944        values = ', '.join(values)
    908         ret = 'oid, *' if 'oid' in attnames else '*'
     945        ret = 'oid, *' if qoid else '*'
    909946        q = 'UPDATE %s SET %s WHERE %s RETURNING %s' % (
    910947            self._escape_qualified_name(table), values, where, ret)
     
    914951        if res:  # may be empty when row does not exist
    915952            for n, value in res[0].items():
    916                 if n == 'oid':
     953                if qoid and n == 'oid':
    917954                    n = qoid
    918                 elif attnames.get(n) == 'bytea' and value is not None:
     955                elif value is not None and attnames.get(n) == 'bytea':
    919956                    value = self.unescape_bytea(value)
    920957                row[n] = value
     
    9641001        only available since PostgreSQL 9.5.
    9651002        """
    966         if 'oid' in kw:
    967             del kw['oid']
     1003        if table.endswith('*'):  # hint for descendant tables can be ignored
     1004            table = table[:-1].rstrip()
    9681005        if row is None:
    9691006            row = {}
     1007        if 'oid' in row:
     1008            del row['oid']  # do not insert oid
     1009        if 'oid' in kw:
     1010            del kw['oid']  # do not update oid
    9701011        attnames = self.get_attnames(table)
     1012        qoid = _oid_key(table) if 'oid' in attnames else None
    9711013        params = []
    9721014        param = partial(self._prepare_param,params=params)
     
    9791021        names, values = ', '.join(names), ', '.join(values)
    9801022        try:
    981             keyname = self.pkey(table)
     1023            keyname = self.pkey(table, True)
    9821024        except KeyError:
    9831025            raise _prg_error('Table %s has no primary key' % table)
    984         keyname = [keyname] if isinstance(
    985             keyname, basestring) else sorted(keyname)
    9861026        target = ', '.join(col(k) for k in keyname)
    9871027        update = []
     
    9981038            return row
    9991039        do = 'update set %s' % ', '.join(update) if update else 'nothing'
    1000         ret = 'oid, *' if 'oid' in attnames else '*'
     1040        ret = 'oid, *' if qoid else '*'
    10011041        q = ('INSERT INTO %s AS included (%s) VALUES (%s)'
    10021042            ' ON CONFLICT (%s) DO %s RETURNING %s') % (
     
    10121052            raise  # re-raise original error
    10131053        res = q.dictresult()
    1014         if update:  # may be empty with "do nothing"
     1054        if res:  # may be empty with "do nothing"
    10151055            for n, value in res[0].items():
    1016                 if n == 'oid':
    1017                     n = _oid_key(table)
    1018                 elif attnames.get(n) == 'bytea' and value is not None:
     1056                if qoid and n == 'oid':
     1057                    n = qoid
     1058                elif value is not None and attnames.get(n) == 'bytea':
    10191059                    value = self.unescape_bytea(value)
    10201060                row[n] = value
     
    10381078            if n == 'oid':
    10391079                continue
    1040             if t in ('int', 'integer', 'smallint', 'bigint',
    1041                     'float', 'real', 'double precision',
    1042                     'num', 'numeric', 'money'):
     1080            if t in self._num_types:
    10431081                row[n] = 0
    1044             elif t in ('bool', 'boolean'):
     1082            elif t == 'bool':
    10451083                row[n] = self._make_bool(False)
    10461084            else:
     
    10521090
    10531091        This method deletes the row from a table.  It deletes based on the
    1054         OID value as munged by get or passed as keyword, or on the primary
    1055         key of the table.  The return value is the number of deleted rows
    1056         (i.e. 0 if the row did not exist and 1 if the row was deleted).
    1057         """
    1058         # Like update, delete works on the oid.
    1059         # One day we will be testing that the record to be deleted
    1060         # isn't referenced somewhere (or else PostgreSQL will).
    1061         # Note that we only accept oid key from named args for safety.
    1062         qoid = _oid_key(table)
    1063         if 'oid' in kw:
    1064             kw[qoid] = kw.pop('oid')
     1092        primary key of the table or the OID value as munged by get() or
     1093        passed as keyword.
     1094
     1095        The return value is the number of deleted rows (i.e. 0 if the row
     1096        did not exist and 1 if the row was deleted).
     1097
     1098        Note that if the row cannot be deleted because e.g. it is still
     1099        referenced by another table, this method raises a ProgrammingError.
     1100        """
     1101        if table.endswith('*'):  # hint for descendant tables can be ignored
     1102            table = table[:-1].rstrip()
     1103        attnames = self.get_attnames(table)
     1104        qoid = _oid_key(table) if 'oid' in attnames else None
    10651105        if row is None:
    10661106            row = {}
     1107        elif 'oid' in row:
     1108            del row['oid']  # only accept oid key from named args for safety
    10671109        row.update(kw)
     1110        if qoid and qoid in row and 'oid' not in row:
     1111            row['oid'] = row[qoid]
     1112        try:  # try using the primary key
     1113            keyname = self.pkey(table, True)
     1114        except KeyError:  # the table has no primary key
     1115            # try using the oid instead
     1116            if qoid and 'oid' in row:
     1117                keyname = ('oid',)
     1118            else:
     1119                raise _prg_error('Table %s has no primary key' % table)
     1120        else:  # the table has a primary key
     1121            # check whether all key columns have values
     1122            if not set(keyname).issubset(row):
     1123                # try using the oid instead
     1124                if qoid and 'oid' in row:
     1125                    keyname = ('oid',)
     1126                else:
     1127                    raise KeyError('Missing primary key in row')
    10681128        params = []
    10691129        param = partial(self._prepare_param, params=params)
    1070         if qoid in row:
    1071             where = 'oid = %s' % param(row[qoid], 'int')
    1072         else:
    1073             try:
    1074                 keyname = self.pkey(table)
    1075             except KeyError:
    1076                 raise _prg_error('Table %s has no primary key' % table)
    1077             keyname = [keyname] if isinstance(
    1078                 keyname, basestring) else sorted(keyname)
    1079             attnames = self.get_attnames(table)
    1080             col = self.escape_identifier
    1081             try:
    1082                 where = ' AND '.join('%s = %s' % (
    1083                     col(k), param(row[k], attnames[k])) for k in keyname)
    1084             except KeyError:
    1085                 raise _prg_error('Delete operation needs primary key or oid')
     1130        col = self.escape_identifier
     1131        where = ' AND '.join('%s = %s' % (
     1132            col(k), param(row[k], attnames[k])) for k in keyname)
     1133        if 'oid' in row:
     1134            if qoid:
     1135                row[qoid] = row['oid']
     1136            del row['oid']
    10861137        q = 'DELETE FROM %s WHERE %s' % (
    10871138            self._escape_qualified_name(table), where)
  • trunk/tests/test_classic_dbwrapper.py

    r763 r765  
    709709        query = self.db.query
    710710        pkey = self.db.pkey
     711        self.assertRaises(KeyError, pkey, 'test')
    711712        for t in ('pkeytest', 'primary key test'):
    712             for n in range(7):
     713            for n in range(8):
    713714                query('drop table if exists "%s%d"' % (t, n))
    714715                self.addCleanup(query, 'drop table "%s%d"' % (t, n))
     
    724725                " primary key (f, h))" % t)
    725726            query('create table "%s4" ('
     727                "e smallint, f smallint, g smallint,"
     728                " h smallint, i smallint,"
     729                " primary key (h, f))" % t)
     730            query('create table "%s5" ('
    726731                "more_than_one_letter varchar primary key)" % t)
    727             query('create table "%s5" ('
     732            query('create table "%s6" ('
    728733                '"with space" date primary key)' % t)
    729             query('create table "%s6" ('
     734            query('create table "%s7" ('
    730735                'a_very_long_column_name varchar,'
    731736                ' "with space" date,'
     
    735740            self.assertRaises(KeyError, pkey, '%s0' % t)
    736741            self.assertEqual(pkey('%s1' % t), 'b')
     742            self.assertEqual(pkey('%s1' % t, True), ('b',))
     743            self.assertEqual(pkey('%s1' % t, composite=False), 'b')
     744            self.assertEqual(pkey('%s1' % t, composite=True), ('b',))
    737745            self.assertEqual(pkey('%s2' % t), 'd')
     746            self.assertEqual(pkey('%s2' % t, composite=True), ('d',))
    738747            r = pkey('%s3' % t)
    739             self.assertIsInstance(r, frozenset)
    740             self.assertEqual(r, frozenset('fh'))
    741             self.assertEqual(pkey('%s4' % t), 'more_than_one_letter')
    742             self.assertEqual(pkey('%s5' % t), 'with space')
    743             r = pkey('%s6' % t)
    744             self.assertIsInstance(r, frozenset)
    745             self.assertEqual(r, frozenset([
    746                 'a_very_long_column_name', 'with space', '42']))
     748            self.assertIsInstance(r, tuple)
     749            self.assertEqual(r, ('f', 'h'))
     750            r = pkey('%s3' % t, composite=False)
     751            self.assertIsInstance(r, tuple)
     752            self.assertEqual(r, ('f', 'h'))
     753            r = pkey('%s4' % t)
     754            self.assertIsInstance(r, tuple)
     755            self.assertEqual(r, ('h', 'f'))
     756            self.assertEqual(pkey('%s5' % t), 'more_than_one_letter')
     757            self.assertEqual(pkey('%s6' % t), 'with space')
     758            r = pkey('%s7' % t)
     759            self.assertIsInstance(r, tuple)
     760            self.assertEqual(r, (
     761                'a_very_long_column_name', 'with space', '42'))
    747762            # a newly added primary key will be detected
    748763            query('alter table "%s0" add primary key (a)' % t)
     
    956971        query = self.db.query
    957972        table = 'get_test_table'
     973        self.assertRaises(TypeError, get)
     974        self.assertRaises(TypeError, get, table)
     975        query('drop table if exists "%s"' % table)
     976        self.addCleanup(query, 'drop table "%s"' % table)
     977        query('create table "%s" ('
     978            "n integer, t text) without oids" % table)
     979        for n, t in enumerate('xyz'):
     980            query('insert into "%s" values('"%d, '%s')"
     981                % (table, n + 1, t))
     982        self.assertRaises(pg.ProgrammingError, get, table, 2)
     983        r = get(table, 2, 'n')
     984        self.assertIsInstance(r, dict)
     985        self.assertEqual(r, dict(n=2, t='y'))
     986        r = get(table, 1, 'n')
     987        self.assertEqual(r, dict(n=1, t='x'))
     988        r = get(table, (3,), ('n',))
     989        self.assertEqual(r, dict(n=3, t='z'))
     990        r = get(table, 'y', 't')
     991        self.assertEqual(r, dict(n=2, t='y'))
     992        self.assertRaises(pg.DatabaseError, get, table, 4)
     993        self.assertRaises(pg.DatabaseError, get, table, 4, 'n')
     994        self.assertRaises(pg.DatabaseError, get, table, 'y')
     995        self.assertRaises(pg.DatabaseError, get, table, 2, 't')
     996        s = dict(n=3)
     997        self.assertRaises(pg.ProgrammingError, get, table, s)
     998        r = get(table, s, 'n')
     999        self.assertIs(r, s)
     1000        self.assertEqual(r, dict(n=3, t='z'))
     1001        s.update(t='x')
     1002        r = get(table, s, 't')
     1003        self.assertIs(r, s)
     1004        self.assertEqual(s, dict(n=1, t='x'))
     1005        r = get(table, s, ('n', 't'))
     1006        self.assertIs(r, s)
     1007        self.assertEqual(r, dict(n=1, t='x'))
     1008        query('alter table "%s" alter n set not null' % table)
     1009        query('alter table "%s" add primary key (n)' % table)
     1010        r = get(table, 2)
     1011        self.assertIsInstance(r, dict)
     1012        self.assertEqual(r, dict(n=2, t='y'))
     1013        self.assertEqual(get(table, 1)['t'], 'x')
     1014        self.assertEqual(get(table, 3)['t'], 'z')
     1015        self.assertEqual(get(table + '*', 2)['t'], 'y')
     1016        self.assertEqual(get(table + ' *', 2)['t'], 'y')
     1017        self.assertRaises(KeyError, get, table, (2, 2))
     1018        s = dict(n=3)
     1019        r = get(table, s)
     1020        self.assertIs(r, s)
     1021        self.assertEqual(r, dict(n=3, t='z'))
     1022        s.update(n=1)
     1023        self.assertEqual(get(table, s)['t'], 'x')
     1024        s.update(n=2)
     1025        self.assertEqual(get(table, r)['t'], 'y')
     1026        s.pop('n')
     1027        self.assertRaises(KeyError, get, table, s)
     1028
     1029    def testGetWithOid(self):
     1030        get = self.db.get
     1031        query = self.db.query
     1032        table = 'get_with_oid_test_table'
    9581033        query('drop table if exists "%s"' % table)
    9591034        self.addCleanup(query, 'drop table "%s"' % table)
     
    9641039                % (table, n + 1, t))
    9651040        self.assertRaises(pg.ProgrammingError, get, table, 2)
    966         self.assertRaises(pg.ProgrammingError, get, table, {}, 'oid')
     1041        self.assertRaises(KeyError, get, table, {}, 'oid')
    9671042        r = get(table, 2, 'n')
    968         oid_table = 'oid(%s)' % table
    969         self.assertIn(oid_table, r)
    970         oid = r[oid_table]
     1043        qoid = 'oid(%s)' % table
     1044        self.assertIn(qoid, r)
     1045        oid = r[qoid]
    9711046        self.assertIsInstance(oid, int)
    972         result = {'t': 'y', 'n': 2, oid_table: oid}
     1047        result = {'t': 'y', 'n': 2, qoid: oid}
    9731048        self.assertEqual(r, result)
     1049        r = get(table, oid, 'oid')
     1050        self.assertEqual(r, result)
     1051        r = get(table, dict(oid=oid))
     1052        self.assertEqual(r, result)
     1053        r = get(table, dict(oid=oid), 'oid')
     1054        self.assertEqual(r, result)
     1055        r = get(table, {qoid: oid})
     1056        self.assertEqual(r, result)
     1057        r = get(table, {qoid: oid}, 'oid')
     1058        self.assertEqual(r, result)
     1059        self.assertEqual(get(table + '*', 2, 'n'), r)
    9741060        self.assertEqual(get(table + ' *', 2, 'n'), r)
    9751061        self.assertEqual(get(table, oid, 'oid')['t'], 'y')
     
    9811067        self.assertEqual(get(table, r, 'n')['t'], 'z')
    9821068        self.assertEqual(get(table, 1, 'n')['t'], 'x')
     1069        self.assertEqual(get(table, r, 'oid')['t'], 'z')
    9831070        query('alter table "%s" alter n set not null' % table)
    9841071        query('alter table "%s" add primary key (n)' % table)
     
    9921079        r['n'] = 2
    9931080        self.assertEqual(get(table, r)['t'], 'y')
     1081        r = get(table, oid, 'oid')
     1082        self.assertEqual(r, result)
     1083        r = get(table, dict(oid=oid))
     1084        self.assertEqual(r, result)
     1085        r = get(table, dict(oid=oid), 'oid')
     1086        self.assertEqual(r, result)
     1087        r = get(table, {qoid: oid})
     1088        self.assertEqual(r, result)
     1089        r = get(table, {qoid: oid}, 'oid')
     1090        self.assertEqual(r, result)
     1091        r = get(table, dict(oid=oid, n=1))
     1092        self.assertEqual(r['n'], 1)
     1093        self.assertNotEqual(r[qoid], oid)
     1094        r = get(table, dict(oid=oid, t='z'), 't')
     1095        self.assertEqual(r['n'], 3)
     1096        self.assertNotEqual(r[qoid], oid)
    9941097
    9951098    def testGetWithCompositeKey(self):
     
    10051108                "%d, '%s')" % (table, n + 1, t))
    10061109        self.assertEqual(get(table, 2)['t'], 'b')
     1110        self.assertEqual(get(table, 1, 'n')['t'], 'a')
     1111        self.assertEqual(get(table, 2, ('n',))['t'], 'b')
     1112        self.assertEqual(get(table, 3, ['n'])['t'], 'c')
     1113        self.assertEqual(get(table, (2,), ('n',))['t'], 'b')
     1114        self.assertEqual(get(table, 'b', 't')['n'], 2)
     1115        self.assertEqual(get(table, ('a',), ('t',))['n'], 1)
     1116        self.assertEqual(get(table, ['c'], ['t'])['n'], 3)
    10071117        table = 'get_test_table_2'
    10081118        query('drop table if exists "%s"' % table)
     
    10151125                query('insert into "%s" values('
    10161126                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1017         self.assertRaises(pg.ProgrammingError, get, table, 2)
     1127        self.assertRaises(KeyError, get, table, 2)
     1128        self.assertEqual(get(table, (1, 1))['t'], 'a')
     1129        self.assertEqual(get(table, (1, 2))['t'], 'b')
     1130        self.assertEqual(get(table, (2, 1))['t'], 'c')
     1131        self.assertEqual(get(table, (1, 2), ('n', 'm'))['t'], 'b')
     1132        self.assertEqual(get(table, (1, 2), ('m', 'n'))['t'], 'c')
     1133        self.assertEqual(get(table, (3, 1), ('n', 'm'))['t'], 'e')
     1134        self.assertEqual(get(table, (1, 3), ('m', 'n'))['t'], 'e')
    10181135        self.assertEqual(get(table, dict(n=2, m=2))['t'], 'd')
    1019         r = get(table, dict(n=1, m=2), ('n', 'm'))
    1020         self.assertEqual(r['t'], 'b')
    1021         r = get(table, dict(n=3, m=2), frozenset(['n', 'm']))
    1022         self.assertEqual(r['t'], 'f')
     1136        self.assertEqual(get(table, dict(n=1, m=2), ('n', 'm'))['t'], 'b')
     1137        self.assertEqual(get(table, dict(n=2, m=1), ['n', 'm'])['t'], 'c')
     1138        self.assertEqual(get(table, dict(n=3, m=2), ('m', 'n'))['t'], 'f')
    10231139
    10241140    def testGetWithQuotedNames(self):
     
    11991315        self.assertIsInstance(r, dict)
    12001316        self.assertEqual(r['n'], 1)
     1317        self.assertNotIn('oid', r)
    12011318        qoid = 'oid(test_table)'
    12021319        self.assertIn(qoid, r)
    1203         r = insert('test_table', n=2, oid='invalid')
     1320        oid = r[qoid]
     1321        self.assertEqual(sorted(r.keys()), ['n', qoid])
     1322        r = insert('test_table', n=2, oid=oid)
    12041323        self.assertIsInstance(r, dict)
    12051324        self.assertEqual(r['n'], 2)
    1206         r['n'] = 3
     1325        self.assertIn(qoid, r)
     1326        self.assertNotEqual(r[qoid], oid)
     1327        self.assertNotIn('oid', r)
     1328        r = insert('test_table', None, n=3)
     1329        self.assertIsInstance(r, dict)
     1330        self.assertEqual(r['n'], 3)
     1331        s = r
     1332        r = insert('test_table', r)
     1333        self.assertIs(r, s)
     1334        self.assertEqual(r['n'], 3)
     1335        r = insert('test_table *', r)
     1336        self.assertIs(r, s)
     1337        self.assertEqual(r['n'], 3)
     1338        r = insert('test_table', r, n=4)
     1339        self.assertIs(r, s)
     1340        self.assertEqual(r['n'], 4)
     1341        self.assertNotIn('oid', r)
     1342        self.assertIn(qoid, r)
     1343        oid = r[qoid]
     1344        r = insert('test_table', r, n=5, oid=oid)
     1345        self.assertIs(r, s)
     1346        self.assertEqual(r['n'], 5)
     1347        self.assertIn(qoid, r)
     1348        self.assertNotEqual(r[qoid], oid)
     1349        self.assertNotIn('oid', r)
     1350        r['oid'] = oid = r[qoid]
     1351        r = insert('test_table', r, n=6)
     1352        self.assertIs(r, s)
     1353        self.assertEqual(r['n'], 6)
     1354        self.assertIn(qoid, r)
     1355        self.assertNotEqual(r[qoid], oid)
     1356        self.assertNotIn('oid', r)
     1357        q = 'select n from test_table order by 1 limit 9'
     1358        r = ' '.join(str(row[0]) for row in query(q).getresult())
     1359        self.assertEqual(r, '1 2 3 3 3 4 5 6')
     1360        query("truncate test_table")
     1361        query("alter table test_table add unique (n)")
     1362        r = insert('test_table', dict(n=7))
     1363        self.assertIsInstance(r, dict)
     1364        self.assertEqual(r['n'], 7)
     1365        self.assertRaises(pg.ProgrammingError, insert, 'test_table', r)
     1366        r['n'] = 6
     1367        self.assertRaises(pg.ProgrammingError, insert, 'test_table', r, n=7)
     1368        self.assertIsInstance(r, dict)
     1369        self.assertEqual(r['n'], 7)
     1370        r['n'] = 6
    12071371        r = insert('test_table', r)
    12081372        self.assertIsInstance(r, dict)
    1209         self.assertEqual(r['n'], 3)
    1210         r = insert('test_table', r, n=4)
    1211         self.assertIsInstance(r, dict)
    1212         self.assertEqual(r['n'], 4)
    1213         q = 'select n from test_table order by 1 limit 5'
     1373        self.assertEqual(r['n'], 6)
    12141374        r = query(q).getresult()
    1215         self.assertEqual(r, [(1,), (2,), (3,), (4,)])
     1375        r = ' '.join(str(row[0]) for row in query(q).getresult())
     1376        self.assertEqual(r, '6 7')
    12161377
    12171378    def testInsertWithQuotedNames(self):
     
    12671428        query("create table test_table (n int) with oids")
    12681429        query("insert into test_table values (1)")
    1269         r = get('test_table', 1, 'n')
    1270         self.assertIsInstance(r, dict)
    1271         self.assertEqual(r['n'], 1)
    1272         r['n'] = 2
    1273         r = update('test_table', r)
    1274         self.assertIsInstance(r, dict)
     1430        s = get('test_table', 1, 'n')
     1431        self.assertIsInstance(s, dict)
     1432        self.assertEqual(s['n'], 1)
     1433        s['n'] = 2
     1434        r = update('test_table', s)
     1435        self.assertIs(r, s)
    12751436        self.assertEqual(r['n'], 2)
    12761437        qoid = 'oid(test_table)'
    12771438        self.assertIn(qoid, r)
     1439        self.assertNotIn('oid', r)
     1440        self.assertEqual(sorted(r.keys()), ['n', qoid])
    12781441        r['n'] = 3
    1279         r = update('test_table', r, oid=r.pop(qoid))
    1280         self.assertIsInstance(r, dict)
     1442        oid = r.pop(qoid)
     1443        r = update('test_table', r, oid=oid)
     1444        self.assertIs(r, s)
    12811445        self.assertEqual(r['n'], 3)
    12821446        r.pop(qoid)
    12831447        self.assertRaises(pg.ProgrammingError, update, 'test_table', r)
    1284         r = get('test_table', 3, 'n')
    1285         self.assertIsInstance(r, dict)
    1286         self.assertEqual(r['n'], 3)
    1287         r.pop('n')
    1288         r = update('test_table', r)
    1289         r.pop(qoid)
     1448        s = get('test_table', 3, 'n')
     1449        self.assertIsInstance(s, dict)
     1450        self.assertEqual(s['n'], 3)
     1451        s.pop('n')
     1452        r = update('test_table', s)
     1453        oid = r.pop(qoid)
    12901454        self.assertEqual(r, {})
    1291         q = 'select n from test_table limit 2'
     1455        q = "select n from test_table limit 2"
    12921456        r = query(q).getresult()
    12931457        self.assertEqual(r, [(3,)])
     1458        query("insert into test_table values (1)")
     1459        self.assertRaises(pg.ProgrammingError,
     1460            update, 'test_table', dict(oid=oid, n=4))
     1461        r = update('test_table', dict(n=4), oid=oid)
     1462        self.assertEqual(r['n'], 4)
     1463        r = update('test_table *', dict(n=5), oid=oid)
     1464        self.assertEqual(r['n'], 5)
     1465        query("alter table test_table add column m int")
     1466        query("alter table test_table add primary key (n)")
     1467        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1468        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1469        s = dict(n=1, m=4)
     1470        r = update('test_table', s)
     1471        self.assertIs(r, s)
     1472        self.assertEqual(r['n'], 1)
     1473        self.assertEqual(r['m'], 4)
     1474        s = dict(m=7)
     1475        r = update('test_table', s, n=5)
     1476        self.assertIs(r, s)
     1477        self.assertEqual(r['n'], 5)
     1478        self.assertEqual(r['m'], 7)
     1479        q = "select n, m from test_table order by 1 limit 3"
     1480        r = query(q).getresult()
     1481        self.assertEqual(r, [(1, 4), (5, 7)])
     1482        s = dict(m=9, oid=oid)
     1483        self.assertRaises(KeyError, update, 'test_table', s)
     1484        r = update('test_table', s, oid=oid)
     1485        self.assertIs(r, s)
     1486        self.assertEqual(r['n'], 5)
     1487        self.assertEqual(r['m'], 9)
     1488        s = dict(n=1, m=3, oid=oid)
     1489        r = update('test_table', s)
     1490        self.assertIs(r, s)
     1491        self.assertEqual(r['n'], 1)
     1492        self.assertEqual(r['m'], 3)
     1493        r = query(q).getresult()
     1494        self.assertEqual(r, [(1, 3), (5, 9)])
    12941495
    12951496    def testUpdateWithCompositeKey(self):
     
    13041505            query('insert into "%s" values('
    13051506                "%d, '%s')" % (table, n + 1, t))
    1306         self.assertRaises(pg.ProgrammingError, update,
    1307                           table, dict(t='b'))
     1507        self.assertRaises(KeyError, update, table, dict(t='b'))
    13081508        s = dict(n=2, t='d')
    13091509        r = update(table, s)
     
    13341534                query('insert into "%s" values('
    13351535                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1336         self.assertRaises(pg.ProgrammingError, update,
    1337                           table, dict(n=2, t='b'))
     1536        self.assertRaises(KeyError, update, table, dict(n=2, t='b'))
    13381537        self.assertEqual(update(table,
    1339                                 dict(n=2, m=2, t='x'))['t'], 'x')
     1538            dict(n=2, m=2, t='x'))['t'], 'x')
    13401539        q = 'select t from "%s" where n=2 order by m' % table
    13411540        r = [r[0] for r in query(q).getresult()]
     
    14401639        r = upsert(table, s, oid='invalid')
    14411640        self.assertIs(r, s)
     1641
     1642    def testUpsertWithOid(self):
     1643        upsert = self.db.upsert
     1644        get = self.db.get
     1645        query = self.db.query
     1646        query("drop table if exists test_table")
     1647        self.addCleanup(query, "drop table test_table")
     1648        query("create table test_table (n int) with oids")
     1649        query("insert into test_table values (1)")
     1650        self.assertRaises(pg.ProgrammingError,
     1651            upsert, 'test_table', dict(n=2))
     1652        r = get('test_table', 1, 'n')
     1653        self.assertIsInstance(r, dict)
     1654        self.assertEqual(r['n'], 1)
     1655        qoid = 'oid(test_table)'
     1656        self.assertIn(qoid, r)
     1657        self.assertNotIn('oid', r)
     1658        oid = r[qoid]
     1659        self.assertRaises(pg.ProgrammingError,
     1660            upsert, 'test_table', dict(n=2, oid=oid))
     1661        query("alter table test_table add column m int")
     1662        query("alter table test_table add primary key (n)")
     1663        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1664        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1665        s = dict(n=2)
     1666        r = upsert('test_table', s)
     1667        self.assertIs(r, s)
     1668        self.assertEqual(r['n'], 2)
     1669        self.assertIsNone(r['m'])
     1670        q = query("select n, m from test_table order by n limit 3")
     1671        self.assertEqual(q.getresult(), [(1, None), (2, None)])
     1672        r['oid'] = oid
     1673        r = upsert('test_table', r)
     1674        self.assertIs(r, s)
     1675        self.assertEqual(r['n'], 2)
     1676        self.assertIsNone(r['m'])
     1677        self.assertIn(qoid, r)
     1678        self.assertNotIn('oid', r)
     1679        self.assertNotEqual(r[qoid], oid)
     1680        r['m'] = 7
     1681        r = upsert('test_table', r)
     1682        self.assertIs(r, s)
     1683        self.assertEqual(r['n'], 2)
     1684        self.assertEqual(r['m'], 7)
     1685        r.update(n=1, m=3)
     1686        r = upsert('test_table', r)
     1687        self.assertIs(r, s)
     1688        self.assertEqual(r['n'], 1)
     1689        self.assertEqual(r['m'], 3)
     1690        q = query("select n, m from test_table order by n limit 3")
     1691        self.assertEqual(q.getresult(), [(1, 3), (2, 7)])
     1692        r = upsert('test_table', r, oid='invalid')
     1693        self.assertIs(r, s)
     1694        self.assertEqual(r['n'], 1)
     1695        self.assertEqual(r['m'], 3)
     1696        r['m'] = 5
     1697        r = upsert('test_table', r, m=False)
     1698        self.assertIs(r, s)
     1699        self.assertEqual(r['n'], 1)
     1700        self.assertEqual(r['m'], 3)
     1701        r['m'] = 5
     1702        r = upsert('test_table', r, m=True)
     1703        self.assertIs(r, s)
     1704        self.assertEqual(r['n'], 1)
     1705        self.assertEqual(r['m'], 5)
     1706        r.update(n=2, m=1)
     1707        r = upsert('test_table', r, m='included.m')
     1708        self.assertIs(r, s)
     1709        self.assertEqual(r['n'], 2)
     1710        self.assertEqual(r['m'], 7)
     1711        r['m'] = 9
     1712        r = upsert('test_table', r, m='excluded.m')
     1713        self.assertIs(r, s)
     1714        self.assertEqual(r['n'], 2)
     1715        self.assertEqual(r['m'], 9)
     1716        r['m'] = 8
     1717        r = upsert('test_table *', r, m='included.m + 1')
     1718        self.assertIs(r, s)
     1719        self.assertEqual(r['n'], 2)
     1720        self.assertEqual(r['m'], 10)
     1721        q = query("select n, m from test_table order by n limit 3")
     1722        self.assertEqual(q.getresult(), [(1, 5), (2, 10)])
    14421723
    14431724    def testUpsertWithCompositeKey(self):
     
    16261907        self.addCleanup(query, "drop table test_table")
    16271908        query("create table test_table (n int) with oids")
    1628         query("insert into test_table values (1)")
    1629         query("insert into test_table values (2)")
    1630         query("insert into test_table values (3)")
     1909        for i in range(6):
     1910            query("insert into test_table values (%d)" % (i + 1))
    16311911        r = dict(n=3)
    16321912        self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
    1633         r = get('test_table', 1, 'n')
    1634         self.assertIsInstance(r, dict)
    1635         self.assertEqual(r['n'], 1)
     1913        s = get('test_table', 1, 'n')
    16361914        qoid = 'oid(test_table)'
    1637         self.assertIn(qoid, r)
    1638         oid = r[qoid]
    1639         self.assertIsInstance(oid, int)
    1640         s = delete('test_table', r)
    1641         self.assertEqual(s, 1)
    1642         s = delete('test_table', r)
    1643         self.assertEqual(s, 0)
    1644         r = get('test_table', 2, 'n')
    1645         self.assertIsInstance(r, dict)
    1646         self.assertEqual(r['n'], 2)
    1647         qoid = 'oid(test_table)'
    1648         self.assertIn(qoid, r)
    1649         oid = r[qoid]
    1650         self.assertIsInstance(oid, int)
    1651         r['oid'] = r.pop(qoid)
    1652         self.assertRaises(pg.ProgrammingError, delete, 'test_table', r)
    1653         s = delete('test_table', r, oid=oid)
    1654         self.assertEqual(s, 1)
    1655         s = delete('test_table', r)
    1656         self.assertEqual(s, 0)
    1657         s = delete('test_table', r, n=3)
    1658         self.assertEqual(s, 0)
    1659         q = 'select n from test_table order by 1 limit 3'
    1660         r = query(q).getresult()
    1661         self.assertEqual(r, [(3,)])
     1915        self.assertIn(qoid, s)
     1916        r = delete('test_table', s)
     1917        self.assertEqual(r, 1)
     1918        r = delete('test_table', s)
     1919        self.assertEqual(r, 0)
     1920        q = "select min(n),count(n) from test_table"
     1921        self.assertEqual(query(q).getresult()[0], (2, 5))
     1922        oid = get('test_table', 2, 'n')[qoid]
     1923        s = dict(oid=oid, n=2)
     1924        self.assertRaises(pg.ProgrammingError, delete, 'test_table', s)
     1925        r = delete('test_table', None, oid=oid)
     1926        self.assertEqual(r, 1)
     1927        r = delete('test_table', None, oid=oid)
     1928        self.assertEqual(r, 0)
     1929        self.assertEqual(query(q).getresult()[0], (3, 4))
     1930        s = dict(oid=oid, n=2)
     1931        oid = get('test_table', 3, 'n')[qoid]
     1932        self.assertRaises(pg.ProgrammingError, delete, 'test_table', s)
     1933        r = delete('test_table', s, oid=oid)
     1934        self.assertEqual(r, 1)
     1935        r = delete('test_table', s, oid=oid)
     1936        self.assertEqual(r, 0)
     1937        self.assertEqual(query(q).getresult()[0], (4, 3))
     1938        s = get('test_table', 4, 'n')
     1939        r = delete('test_table *', s)
     1940        self.assertEqual(r, 1)
     1941        r = delete('test_table *', s)
     1942        self.assertEqual(r, 0)
     1943        self.assertEqual(query(q).getresult()[0], (5, 2))
     1944        oid = get('test_table', 5, 'n')[qoid]
     1945        s = {qoid: oid, 'm': 4}
     1946        r = delete('test_table', s, m=6)
     1947        self.assertEqual(r, 1)
     1948        r = delete('test_table *', s)
     1949        self.assertEqual(r, 0)
     1950        self.assertEqual(query(q).getresult()[0], (6, 1))
     1951        query("alter table test_table add column m int")
     1952        query("alter table test_table add primary key (n)")
     1953        self.assertIn('m', self.db.get_attnames('test_table', flush=True))
     1954        self.assertEqual('n', self.db.pkey('test_table', flush=True))
     1955        for i in range(5):
     1956            query("insert into test_table values (%d, %d)" % (i + 1, i + 2))
     1957        s = dict(m=2)
     1958        self.assertRaises(KeyError, delete, 'test_table', s)
     1959        s = dict(m=2, oid=oid)
     1960        self.assertRaises(KeyError, delete, 'test_table', s)
     1961        r = delete('test_table', dict(m=2), oid=oid)
     1962        self.assertEqual(r, 0)
     1963        oid = get('test_table', 1, 'n')[qoid]
     1964        s = dict(oid=oid)
     1965        self.assertRaises(KeyError, delete, 'test_table', s)
     1966        r = delete('test_table', s, oid=oid)
     1967        self.assertEqual(r, 1)
     1968        r = delete('test_table', s, oid=oid)
     1969        self.assertEqual(r, 0)
     1970        self.assertEqual(query(q).getresult()[0], (2, 5))
     1971        s = get('test_table', 2, 'n')
     1972        del s['n']
     1973        r = delete('test_table', s)
     1974        self.assertEqual(r, 1)
     1975        r = delete('test_table', s)
     1976        self.assertEqual(r, 0)
     1977        self.assertEqual(query(q).getresult()[0], (3, 4))
     1978        r = delete('test_table', n=3)
     1979        self.assertEqual(r, 1)
     1980        r = delete('test_table', n=3)
     1981        self.assertEqual(r, 0)
     1982        self.assertEqual(query(q).getresult()[0], (4, 3))
     1983        r = delete('test_table', None, n=4)
     1984        self.assertEqual(r, 1)
     1985        r = delete('test_table', None, n=4)
     1986        self.assertEqual(r, 0)
     1987        self.assertEqual(query(q).getresult()[0], (5, 2))
     1988        s = dict(n=6)
     1989        r = delete('test_table', s, n=5)
     1990        self.assertEqual(r, 1)
     1991        r = delete('test_table', s, n=5)
     1992        self.assertEqual(r, 0)
     1993        self.assertEqual(query(q).getresult()[0], (6, 1))
    16621994
    16631995    def testDeleteWithCompositeKey(self):
     
    16712003            query("insert into %s values("
    16722004                "%d, '%s')" % (table, n + 1, t))
    1673         self.assertRaises(pg.ProgrammingError, self.db.delete,
    1674             table, dict(t='b'))
     2005        self.assertRaises(KeyError, self.db.delete, table, dict(t='b'))
    16752006        self.assertEqual(self.db.delete(table, dict(n=2)), 1)
    1676         r = query('select t from "%s" where n=2' % table
    1677                   ).getresult()
     2007        r = query('select t from "%s" where n=2' % table).getresult()
    16782008        self.assertEqual(r, [])
    16792009        self.assertEqual(self.db.delete(table, dict(n=2)), 0)
    1680         r = query('select t from "%s" where n=3' % table
    1681                   ).getresult()[0][0]
     2010        r = query('select t from "%s" where n=3' % table).getresult()[0][0]
    16822011        self.assertEqual(r, 'c')
    16832012        table = 'delete_test_table_2'
     
    16912020                query('insert into "%s" values('
    16922021                    "%d, %d, '%s')" % (table, n + 1, m + 1, t))
    1693         self.assertRaises(pg.ProgrammingError, self.db.delete,
    1694             table, dict(n=2, t='b'))
     2022        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    16952023        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
    16962024        r = [r[0] for r in query('select t from "%s" where n=2'
     
    17272055        r = query('select count(*) from "%s"' % table).getresult()
    17282056        self.assertEqual(r[0][0], 0)
     2057
     2058    def testDeleteReferenced(self):
     2059        delete = self.db.delete
     2060        query = self.db.query
     2061        query("drop table if exists test_child")
     2062        query("drop table if exists test_parent")
     2063        self.addCleanup(query, "drop table test_parent")
     2064        query("create table test_parent (n smallint primary key)")
     2065        self.addCleanup(query, "drop table test_child")
     2066        query("create table test_child ("
     2067            " n smallint primary key references test_parent (n))")
     2068        for n in range(3):
     2069            query("insert into test_parent (n) values (%d)" % n)
     2070            query("insert into test_child (n) values (%d)" % n)
     2071        q = ("select (select count(*) from test_parent),"
     2072            " (select count(*) from test_child)")
     2073        self.assertEqual(query(q).getresult()[0], (3, 3))
     2074        self.assertRaises(pg.ProgrammingError,
     2075            delete, 'test_parent', None, n=2)
     2076        self.assertRaises(pg.ProgrammingError,
     2077            delete, 'test_parent *', None, n=2)
     2078        r = delete('test_child', None, n=2)
     2079        self.assertEqual(r, 1)
     2080        self.assertEqual(query(q).getresult()[0], (3, 2))
     2081        r = delete('test_parent', None, n=2)
     2082        self.assertEqual(r, 1)
     2083        self.assertEqual(query(q).getresult()[0], (2, 2))
     2084        self.assertRaises(pg.ProgrammingError,
     2085            delete, 'test_parent', dict(n=0))
     2086        self.assertRaises(pg.ProgrammingError,
     2087            delete, 'test_parent *', dict(n=0))
     2088        r = delete('test_child', dict(n=0))
     2089        self.assertEqual(r, 1)
     2090        self.assertEqual(query(q).getresult()[0], (2, 1))
     2091        r = delete('test_child', dict(n=0))
     2092        self.assertEqual(r, 0)
     2093        r = delete('test_parent', dict(n=0))
     2094        self.assertEqual(r, 1)
     2095        self.assertEqual(query(q).getresult()[0], (1, 1))
     2096        r = delete('test_parent', None, n=0)
     2097        self.assertEqual(r, 0)
     2098        q = "select n from test_parent natural join test_child limit 2"
     2099        self.assertEqual(query(q).getresult(), [(1,)])
    17292100
    17302101    def testTruncate(self):
Note: See TracChangeset for help on using the changeset viewer.