Changeset 799


Ignore:
Timestamp:
Jan 31, 2016, 1:45:58 PM (3 years ago)
Author:
cito
Message:

Improve adaptation and add query_formatted() method

Also added more tests and documentation.

Location:
trunk
Files:
9 edited
1 copied

Legend:

Unmodified
Added
Removed
  • trunk/docs/contents/pg/adaptation.rst

    r797 r799  
    22=====================================
    33
    4 .. py:currentmodule:: pgdb
     4.. py:currentmodule:: pg
    55
    66Both PostgreSQL and Python have the concept of data types, but there
     
    1515Adaptation of parameters
    1616------------------------
    17 
    18 PyGreSQL knows how to adapt the common Python types to get a suitable
    19 representation of their values for PostgreSQL when you pass parameters
    20 to a query. For example::
    21 
    22     >>> con = pgdb.connect(...)
    23     >>> cur = con.cursor()
    24     >>> parameters = (144, 3.75, 'hello', None)
    25     >>> tuple(cur.execute('SELECT %s, %s, %s, %s', parameters).fetchone()
    26     (144, Decimal('3.75'), 'hello', None)
    27 
    28 This is the result we can expect, so obviously PyGreSQL has adapted the
    29 parameters and sent the following query to PostgreSQL:
    30 
    31 .. code-block:: sql
    32 
    33     SELECT 144, 3.75, 'hello', NULL
    34 
    35 Note the subtle, but important detail that even though the SQL string passed
    36 to :meth:`cur.execute` contains conversion specifications normally used in
    37 Python with the ``%`` operator for formatting strings, we didn't use the ``%``
    38 operator to format the parameters, but passed them as the second argument to
    39 :meth:`cur.execute`.  I.e. we **didn't** write the following::
    40 
    41 >>> tuple(cur.execute('SELECT %s, %s, %s, %s' % parameters).fetchone()
    42 
    43 If we had done this, PostgreSQL would have complained because the parameters
    44 were not adapted.  Particularly, there would be no quotes around the value
    45 ``'hello'``, so PostgreSQL would have interpreted this as a database column,
    46 which would have caused a :exc:`ProgrammingError`.  Also, the Python value
    47 ``None`` would have been included in the SQL command literally, instead of
    48 being converted to the SQL keyword ``NULL``, which would have been another
    49 reason for PostgreSQL to complain about our bad query:
    50 
    51 .. code-block:: sql
    52 
    53     SELECT 144, 3.75, hello, None
    54 
    55 Even worse, building queries with the use of the ``%`` operator makes us
    56 vulnerable to so called "SQL injection" exploits, where an attacker inserts
    57 malicious SQL statements into our queries that we never intended to be
    58 executed.  We could avoid this by carefully quoting and escaping the
    59 parameters, but this would be tedious and if we overlook something, our
    60 code will still be vulnerable.  So please don't do this.  This cannot be
    61 emphasized enough, because it is such a subtle difference and using the ``%``
    62 operator looks so natural:
     17When you use the higher level methods of the classic :mod:`pg` module like
     18:meth:`DB.insert()` or :meth:`DB.update()`, you don't need to care about
     19adaptation of parameters, since all of this is happening automatically behind
     20the scenes.  You only need to consider this issue when creating SQL commands
     21manually and sending them to the database using the :meth:`DB.query` method.
     22
     23Imagine you have created a user  login form that stores the login name as
     24*login* and the password as *passwd* and you now want to get the user
     25data for that user.  You may be tempted to execute a query like this::
     26
     27    >>> db = pg.DB(...)
     28    >>> sql = "SELECT * FROM user_table WHERE login = '%s' AND passwd = '%s'"
     29    >>> db.query(sql % (login, passwd)).getresult()[0]
     30
     31This seems to work at a first glance, but you will notice an error as soon as
     32you try to use a login name containing a single quote.  Even worse, this error
     33can be exploited through a so called "SQL injection", where an attacker inserts
     34malicious SQL statements into the query that you never intended to be executed.
     35For instance, with a login name something like ``' OR ''='`` the user could
     36easily log in and see the user data of another user in the database.
     37
     38One solution for this problem would be to clean your input from "dangerous"
     39characters like the single quote, but this is tedious and it is likely that
     40you overlook something or break the application e.g. for users with names
     41like "D'Arcy".  A better solution is to use the escaping functions provided
     42by PostgreSQL which are available as methods on the :class:`DB` object::
     43
     44    >>> login = "D'Arcy"
     45    >>> db.escape_string(login)
     46    "D''Arcy"
     47
     48As you see, :meth:`DB.escape_string` has doubled the single quote which is
     49the right thing to do in SQL.  However, there are better ways of passing
     50parameters to the query, without having to manually escape them.  If you
     51pass the parameters as positional arguments to :meth:`DB.query`, then
     52PyGreSQL will send them to the database separately, without the need for
     53quoting them inside the SQL command, and without the problems inherent with
     54that process.  In this case you must put placeholders of the form ``$1``,
     55``$2`` etc. in the SQL command in place of the parameters that should go there.
     56For instance::
     57
     58    >>> sql = "SELECT * FROM user_table WHERE login = $1 AND passwd = $2"
     59    >>> db.query(sql, login, passwd).getresult()[0]
     60
     61That's much better.  So please always keep the following warning in mind:
    6362
    6463.. warning::
     
    6766  the ``%`` operator.  Always pass the parameters separately.
    6867
    69 The good thing is that by letting PyGreSQL do the work for you, you can treat
    70 all your parameters equally and don't need to ponder where you need to put
    71 quotes or need to escape strings.  You can and should also always use the
    72 general ``%s`` specification instead of e.g. using ``%d`` for integers.
    73 Actually, to avoid mistakes and make it easier to insert parameters at more
    74 than one location, you can and should use named specifications, like this::
    75 
    76     >>> params = dict(greeting='Hello', name='HAL')
    77     >>> sql = """SELECT %(greeting)s || ', ' || %(name)s
    78     ...    || '. Do you read me, ' || %(name)s || '?'"""
    79     >>> cur.execute(sql, params).fetchone()[0]
    80     'Hello, HAL. Do you read me, HAL?'
    81 
    82 PyGreSQL does not only adapt the basic types like ``int``, ``float``,
    83 ``bool`` and ``str``, but also tries to make sense of Python lists and tuples.
     68If you like the ``%`` format specifications of Python better than the
     69placeholders used by PostgreSQL, there is still a way to use them, via the
     70:meth:`DB.query_formatted` method::
     71
     72    >>> sql = "SELECT * FROM user_table WHERE login = %s AND passwd = %s"
     73    >>> db.query_formatted(sql, (login, passwd)).getresult()[0]
     74
     75Note that we need to pass the parameters not as positional arguments here,
     76but as a single tuple.  Also note again that we did not use the ``%``
     77operator of Python to format the SQL string, we just used the ``%s`` format
     78specifications of Python and let PyGreSQL care about the formatting.
     79Even better, you can also pass the parameters as a dictionary if you use
     80the :meth:`DB.query_formatted` method::
     81
     82    >>> sql = """SELECT * FROM user_table
     83    ...     WHERE login = %(login)s AND passwd = %(passwd)s"""
     84    >>> parameters = dict(login=login, passwd=passwd)
     85    >>> db.query_formatted(sql, parameters).getresult()[0]
     86
     87Here is another example::
     88
     89    >>> sql = "SELECT 'Hello, ' || %s || '!'"
     90    >>> db.query_formatted(sql, (login,)).getresult()[0]
     91
     92You would think that the following even simpler example should work, too:
     93
     94    >>> sql = "SELECT %s"
     95    >>> db.query_formatted(sql, (login,)).getresult()[0]
     96    ProgrammingError: Could not determine data type of parameter $1
     97
     98The issue here is that :meth:`DB.query_formatted` by default still uses
     99PostgreSQL parameters, transforming the Python style ``%s`` placeholder
     100into a ``$1`` placeholder, and sending the login name separately from
     101the query.  In the query we looked at before, the concatenation with other
     102strings made it clear that it should be interpreted as a string. This simple
     103query however does not give PostgreSQL a clue what data type the ``$1``
     104placeholder stands for.
     105
     106This is different when you are embedding the login name directly into the
     107query instead of passing it as parameter to PostgreSQL.  You can achieve this
     108by setting the *inline* parameter of :meth:`DB.query_formatted`, like so::
     109
     110    >>> sql = "SELECT %s"
     111    >>> db.query_formatted(sql, (login,), inline=True).getresult()[0]
     112
     113Another way of making this query work while still sending the parameters
     114separately is to simply cast the parameter values::
     115
     116    >>> sql = "SELECT %s::text"
     117    >>> db.query_formatted(sql, (login,), inline=False).getresult()[0]
     118
     119In real world examples you will rarely have to cast your parameters like that,
     120since in an INSERT statement or a WHERE clause comparing the parameter to a
     121table column the data type will be clear from the context.
     122
     123When binding the parameters to a query, PyGreSQL does not only adapt the basic
     124types like ``int``, ``float``, ``bool`` and ``str``, but also tries to make
     125sense of Python lists and tuples.
    84126
    85127Lists are adapted as PostgreSQL arrays::
    86128
    87129   >>> params = dict(array=[[1, 2],[3, 4]])
    88    >>> cur.execute("SELECT %(array)s", params).fetchone()[0]
     130   >>> db.query_formatted("SELECT %(array)s::int[]", params).getresult()[0][0]
    89131   [[1, 2], [3, 4]]
    90132
    91 Note that the query gives the value back as Python lists again.  This
     133Note that again we only need to cast the array parameter or use inline
     134parameters because this simple query does not provide enough context.
     135Also note that the query gives the value back as Python lists again.  This
    92136is achieved by the typecasting mechanism explained in the next section.
    93 The query that was actually executed was this:
    94 
    95 .. code-block:: sql
    96 
    97     SELECT ARRAY[[1,2],[3,4]]
    98 
    99 Again, if we had inserted the list using the ``%`` operator without adaptation,
    100 the ``ARRAY`` keyword would have been missing in the query.
    101 
    102 Tuples are adapted as PostgreSQL composite types::
    103 
    104     >>> params = dict(record=('Bond', 'James'))
    105     >>> cur.execute("SELECT %(record)s", params).fetchone()[0]
    106     ('Bond', 'James')
    107 
    108 You can also use this feature with the ``IN`` syntax of SQL::
    109 
    110     >>> params = dict(what='needle', where=('needle', 'haystack'))
    111     >>> cur.execute("SELECT %(what)s IN %(where)s", params).fetchone()[0]
    112     True
    113 
    114 Sometimes a Python type can be ambiguous. For instance, you might want
    115 to insert a Python list not into an array column, but into a JSON column.
    116 Or you want to interpret a string as a date and insert it into a DATE column.
    117 In this case you can give PyGreSQL a hint by using :ref:`type_constructors`::
    118 
    119     >>> cur.execute("CREATE TABLE json_data (data json, created date)")
    120     >>> params = dict(
    121     ...     data=pgdb.Json([1, 2, 3]), created=pgdb.Date(2016, 1, 29))
    122     >>> sql = ("INSERT INTO json_data VALUES (%(data)s, %(created)s)")
    123     >>> cur.execute(sql, params)
    124     >>> cur.execute("SELECT * FROM json_data").fetchone()
    125     Row(data=[1, 2, 3], created='2016-01-29')
    126 
    127 Let's think of another example where we create a table with a composite
    128 type in PostgreSQL:
     137
     138Tuples are adapted as PostgreSQL composite types.  If you use inline paramters,
     139they can also be used with the ``IN`` syntax.
     140
     141Let's think of a more real world example again where we create a table with a
     142composite type in PostgreSQL:
    129143
    130144.. code-block:: sql
     
    152166inserted into the database and then read back as follows::
    153167
    154    >>> cur.execute("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
     168   >>> db.query_formatted("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
    155169   ...     dict(item=inventory_item('fuzzy dice', 42, 1.99), count=1000))
    156    >>> cur.execute("SELECT * FROM on_hand").fetchone()
     170   >>> db.query("SELECT * FROM on_hand").getresult()[0][0]
    157171   Row(item=inventory_item(name='fuzzy dice', supplier_id=42,
    158172           price=Decimal('1.99')), count=1000)
     173
     174The :meth:`DB.insert` method provides a simpler way to achieve the same::
     175
     176    >>> row = dict(item=inventory_item('fuzzy dice', 42, 1.99), count=1000)
     177    >>> db.insert('on_hand', row)
     178    {'count': 1000,  'item': inventory_item(name='fuzzy dice',
     179            supplier_id=42, price=Decimal('1.99'))}
    159180
    160181However, we may not want to use named tuples, but custom Python classes
     
    173194
    174195But when we try to insert an instance of this class in the same way, we
    175 will get an error::
    176 
    177    >>> cur.execute("INSERT INTO on_hand VALUES (%(item)s, %(count)s)",
    178    ...     dict(item=InventoryItem('fuzzy dice', 42, 1.99), count=1000))
    179    InterfaceError: Do not know how to adapt type <class 'InventoryItem'>
    180 
    181 While PyGreSQL knows how to adapt tuples, it does not know what to make out
    182 of our custom class.  To simply convert the object to a string using the
    183 ``str`` function is not a solution, since this yields a human readable string
    184 that is not useful for PostgreSQL.  However, it is possible to make such
    185 custom classes adapt themselves to PostgreSQL by adding a "magic" method
    186 with the name ``__pg_repr__``, like this::
    187 
    188   >>> class InventoryItem:
     196will get an error.  This is because PyGreSQL tries to pass the string
     197representation of the object as a parameter to PostgreSQL, but this is just a
     198human readable string and not useful for PostgreSQL to build a composite type.
     199However, it is possible to make such custom classes adapt themselves to
     200PostgreSQL by adding a "magic" method with the name ``__pg_str__``, like so::
     201
     202    >>> class InventoryItem:
    189203    ...
    190204    ...     ...
     
    194208    ...             self.name, self.supplier_id, self.price)
    195209    ...
    196     ...     def __pg_repr__(self):
     210    ...     def __pg_str__(self, typ):
    197211    ...         return (self.name, self.supplier_id, self.price)
    198212
    199213Now you can insert class instances the same way as you insert named tuples.
    200 
    201 Note that PyGreSQL adapts the result of ``__pg_repr__`` again if it is a
    202 tuple or a list.  Otherwise, it must be a properly escaped string.
     214You can even make these objects adapt to different types in different ways::
     215
     216    >>> class InventoryItem:
     217    ...
     218    ...     ...
     219    ...
     220    ...     def __pg_str__(self, typ):
     221    ...         if typ == 'text':
     222    ...             return str(self)
     223    ...        return (self.name, self.supplier_id, self.price)
     224    ...
     225    >>> db.query("ALTER TABLE on_hand ADD COLUMN remark varchar")
     226    >>> item=InventoryItem('fuzzy dice', 42, 1.99)
     227    >>> row = dict(item=item, remark=item, count=1000)
     228    >>> db.insert('on_hand', row)
     229    {'count': 1000, 'item': inventory_item(name='fuzzy dice',
     230        supplier_id=42, price=Decimal('1.99')),
     231        'remark': 'fuzzy dice (from 42, at $1.99)'}
     232
     233There is also another "magic" method ``__pg_repr__`` which does not take the
     234*typ* parameter.  That method is used instead of ``__pg_str__`` when passing
     235parameters inline.  You must be more careful when using ``__pg_repr__``,
     236because it must return a properly escaped string that can be put literally
     237inside the SQL.  The only exception is when you return a tuple or list,
     238because these will be adapted and properly escaped by PyGreSQL again.
    203239
    204240Typecasting to Python
     
    206242
    207243As you noticed, PyGreSQL automatically converted the PostgreSQL data to
    208 suitable Python objects when returning values via one of the "fetch" methods
    209 of a cursor.  This is done by the use of built-in typecast functions.
    210 
    211 If you want to use different typecast functions or add your own  if no
     244suitable Python objects when returning values via the :meth:`DB.get()`,
     245:meth:`Query.getresult()` and similar methods.  This is done by the use
     246of built-in typecast functions.
     247
     248If you want to use different typecast functions or add your own if no
    212249built-in typecast function is available, then this is possible using
    213 the :func:`set_typecast` function.  With the :func:`get_typecast` method
    214 you can check which function is currently set, and :func:`reset_typecast`
    215 allows you to reset the typecast function to its default.  If no typecast
    216 function is set, then PyGreSQL will return the raw strings from the database.
     250the :func:`set_typecast` function.  With the :func:`get_typecast` function
     251you can check which function is currently set.  If no typecast function
     252is set, then PyGreSQL will return the raw strings from the database.
    217253
    218254For instance, you will find that PyGreSQL uses the normal ``int`` function
    219255to cast PostgreSQL ``int4`` type values to Python::
    220256
    221     >>> pgdb.get_typecast('int4')
     257    >>> pg.get_typecast('int4')
    222258    int
    223259
    224 You can change this to return float values instead::
    225 
    226     >>> pgdb.set_typecast('int4', float)
    227     >>> con = pgdb.connect(...)
    228     >>> cur = con.cursor()
    229     >>> cur.execute('select 42::int4').fetchone()[0]
    230     42.0
    231 
    232 Note that the connections cache typecast functions, so you may need to
    233 reopen the database connection, or reset the cache of the connection to
    234 make this effective, using the following command::
    235 
    236    >>> con.type_cache.reset_typecast()
    237 
    238 The :class:`TypeCache` of the connection can also be used to change typecast
    239 functions locally for one database connection only.
    240 
    241 As a more useful example, we can create a typecast function that casts
    242 items of the composite type used as example in the previous section
    243 to instances of the corresponding Python class::
    244 
    245     >>> con.type_cache.reset_typecast()
    246     >>> cast_tuple = con.type_cache.get_typecast('inventory_item')
     260In the classic PyGreSQL module, the typecasting for these basic types is
     261always done internally by the C extension module for performance reasons.
     262We can set a different typecast function for ``int4``, but it will not
     263become effective, the C module continues to use its internal typecasting.
     264
     265However, we can add new typecast functions for the database types that are
     266not supported by the C modul. Fore example, we can create a typecast function
     267that casts items of the composite PostgreSQL type used as example in the
     268previous section to instances of the corresponding Python class.
     269
     270To do this, at first we get the default typecast function that PyGreSQL has
     271created for the current :class:`DB` connection.  This default function casts
     272composite types to named tuples, as we have seen in the section before.
     273We can grab it from the :attr:`DB.dbtypes` object as follows::
     274
     275    >>> cast_tuple = db.dbtypes.get_typecast('inventory_item')
     276
     277Now we can create a new typecast function that converts the tuple to
     278an instance of our custom class::
     279
    247280    >>> cast_item = lambda value: InventoryItem(*cast_tuple(value))
    248     >>> con.type_cache.set_typecast('inventory_item', cast_item)
    249     >>> str(cur.execute("SELECT * FROM on_hand").fetchone()[0])
     281
     282Finally, we set this typecast function, either globally with
     283:func:`set_typecast`, or locally for the current connection like this::
     284
     285    >>> db.dbtypes.set_typecast('inventory_item', cast_item)
     286
     287Now we can get instances of our custom class directly from the database::
     288
     289    >>> item = db.query("SELECT * FROM on_hand").getresult()[0][0]
     290    >>> str(item)
    250291    'fuzzy dice (from 42, at $1.99)'
    251292
    252 As you saw in the last section you, PyGreSQL also has a typecast function
    253 for JSON, which is the default JSON decoder from the standard library.
    254 Let's assume we want to use a slight variation of that decoder in which
    255 every integer in JSON is converted to a float in Python. This can be
    256 accomplished as follows::
    257 
    258     >>> from json import loads
    259     >>> cast_json = lambda v: loads(v, parse_int=float)
    260     >>> pgdb.set_typecast('json', cast_json)
    261     >>> cur.execute("SELECT data FROM json_data").fetchone()[0]
    262     [1.0, 2.0, 3.0]
    263 
    264 Note again that you may need to ``type_cache.reset_typecast()`` to make
    265 this effective.  Also note that the two types ``json`` and ``jsonb`` have
    266 their own typecast functions, so if you use ``jsonb`` instead of ``json``,
    267 you need to use this type name when setting the typecast function::
    268 
    269     >>> pgdb.set_typecast('jsonb', cast_json)
     293Note that some of the typecast functions used by the C module are configurable
     294with separate module level functions, such as :meth:`set_decimal`,
     295:meth:`set_bool` or :meth:`set_jsondecode`.  You need to use these instead of
     296:meth:`set_typecast` if you want to change the behavior of the C module.
     297
     298Also note that after changing global typecast functions with
     299:meth:`set_typecast`, you may need to run ``db.dbtypes.reset_typecast()``
     300to make these changes effective on connections that were already open.
  • trunk/docs/contents/pg/db_wrapper.rst

    r798 r799  
    452452    rows = db.query("update employees set phone=$2 where name=$1",
    453453         name, phone).getresult()[0][0]
     454
     455query_formatted -- execute a formatted SQL command string
     456---------------------------------------------------------
     457
     458.. method:: DB.query_formatted(command, parameters, [types], [inline])
     459
     460    Execute a formatted SQL command string
     461
     462    :param str command: SQL command
     463    :param parameters: the values of the parameters for the SQL command
     464    :type parameters: tuple, list or dict
     465    :param types: optionally, the types of the parameters
     466    :type types: tuple, list or dict
     467    :param bool inline: whether the parameters should be passed in the SQL
     468    :rtype: :class:`Query`, None
     469    :raises TypeError: bad argument type, or too many arguments
     470    :raises TypeError: invalid connection
     471    :raises ValueError: empty SQL query or lost connection
     472    :raises pg.ProgrammingError: error in query
     473    :raises pg.InternalError: error during query processing
     474
     475Similar to :meth:`DB.query`, but using Python format placeholders of the form
     476``%s`` or ``%(names)s`` instead of PostgreSQL placeholders of the form ``$1``.
     477The parameters must be passed as a tuple, list or dict.  You can also pass a
     478corresponding tuple, list or dict of database types in order to format the
     479parameters properly in case there is ambiguity.
     480
     481If you set *inline* to True, the parameters will be sent to the database
     482embedded in the SQL command, otherwise they will be sent separately.
     483
     484Example::
     485
     486    name = input("Name? ")
     487    phone = input("Phone? ")
     488    rows = db.query_formatted(
     489        "update employees set phone=%s where name=%s",
     490        (phone, name)).getresult()[0][0]
     491    # or
     492    rows = db.query_formatted(
     493        "update employees set phone=%(phone)s where name=%(name)s",
     494        dict(name=name, phone=phone)).getresult()[0][0]
    454495
    455496clear -- clear row values in memory
     
    780821    The name of the database that the connection is using
    781822
    782 
    783823.. attribute:: DB.dbtypes
    784824
     
    790830
    791831.. versionadded:: 5.0
     832
     833.. attribute:: DB.adapter
     834
     835    A class with some helper functions for adapting parameters
     836
     837This can be used for building queries with parameters.  You normally will
     838not need this, as you can use the :class:`DB.query_formatted` method.
     839
     840.. versionadded:: 5.0
  • trunk/docs/contents/pg/index.rst

    r798 r799  
    1717    notification
    1818    db_types
     19    adaptation
  • trunk/docs/contents/pg/module.rst

    r798 r799  
    500500supported by the C extension module.
    501501
     502Type helpers
     503------------
     504
     505The module provides the following type helper functions.  You can wrap
     506parameters with these functions when passing them to :meth:`DB.query_formatted`
     507in order to give PyGreSQL a hint about the type of the parameters.
     508
     509.. function:: Bytea(bytes)
     510
     511    A wrapper for holding a bytea value
     512
     513.. function:: Json(obj)
     514
     515    A wrapper for holding an object serializable to JSON
     516
     517.. function:: Literal(sql)
     518
     519    A wrapper for holding a literal SQL string
    502520
    503521Module constants
    504522----------------
     523
    505524Some constants are defined in the module dictionary.
    506525They are intended to be used as parameters for methods calls.
  • trunk/docs/contents/pgdb/adaptation.rst

    r797 r799  
    209209of a cursor.  This is done by the use of built-in typecast functions.
    210210
    211 If you want to use different typecast functions or add your own  if no
     211If you want to use different typecast functions or add your own if no
    212212built-in typecast function is available, then this is possible using
    213 the :func:`set_typecast` function.  With the :func:`get_typecast` method
     213the :func:`set_typecast` function.  With the :func:`get_typecast` function
    214214you can check which function is currently set, and :func:`reset_typecast`
    215215allows you to reset the typecast function to its default.  If no typecast
     
    230230    42.0
    231231
    232 Note that the connections cache typecast functions, so you may need to
     232Note that the connections cache the typecast functions, so you may need to
    233233reopen the database connection, or reset the cache of the connection to
    234234make this effective, using the following command::
     
    262262    [1.0, 2.0, 3.0]
    263263
    264 Note again that you may need to ``type_cache.reset_typecast()`` to make
    265 this effective.  Also note that the two types ``json`` and ``jsonb`` have
    266 their own typecast functions, so if you use ``jsonb`` instead of ``json``,
    267 you need to use this type name when setting the typecast function::
     264Note again that you may need to run ``con.type_cache.reset_typecast()`` to
     265make this effective.  Also note that the two types ``json`` and ``jsonb`` have
     266their own typecast functions, so if you use ``jsonb`` instead of ``json``, you
     267need to use this type name when setting the typecast function::
    268268
    269269    >>> pgdb.set_typecast('jsonb', cast_json)
  • trunk/docs/contents/pgdb/types.rst

    r797 r799  
    5555specific data types:
    5656
     57.. function:: Bytea(bytes)
     58
     59    Construct an object capable of holding a bytea value
     60
    5761.. function:: Json(obj, [encode])
    5862
    59     Construct a wrapper for holding an object serializable to JSON.
     63    Construct a wrapper for holding an object serializable to JSON
    6064
    6165    You can pass an optional serialization function as a parameter.
    6266    By default, PyGreSQL uses :func:`json.dumps` to serialize it.
     67
     68.. function:: Literal(sql)
     69
     70    Construct a wrapper for holding a literal SQL string
    6371
    6472Example for using a type constructor::
  • trunk/pg.py

    r798 r799  
    3535import warnings
    3636
     37from datetime import date, time, datetime, timedelta
    3738from decimal import Decimal
     39from math import isnan, isinf
    3840from collections import namedtuple
    39 from functools import partial
    4041from operator import itemgetter
    4142from re import compile as regex
     
    145146
    146147
    147 class _SimpleType(dict):
     148class _SimpleTypes(dict):
    148149    """Dictionary mapping pg_type names to simple type names."""
    149150
     
    169170        return 'text'
    170171
    171 _simpletype = _SimpleType()
    172 
    173 
    174 class _Adapt:
    175     """Mixin providing methods for adapting records and record elements.
    176 
    177     This is used when passing values from one of the higher level DB
    178     methods as parameters for a query.
    179 
    180     This class must be mixed in to a connection class, because it needs
    181     connection specific methods such as escape_bytea().
     172_simpletypes = _SimpleTypes()
     173
     174
     175def _quote_if_unqualified(param, name):
     176    """Quote parameter representing a qualified name.
     177
     178    Puts a quote_ident() call around the give parameter unless
     179    the name contains a dot, in which case the name is ambiguous
     180    (could be a qualified name or just a name with a dot in it)
     181    and must be quoted manually by the caller.
    182182    """
     183    if isinstance(name, basestring) and '.' not in name:
     184        return 'quote_ident(%s)' % (param,)
     185    return param
     186
     187
     188class _ParameterList(list):
     189    """Helper class for building typed parameter lists."""
     190
     191    def add(self, value, typ=None):
     192        """Typecast value with known database type and build parameter list.
     193
     194        If this is a literal value, it will be returned as is.  Otherwise, a
     195        placeholder will be returned and the parameter list will be augmented.
     196        """
     197        value = self.adapt(value, typ)
     198        if isinstance(value, Literal):
     199            return value
     200        self.append(value)
     201        return '$%d' % len(self)
     202
     203
     204class Literal(str):
     205    """Wrapper class for marking literal SQL values."""
     206
     207
     208class Json:
     209    """Wrapper class for marking Json values."""
     210
     211    def __init__(self, obj):
     212        self.obj = obj
     213
     214
     215class Bytea(bytes):
     216    """Wrapper class for marking Bytea values."""
     217
     218
     219class Adapter:
     220    """Class providing methods for adapting parameters to the database."""
    183221
    184222    _bool_true_values = frozenset('t true 1 y yes on'.split())
     
    190228    _re_record_quote = regex(r'[(,"\\]')
    191229    _re_array_escape = _re_record_escape = regex(r'(["\\])')
     230
     231    def __init__(self, db):
     232        self.db = db
     233        self.encode_json = db.encode_json
     234        db = db.db
     235        self.escape_bytea = db.escape_bytea
     236        self.escape_string = db.escape_string
    192237
    193238    @classmethod
     
    206251            return None
    207252        if isinstance(v, basestring) and v.lower() in cls._date_literals:
    208             return _Literal(v)
     253            return Literal(v)
    209254        return v
    210255
     
    298343    def _adapt_record(self, v, typ):
    299344        """Adapt a record parameter with given type."""
    300         typ = typ.attnames.values()
     345        typ = self.get_attnames(typ).values()
    301346        if len(typ) != len(v):
    302347            raise TypeError('Record parameter %s has wrong size' % v)
    303         return '(%s)' % ','.join(getattr(self,
    304             '_adapt_record_%s' % t.simple)(v) for v, t in zip(v, typ))
    305 
    306     @classmethod
    307     def _adapt_record_text(cls, v):
    308         """Adapt a text type record component."""
    309         if v is None:
    310             return ''
    311         if not v:
    312             return '""'
    313         v = str(v)
    314         if cls._re_record_quote.search(v):
    315             v = '"%s"' % cls._re_record_escape.sub(r'\\\1', v)
    316         return v
    317 
    318     _adapt_record_date = _adapt_record_text
    319 
    320     @classmethod
    321     def _adapt_record_bool(cls, v):
    322         """Adapt a boolean record component."""
    323         if v is None:
    324             return ''
    325         if isinstance(v, basestring):
    326             if not v:
    327                 return ''
    328             v = v.lower() in cls._bool_true_values
    329         return 't' if v else 'f'
    330 
    331     @staticmethod
    332     def _adapt_record_num(v):
    333         """Adapt a numeric record component."""
    334         if not v and v != 0:
    335             return ''
    336         return str(v)
    337 
    338     _adapt_record_int = _adapt_record_float = _adapt_record_money = \
    339         _adapt_record_num
    340 
    341     def _adapt_record_bytea(self, v):
    342         if v is None:
    343             return ''
    344         v = self.escape_bytea(v)
    345         if bytes is not str and isinstance(v, bytes):
    346             v = v.decode('ascii')
    347         return v.replace('\\', '\\\\')
    348 
    349     def _adapt_record_json(self, v):
    350         """Adapt a bytea record component."""
    351         if not v:
    352             return ''
    353         if not isinstance(v, basestring):
    354             v = self.encode_json(v)
    355         if self._re_array_quote.search(v):
    356             v = '"%s"' % self._re_array_escape.sub(r'\\\1', v)
    357         return v
    358 
    359     def _adapt_param(self, value, typ, params):
    360         """Adapt and add a parameter to the list."""
    361         if isinstance(value, _Literal):
    362             return value
    363         if value is not None:
    364             simple = typ.simple
     348        adapt = self.adapt
     349        value = []
     350        for v, t in zip(v, typ):
     351            v = adapt(v, t)
     352            if v is None:
     353                v = ''
     354            elif not v:
     355                v = '""'
     356            else:
     357                if isinstance(v, bytes):
     358                    if str is not bytes:
     359                        v = v.decode('ascii')
     360                else:
     361                    v = str(v)
     362                if self._re_record_quote.search(v):
     363                    v = '"%s"' % self._re_record_escape.sub(r'\\\1', v)
     364            value.append(v)
     365        return '(%s)' % ','.join(value)
     366
     367    def adapt(self, value, typ=None):
     368        """Adapt a value with known database type."""
     369        if value is not None and not isinstance(value, Literal):
     370            if typ:
     371                simple = self.get_simple_name(typ)
     372            else:
     373                typ = simple = self.guess_simple_type(value) or 'text'
     374            try:
     375                value = value.__pg_str__(typ)
     376            except AttributeError:
     377                pass
    365378            if simple == 'text':
    366379                pass
     
    375388                adapt = getattr(self, '_adapt_%s' % simple)
    376389                value = adapt(value)
    377                 if isinstance(value, _Literal):
    378                     return value
    379         params.append(value)
    380         return '$%d' % len(params)
     390        return value
     391
     392    @staticmethod
     393    def simple_type(name):
     394        """Create a simple database type with given attribute names."""
     395        typ = DbType(name)
     396        typ.simple = name
     397        return typ
     398
     399    @staticmethod
     400    def get_simple_name(typ):
     401        """Get the simple name of a database type."""
     402        if isinstance(typ, DbType):
     403            return typ.simple
     404        return _simpletypes[typ]
     405
     406    @staticmethod
     407    def get_attnames(typ):
     408        """Get the attribute names of a composite database type."""
     409        if isinstance(typ, DbType):
     410            return typ.attnames
     411        return {}
     412
     413    @classmethod
     414    def guess_simple_type(cls, value):
     415        """Try to guess which database type the given value has."""
     416        if isinstance(value, Bytea):
     417            return 'bytea'
     418        if isinstance(value, basestring):
     419            return 'text'
     420        if isinstance(value, bool):
     421            return 'bool'
     422        if isinstance(value, (int, long)):
     423            return 'int'
     424        if isinstance(value, float):
     425            return 'float'
     426        if isinstance(value, Decimal):
     427            return 'num'
     428        if isinstance(value, (date, time, datetime, timedelta)):
     429            return 'date'
     430        if isinstance(value, list):
     431            return '%s[]' % cls.guess_simple_base_type(value)
     432        if isinstance(value, tuple):
     433            simple_type = cls.simple_type
     434            typ = simple_type('record')
     435            guess = cls.guess_simple_type
     436            typ._get_attnames = lambda _self: AttrDict(
     437                (str(n + 1), simple_type(guess(v)))
     438                for n, v in enumerate(value))
     439            return typ
     440
     441    @classmethod
     442    def guess_simple_base_type(cls, value):
     443        """Try to guess the base type of a given array."""
     444        for v in value:
     445            if isinstance(v, list):
     446                typ = cls.guess_simple_base_type(v)
     447            else:
     448                typ = cls.guess_simple_type(v)
     449            if typ:
     450                return typ
     451
     452    def adapt_inline(self, value, nested=False):
     453        """Adapt a value that is put into the SQL and needs to be quoted."""
     454        if value is None:
     455            return 'NULL'
     456        if isinstance(value, Literal):
     457            return value
     458        if isinstance(value, Bytea):
     459            value = self.escape_bytea(value)
     460            if bytes is not str:  # Python >= 3.0
     461                value = value.decode('ascii')
     462        elif isinstance(value, Json):
     463            if value.encode:
     464                return value.encode()
     465            value = self.encode_json(value)
     466        elif isinstance(value, (datetime, date, time, timedelta)):
     467            value = str(value)
     468        if isinstance(value, basestring):
     469            value = self.escape_string(value)
     470            return "'%s'" % value
     471        if isinstance(value, bool):
     472            return 'true' if value else 'false'
     473        if isinstance(value, float):
     474            if isinf(value):
     475                return "'-Infinity'" if value < 0 else "'Infinity'"
     476            if isnan(value):
     477                return "'NaN'"
     478            return value
     479        if isinstance(value, (int, long, Decimal)):
     480            return value
     481        if isinstance(value, list):
     482            q = self.adapt_inline
     483            s = '[%s]' if nested else 'ARRAY[%s]'
     484            return s % ','.join(str(q(v, nested=True)) for v in value)
     485        if isinstance(value, tuple):
     486            q = self.adapt_inline
     487            return '(%s)' % ','.join(str(q(v)) for v in value)
     488        try:
     489            value = value.__pg_repr__()
     490        except AttributeError:
     491            raise InterfaceError(
     492                'Do not know how to adapt type %s' % type(value))
     493        if isinstance(value, (tuple, list)):
     494            value = self.adapt_inline(value)
     495        return value
     496
     497    def parameter_list(self):
     498        """Return a parameter list for parameters with known database types.
     499
     500        The list has an add(value, typ) method that will build up the
     501        list and return either the literal value or a placeholder.
     502        """
     503        params = _ParameterList()
     504        params.adapt = self.adapt
     505        return params
     506
     507    def format_query(self, command, values, types=None, inline=False):
     508        """Format a database query using the given values and types."""
     509        if inline and types:
     510            raise ValueError('Typed parameters must be sent separately')
     511        params = self.parameter_list()
     512        if isinstance(values, (list, tuple)):
     513            if inline:
     514                adapt = self.adapt_inline
     515                literals = [adapt(value) for value in values]
     516            else:
     517                add = params.add
     518                literals = []
     519                append = literals.append
     520                if types:
     521                    if (not isinstance(types, (list, tuple)) or
     522                            len(types) != len(values)):
     523                        raise TypeError('The values and types do not match')
     524                    for value, typ in zip(values, types):
     525                        append(add(value, typ))
     526                else:
     527                    for value in values:
     528                        append(add(value))
     529            command = command % tuple(literals)
     530        elif isinstance(values, dict):
     531            if inline:
     532                adapt = self.adapt_inline
     533                literals = dict((key, adapt(value))
     534                    for key, value in values.items())
     535            else:
     536                add = params.add
     537                literals = {}
     538                if types:
     539                    if (not isinstance(types, dict) or
     540                            len(types) < len(values)):
     541                        raise TypeError('The values and types do not match')
     542                    for key in sorted(values):
     543                        literals[key] = add(values[key], types[key])
     544                else:
     545                    for key in sorted(values):
     546                        literals[key] = add(values[key])
     547            command = command % literals
     548        else:
     549            raise TypeError('The values must be passed as tuple, list or dict')
     550        return command, params
    381551
    382552
     
    485655            for t in typ:
    486656                self[t] = cast
    487                 self.pop('_%s % t', None)
     657                self.pop('_%s' % t, None)
    488658
    489659    def reset(self, typ=None):
     
    492662        When no type is specified, all typecasts will be reset.
    493663        """
    494         defaults = self.defaults
    495664        if typ is None:
    496665            self.clear()
    497             self.update(defaults)
    498666        else:
    499667            if isinstance(typ, basestring):
    500668                typ = [typ]
    501669            for t in typ:
    502                 self.set(t, defaults.get(t))
     670                self.pop(t, None)
    503671
    504672    @classmethod
     
    522690            for t in typ:
    523691                defaults[t] = cast
    524                 defaults.pop('_%s % t', None)
     692                defaults.pop('_%s' % t, None)
    525693
    526694    def get_attnames(self, typ):
     
    603771        if oid in self:
    604772            return self[oid]
    605         simple = 'record' if relid else _simpletype[pgtype]
     773        simple = 'record' if relid else _simpletypes[pgtype]
    606774        typ = DbType(regtype if self._regtypes else simple)
    607775        typ.oid = oid
     
    622790                " typtype, typcategory, typdelim, typrelid"
    623791                " FROM pg_type WHERE oid=%s::regtype" %
    624                 (DB._adapt_qualified_param(key, 1),), (key,)).getresult()
     792                (_quote_if_unqualified('$1', key),), (key,)).getresult()
    625793        except ProgrammingError:
    626794            res = None
     
    675843            return value
    676844        return cast(value)
    677 
    678 
    679 class _Literal(str):
    680     """Wrapper class for literal SQL."""
    681845
    682846
     
    8601024# The actual PostGreSQL database connection interface:
    8611025
    862 class DB(_Adapt):
     1026class DB:
    8631027    """Wrapper class for the _pg connection type."""
    8641028
     
    8961060        self._privileges = {}
    8971061        self._args = args, kw
     1062        self.adapter = Adapter(self)
    8981063        self.dbtypes = DbTypes(self)
    8991064        db.set_cast_hook(self.dbtypes.typecast)
     
    9671132        """Create a human readable parameter list."""
    9681133        return ', '.join('$%d=%r' % (n, v) for n, v in enumerate(params, 1))
    969 
    970     @staticmethod
    971     def _adapt_qualified_param(name, param):
    972         """Quote parameter representing a qualified name.
    973 
    974         Escapes the name for use as an SQL parameter, unless the
    975         name contains a dot, in which case the name is ambiguous
    976         (could be a qualified name or just a name with a dot in it)
    977         and must be quoted manually by the caller.
    978 
    979         """
    980         if isinstance(param, int):
    981             param = "$%d" % param
    982         if isinstance(name, basestring) and '.' not in name:
    983             param = 'quote_ident(%s)' % (param,)
    984         return param
    9851134
    9861135    # Public methods
     
    12231372        return self.db.query(command)
    12241373
     1374    def query_formatted(self, command, parameters, types=None, inline=False):
     1375        """Execute a formatted SQL command string.
     1376
     1377        Similar to query, but using Python format placeholders of the form
     1378        %s or %(names)s instead of PostgreSQL placeholders of the form $1.
     1379        The parameters must be passed as a tuple, list or dict.  You can
     1380        also pass a corresponding tuple, list or dict of database types in
     1381        order to format the parameters properly in case there is ambiguity.
     1382
     1383        If you set inline to True, the parameters will be sent to the database
     1384        embedded in the SQL command, otherwise they will be sent separately.
     1385        """
     1386        return self.query(*self.adapter.format_query(
     1387            command, parameters, types, inline))
     1388
    12251389    def pkey(self, table, composite=False, flush=False):
    12261390        """Get or set the primary key of a table.
     
    12481412                " WHERE i.indrelid=%s::regclass"
    12491413                " AND i.indisprimary ORDER BY a.attnum") % (
    1250                     self._adapt_qualified_param(table, 1),)
     1414                    _quote_if_unqualified('$1', table),)
    12511415            pkey = self.db.query(q, (table,)).getresult()
    12521416            if not pkey:
     
    13211485                " WHERE a.attrelid = %s::regclass AND %s"
    13221486                " AND NOT a.attisdropped ORDER BY a.attnum") % (
    1323                     self._adapt_qualified_param(table, 1), q)
     1487                    _quote_if_unqualified('$1', table), q)
    13241488            names = self.db.query(q, (table,)).getresult()
    13251489            types = self.dbtypes
     
    13481512        except KeyError:  # cache miss, ask the database
    13491513            q = "SELECT has_table_privilege(%s, $2)" % (
    1350                 self._adapt_qualified_param(table, 1),)
     1514                _quote_if_unqualified('$1', table),)
    13511515            q = self.db.query(q, (table, privilege))
    13521516            ret = q.getresult()[0][0] == self._make_bool(True)
     
    14051569                    'Differing number of items in keyname and row')
    14061570            row = dict(zip(keyname, row))
    1407         params = []
    1408         param = partial(self._adapt_param, params=params)
     1571        params = self.adapter.parameter_list()
     1572        adapt = params.add
    14091573        col = self.escape_identifier
    14101574        what = 'oid, *' if qoid else '*'
    14111575        where = ' AND '.join('%s = %s' % (
    1412             col(k), param(row[k], attnames[k])) for k in keyname)
     1576            col(k), adapt(row[k], attnames[k])) for k in keyname)
    14131577        if 'oid' in row:
    14141578            if qoid:
     
    14511615        attnames = self.get_attnames(table)
    14521616        qoid = _oid_key(table) if 'oid' in attnames else None
    1453         params = []
    1454         param = partial(self._adapt_param, params=params)
     1617        params = self.adapter.parameter_list()
     1618        adapt = params.add
    14551619        col = self.escape_identifier
    14561620        names, values = [], []
     
    14581622            if n in row:
    14591623                names.append(col(n))
    1460                 values.append(param(row[n], attnames[n]))
     1624                values.append(adapt(row[n], attnames[n]))
    14611625        if not names:
    14621626            raise _prg_error('No column found that can be inserted')
     
    15121676                else:
    15131677                    raise KeyError('Missing primary key in row')
    1514         params = []
    1515         param = partial(self._adapt_param, params=params)
     1678        params = self.adapter.parameter_list()
     1679        adapt = params.add
    15161680        col = self.escape_identifier
    15171681        where = ' AND '.join('%s = %s' % (
    1518             col(k), param(row[k], attnames[k])) for k in keyname)
     1682            col(k), adapt(row[k], attnames[k])) for k in keyname)
    15191683        if 'oid' in row:
    15201684            if qoid:
     
    15251689        for n in attnames:
    15261690            if n in row and n not in keyname:
    1527                 values.append('%s = %s' % (col(n), param(row[n], attnames[n])))
     1691                values.append('%s = %s' % (col(n), adapt(row[n], attnames[n])))
    15281692        if not values:
    15291693            return row
     
    15951759        attnames = self.get_attnames(table)
    15961760        qoid = _oid_key(table) if 'oid' in attnames else None
    1597         params = []
    1598         param = partial(self._adapt_param,params=params)
     1761        params = self.adapter.parameter_list()
     1762        adapt = params.add
    15991763        col = self.escape_identifier
    16001764        names, values, updates = [], [], []
     
    16021766            if n in row:
    16031767                names.append(col(n))
    1604                 values.append(param(row[n], attnames[n]))
     1768                values.append(adapt(row[n], attnames[n]))
    16051769        names, values = ', '.join(names), ', '.join(values)
    16061770        try:
     
    17091873                else:
    17101874                    raise KeyError('Missing primary key in row')
    1711         params = []
    1712         param = partial(self._adapt_param, params=params)
     1875        params = self.adapter.parameter_list()
     1876        adapt = params.add
    17131877        col = self.escape_identifier
    17141878        where = ' AND '.join('%s = %s' % (
    1715             col(k), param(row[k], attnames[k])) for k in keyname)
     1879            col(k), adapt(row[k], attnames[k])) for k in keyname)
    17161880        if 'oid' in row:
    17171881            if qoid:
  • trunk/pgdb.py

    r798 r799  
    210210            for t in typ:
    211211                self[t] = cast
    212                 self.pop('_%s % t', None)
     212                self.pop('_%s' % t, None)
    213213
    214214    def reset(self, typ=None):
     
    225225                typ = [typ]
    226226            for t in typ:
    227                 self.set(t, defaults.get(t))
     227                cast = defaults.get(t)
     228                if cast:
     229                    self[t] = cast
     230                    t = '_%s' % t
     231                    cast = defaults.get(t)
     232                    if cast:
     233                        self[t] = cast
     234                    else:
     235                        self.pop(t, None)
     236                else:
     237                    self.pop(t, None)
     238                    self.pop('_%s' % t, None)
    228239
    229240    def create_array_cast(self, cast):
     
    483494                return "'NaN'"
    484495            return value
    485         if isinstance(value, (int, long, Decimal)):
     496        if isinstance(value, (int, long, Decimal, Literal)):
    486497            return value
    487498        if isinstance(value, list):
     
    497508            # literal because it carries the information that this is a record
    498509            # and not a string.  We don't use the keyword ROW in order to make
    499             # this usable with the IN synntax as well.  It is only necessary
     510            # this usable with the IN syntax as well.  It is only necessary
    500511            # when the records has a single column which is not really useful.
    501512            q = self._quote
     
    503514        try:
    504515            value = value.__pg_repr__()
    505             if isinstance(value, (tuple, list)):
    506                 value = self._quote(value)
    507             return value
    508516        except AttributeError:
    509517            raise InterfaceError(
    510518                'Do not know how to adapt type %s' % type(value))
     519        if isinstance(value, (tuple, list)):
     520            value = self._quote(value)
     521        return value
    511522
    512523    def _quoteparams(self, string, parameters):
     
    12961307# Additional type helpers for PyGreSQL:
    12971308
     1309class Bytea(bytes):
     1310    """Construct an object capable of holding a bytea value."""
     1311
     1312
    12981313class Json:
    12991314    """Construct a wrapper for holding an object serializable to JSON."""
     
    13121327
    13131328
     1329class Literal:
     1330    """Construct a wrapper for holding a literal SQL string."""
     1331
     1332    def __init__(self, sql):
     1333        self.sql = sql
     1334
     1335    def __str__(self):
     1336        return self.sql
     1337
     1338    __pg_repr__ = __str__
     1339
     1340
    13141341# If run as script, print some information:
    13151342
  • trunk/tests/test_classic_dbwrapper.py

    r798 r799  
    2424
    2525from decimal import Decimal
     26from datetime import date
    2627from operator import itemgetter
    2728
     
    177178    def testAllDBAttributes(self):
    178179        attributes = [
    179             'abort',
     180            'abort', 'adapter',
    180181            'begin',
    181182            'cancel', 'clear', 'close', 'commit',
     
    198199            'parameter', 'pkey', 'port',
    199200            'protocol_version', 'putline',
    200             'query',
     201            'query', 'query_formatted',
    201202            'release', 'reopen', 'reset', 'rollback',
    202203            'savepoint', 'server_version',
     
    667668        self.assertEqual(g('default_with_oids'), 'on')
    668669        self.assertEqual(g('standard_conforming_strings'), 'on')
    669         self.assertRaises(ValueError, f, set([ 'default_with_oids',
    670                                                'standard_conforming_strings']), ['off', 'on'])
     670        self.assertRaises(ValueError, f, set(['default_with_oids',
     671            'standard_conforming_strings']), ['off', 'on'])
    671672        f(set(['default_with_oids', 'standard_conforming_strings']),
    672           ['off', 'off'])
     673            ['off', 'off'])
    673674        self.assertEqual(g('default_with_oids'), 'off')
    674675        self.assertEqual(g('standard_conforming_strings'), 'off')
     
    877878            self.assertEqual(error.sqlstate, '22012')
    878879
     880    def testQueryFormatted(self):
     881        f = self.db.query_formatted
     882        t = True if pg.get_bool() else 't'
     883        q = f("select %s::int, %s::real, %s::text, %s::bool",
     884              (3, 2.5, 'hello', True))
     885        r = q.getresult()[0]
     886        self.assertEqual(r, (3, 2.5, 'hello', t))
     887        q = f("select %s, %s, %s, %s", (3, 2.5, 'hello', True), inline=True)
     888        r = q.getresult()[0]
     889        self.assertEqual(r, (3, 2.5, 'hello', t))
     890
    879891    def testPkey(self):
    880892        query = self.db.query
     
    885897            self.createTable('%s1' % t, 'b smallint primary key')
    886898            self.createTable('%s2' % t,
    887                              'c smallint, d smallint primary key')
     899                'c smallint, d smallint primary key')
    888900            self.createTable('%s3' % t,
    889                              'e smallint, f smallint, g smallint, h smallint, i smallint,'
    890                              ' primary key (f, h)')
     901                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     902                ' primary key (f, h)')
    891903            self.createTable('%s4' % t,
    892                              'e smallint, f smallint, g smallint, h smallint, i smallint,'
    893                              ' primary key (h, f)')
     904                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     905                ' primary key (h, f)')
    894906            self.createTable('%s5' % t,
    895                              'more_than_one_letter varchar primary key')
     907                'more_than_one_letter varchar primary key')
    896908            self.createTable('%s6' % t,
    897                              '"with space" date primary key')
     909                '"with space" date primary key')
    898910            self.createTable('%s7' % t,
    899                              'a_very_long_column_name varchar, "with space" date, "42" int,'
    900                              ' primary key (a_very_long_column_name, "with space", "42")')
     911                'a_very_long_column_name varchar, "with space" date, "42" int,'
     912                ' primary key (a_very_long_column_name, "with space", "42")')
    901913            self.assertRaises(KeyError, pkey, '%s0' % t)
    902914            self.assertEqual(pkey('%s1' % t), 'b')
     
    10221034        table = 'test table for get_attnames()'
    10231035        self.createTable(table,
    1024                          '"Prime!" smallint, "much space" integer, "Questions?" text')
     1036            '"Prime!" smallint, "much space" integer, "Questions?" text')
    10251037        r = get_attnames(table)
    10261038        self.assertIsInstance(r, dict)
     
    10521064        else:
    10531065            self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
    1054                                  'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
    1055                                  'normal_name': 'int', 'Special Name': 'int',
    1056                                  'u': 'text', 't': 'text', 'v': 'text',
    1057                                  'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
     1066                 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
     1067                 'normal_name': 'int', 'Special Name': 'int',
     1068                 'u': 'text', 't': 'text', 'v': 'text',
     1069                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
    10581070
    10591071    def testGetAttnamesWithRegtypes(self):
     
    14001412        except pg.DatabaseError as error:
    14011413            self.assertEqual(str(error),
    1402                              'No such record in test_students\nwhere "firstname" = $1\n'
    1403                              'with $1="D\' Arcy"')
     1414                'No such record in test_students\nwhere "firstname" = $1\n'
     1415                'with $1="D\' Arcy"')
    14041416        try:
    14051417            get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
    14061418        except pg.DatabaseError as error:
    14071419            self.assertEqual(str(error),
    1408                              'No such record in test_students\nwhere "firstname" = $1\n'
    1409                              'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
     1420                'No such record in test_students\nwhere "firstname" = $1\n'
     1421                'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
    14101422        q = "select * from test_students order by 1 limit 4"
    14111423        r = query(q).getresult()
     
    14201432        table = 'insert_test_table'
    14211433        self.createTable(table,
    1422                          'i2 smallint, i4 integer, i8 bigint,'
    1423                          ' d numeric, f4 real, f8 double precision, m money,'
    1424                          ' v4 varchar(4), c4 char(4), t text,'
    1425                          ' b boolean, ts timestamp', oids=True)
     1434            'i2 smallint, i4 integer, i8 bigint,'
     1435            ' d numeric, f4 real, f8 double precision, m money,'
     1436            ' v4 varchar(4), c4 char(4), t text,'
     1437            ' b boolean, ts timestamp', oids=True)
    14261438        oid_table = 'oid(%s)' % table
    14271439        tests = [dict(i2=None, i4=None, i8=None),
    1428                  (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
    1429                  (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
    1430                  dict(i2=42, i4=123456, i8=9876543210),
    1431                  dict(i2=2 ** 15 - 1,
    1432                       i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
    1433                  dict(d=None), (dict(d=''), dict(d=None)),
    1434                  dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
    1435                  dict(f4=None, f8=None), dict(f4=0, f8=0),
    1436                  (dict(f4='', f8=''), dict(f4=None, f8=None)),
    1437                  (dict(d=1234.5, f4=1234.5, f8=1234.5),
    1438                   dict(d=Decimal('1234.5'))),
    1439                  dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
    1440                  dict(d=Decimal('123456789.9876543212345678987654321')),
    1441                  dict(m=None), (dict(m=''), dict(m=None)),
    1442                  dict(m=Decimal('-1234.56')),
    1443                  (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
    1444                  dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
    1445                  (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
    1446                  (dict(m=1234.5), dict(m=Decimal('1234.5'))),
    1447                  (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
    1448                  (dict(m=123456), dict(m=Decimal('123456'))),
    1449                  (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
    1450                  dict(b=None), (dict(b=''), dict(b=None)),
    1451                  dict(b='f'), dict(b='t'),
    1452                  (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
    1453                  (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
    1454                  (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
    1455                  (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
    1456                  (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
    1457                  (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
    1458                  dict(v4=None, c4=None, t=None),
    1459                  (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
    1460                  dict(v4='1234', c4='1234', t='1234' * 10),
    1461                  dict(v4='abcd', c4='abcd', t='abcdefg'),
    1462                  (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
    1463                  dict(ts=None), (dict(ts=''), dict(ts=None)),
    1464                  (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
    1465                  dict(ts='2012-12-21 00:00:00'),
    1466                  (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
    1467                  dict(ts='2012-12-21 12:21:12'),
    1468                  dict(ts='2013-01-05 12:13:14'),
    1469                  dict(ts='current_timestamp')]
     1440             (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
     1441             (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
     1442             dict(i2=42, i4=123456, i8=9876543210),
     1443             dict(i2=2 ** 15 - 1,
     1444                  i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
     1445             dict(d=None), (dict(d=''), dict(d=None)),
     1446             dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
     1447             dict(f4=None, f8=None), dict(f4=0, f8=0),
     1448             (dict(f4='', f8=''), dict(f4=None, f8=None)),
     1449             (dict(d=1234.5, f4=1234.5, f8=1234.5),
     1450              dict(d=Decimal('1234.5'))),
     1451             dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
     1452             dict(d=Decimal('123456789.9876543212345678987654321')),
     1453             dict(m=None), (dict(m=''), dict(m=None)),
     1454             dict(m=Decimal('-1234.56')),
     1455             (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
     1456             dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
     1457             (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
     1458             (dict(m=1234.5), dict(m=Decimal('1234.5'))),
     1459             (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
     1460             (dict(m=123456), dict(m=Decimal('123456'))),
     1461             (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
     1462             dict(b=None), (dict(b=''), dict(b=None)),
     1463             dict(b='f'), dict(b='t'),
     1464             (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
     1465             (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
     1466             (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
     1467             (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
     1468             (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
     1469             (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
     1470             dict(v4=None, c4=None, t=None),
     1471             (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
     1472             dict(v4='1234', c4='1234', t='1234' * 10),
     1473             dict(v4='abcd', c4='abcd', t='abcdefg'),
     1474             (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
     1475             dict(ts=None), (dict(ts=''), dict(ts=None)),
     1476             (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
     1477             dict(ts='2012-12-21 00:00:00'),
     1478             (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
     1479             dict(ts='2012-12-21 12:21:12'),
     1480             dict(ts='2013-01-05 12:13:14'),
     1481             dict(ts='current_timestamp')]
    14701482        for test in tests:
    14711483            if isinstance(test, dict):
     
    20572069        table = 'clear_test_table'
    20582070        self.createTable(table,
    2059                          'n integer, f float, b boolean, d date, t text', oids=True)
     2071            'n integer, f float, b boolean, d date, t text', oids=True)
    20602072        r = clear(table)
    20612073        result = dict(n=0, f=0, b=f, d='', t='')
     
    20732085        table = 'test table for clear()'
    20742086        self.createTable(table, '"Prime!" smallint primary key,'
    2075                                 ' "much space" integer, "Questions?" text')
     2087            ' "much space" integer, "Questions?" text')
    20762088        r = clear(table)
    20772089        self.assertIsInstance(r, dict)
     
    22162228        table = 'delete_test_table_2'
    22172229        self.createTable(table,
    2218                          'n integer, m integer, t text, primary key (n, m)',
    2219                          values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
    2220                                  for n in range(3) for m in range(2)])
     2230             'n integer, m integer, t text, primary key (n, m)',
     2231             values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     2232                     for n in range(3) for m in range(2)])
    22212233        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    22222234        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
    22232235        r = [r[0] for r in query('select t from "%s" where n=2'
    2224                                  ' order by m' % table).getresult()]
     2236            ' order by m' % table).getresult()]
    22252237        self.assertEqual(r, ['c'])
    22262238        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 0)
    22272239        r = [r[0] for r in query('select t from "%s" where n=3'
    2228                                  ' order by m' % table).getresult()]
     2240             ' order by m' % table).getresult()]
    22292241        self.assertEqual(r, ['e', 'f'])
    22302242        self.assertEqual(self.db.delete(table, dict(n=3, m=1)), 1)
    22312243        r = [r[0] for r in query('select t from "%s" where n=3'
    2232                                  ' order by m' % table).getresult()]
     2244             ' order by m' % table).getresult()]
    22332245        self.assertEqual(r, ['f'])
    22342246
     
    22382250        table = 'test table for delete()'
    22392251        self.createTable(table, '"Prime!" smallint primary key,'
    2240                                 ' "much space" integer, "Questions?" text',
    2241                          values=[(19, 5005, 'Yes!')])
     2252            ' "much space" integer, "Questions?" text',
     2253            values=[(19, 5005, 'Yes!')])
    22422254        r = {'Prime!': 17}
    22432255        r = delete(table, r)
     
    22552267        query = self.db.query
    22562268        self.createTable('test_parent',
    2257                          'n smallint primary key', values=range(3))
     2269            'n smallint primary key', values=range(3))
    22582270        self.createTable('test_child',
    2259                          'n smallint primary key references test_parent', values=range(3))
     2271            'n smallint primary key references test_parent', values=range(3))
    22602272        q = ("select (select count(*) from test_parent),"
    22612273             " (select count(*) from test_child)")
     
    24412453        self.assertEqual(r, (0, 0, 0, 0))
    24422454        self.assertRaises(ValueError, truncate,
    2443                           ['test_parent*', 'test_child'], only=[True, False])
     2455            ['test_parent*', 'test_child'], only=[True, False])
    24442456        truncate(['test_parent*', 'test_child'], only=[False, True])
    24452457
     
    24752487                 (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
    24762488        self.createTable(table,
    2477                          'id smallint primary key, name varchar', values=names)
     2489            'id smallint primary key, name varchar', values=names)
    24782490        r = get_as_list(table)
    24792491        self.assertIsInstance(r, list)
     
    25042516        self.assertEqual(r, [('Maggie',), ('Marge',)])
    25052517        r = get_as_list(table, what='name',
    2506                         where=["name like 'Ma%'", "name like '%r%'"])
     2518            where=["name like 'Ma%'", "name like '%r%'"])
    25072519        self.assertIsInstance(r, list)
    25082520        self.assertEqual(r, [('Marge',)])
     
    25892601                  (3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
    25902602        self.createTable(table,
    2591                          'id smallint primary key, rgb char(7), name varchar',
    2592                          values=colors)
     2603            'id smallint primary key, rgb char(7), name varchar',
     2604            values=colors)
    25932605        # keyname must be string, list or tuple
    25942606        self.assertRaises(KeyError, get_as_dict, table, 3)
     
    26682680        self.assertIsInstance(r, OrderedDict)
    26692681        expected = OrderedDict((row[1], row[2])
    2670                                for row in sorted(colors, key=itemgetter(1)))
     2682            for row in sorted(colors, key=itemgetter(1)))
    26712683        self.assertEqual(r, expected)
    26722684        for key in r:
     
    26792691            self.assertEqual(r.keys(), expected.keys())
    26802692        r = get_as_dict(table, what='id, name',
    2681                         where="rgb like '#b%'", scalar=True)
     2693            where="rgb like '#b%'", scalar=True)
    26822694        self.assertIsInstance(r, OrderedDict)
    26832695        expected = OrderedDict((row[0], row[2]) for row in colors[1:3])
     
    26932705        expected = r
    26942706        r = get_as_dict(table, what=['name', 'id'],
    2695                         where=['id > 1', 'id < 4', "rgb like '#b%'",
    2696                                "name not like 'A%'", "name not like '%t'"], scalar=True)
     2707            where=['id > 1', 'id < 4', "rgb like '#b%'",
     2708                   "name not like 'A%'", "name not like '%t'"], scalar=True)
    26972709        self.assertEqual(r, expected)
    26982710        r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
    26992711        self.assertEqual(r, expected)
    27002712        r = get_as_dict(table, keyname=('id',), what=('name', 'id'),
    2701                         where=('id > 1', 'id < 4'), order=('id',), scalar=True)
     2713            where=('id > 1', 'id < 4'), order=('id',), scalar=True)
    27022714        self.assertEqual(r, expected)
    27032715        r = get_as_dict(table, limit=1)
     
    30453057    def testArray(self):
    30463058        self.createTable('arraytest',
    3047                          'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
    3048                          ' d numeric[], f4 real[], f8 double precision[], m money[],'
    3049                          ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
     3059            'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
     3060            ' d numeric[], f4 real[], f8 double precision[], m money[],'
     3061            ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
    30503062        r = self.db.get_attnames('arraytest')
    30513063        if self.regtypes:
     
    30693081        t, f = (True, False) if pg.get_bool() else ('t', 'f')
    30703082        data = dict(id=42, i2=[42, 1234, None, 0, -1],
    3071                     i4=[42, 123456789, None, 0, 1, -1],
    3072                     i8=[long(42), long(123456789123456789), None,
    3073                         long(0), long(1), long(-1)],
    3074                     d=[decimal(42), long_decimal, None,
    3075                        decimal(0), decimal(1), decimal(-1), -long_decimal],
    3076                     f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
    3077                         float('inf'), float('-inf')],
    3078                     f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
    3079                         float('inf'), float('-inf')],
    3080                     m=[decimal('42.00'), odd_money, None,
    3081                        decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
    3082                     b=[t, f, t, None, f, t, None, None, t],
    3083                     v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
    3084                     t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
     3083            i4=[42, 123456789, None, 0, 1, -1],
     3084            i8=[long(42), long(123456789123456789), None,
     3085                long(0), long(1), long(-1)],
     3086            d=[decimal(42), long_decimal, None,
     3087               decimal(0), decimal(1), decimal(-1), -long_decimal],
     3088            f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
     3089                float('inf'), float('-inf')],
     3090            f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
     3091                float('inf'), float('-inf')],
     3092            m=[decimal('42.00'), odd_money, None,
     3093               decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
     3094            b=[t, f, t, None, f, t, None, None, t],
     3095            v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
     3096            t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
    30853097        r = data.copy()
    30863098        self.db.insert('arraytest', r)
     
    31033115        self.assertEqual(r['i'], [1, 2, 3])
    31043116        self.assertEqual(r['t'], ['a', 'b', 'c'])
    3105         L = pg._Literal
     3117        L = pg.Literal
    31063118        r = dict(i=L("ARRAY[1, 2, 3]"), t=L("ARRAY['a', 'b', 'c']"))
    31073119        self.db.insert('arraytest', r)
     
    32713283        if self.regtypes:
    32723284            self.assertEqual(person_typ.attnames,
    3273                              dict(name='character varying', age='smallint',
    3274                                   married='boolean', weight='real', salary='money'))
     3285                dict(name='character varying', age='smallint',
     3286                    married='boolean', weight='real', salary='money'))
    32753287        else:
    32763288            self.assertEqual(person_typ.attnames,
    3277                              dict(name='text', age='int', married='bool',
    3278                                   weight='float', salary='money'))
     3289                dict(name='text', age='int', married='bool',
     3290                    weight='float', salary='money'))
    32793291        decimal = pg.get_decimal()
    32803292        if pg.get_bool():
     
    34073419            self.assertEqual(person_typ.attnames,
    34083420                             dict(name='text', age='int'))
    3409         person = pg._Literal("('John Doe', 61)")
     3421        person = pg.Literal("('John Doe', 61)")
    34103422        r = self.db.insert('test_person', None, person=person)
    34113423        p = r['person']
     
    34613473        dbtypes = self.db.dbtypes
    34623474        self.assertIsInstance(dbtypes, dict)
     3475        self.assertNotIn('int4', dbtypes)
     3476        self.assertIs(dbtypes.get_typecast('int4'), int)
     3477        dbtypes.set_typecast('int4', float)
     3478        self.assertIs(dbtypes.get_typecast('int4'), float)
     3479        dbtypes.reset_typecast('int4')
     3480        self.assertIs(dbtypes.get_typecast('int4'), int)
     3481        dbtypes.set_typecast('int4', float)
     3482        self.assertIs(dbtypes.get_typecast('int4'), float)
     3483        dbtypes.reset_typecast()
     3484        self.assertIs(dbtypes.get_typecast('int4'), int)
    34633485        self.assertNotIn('circle', dbtypes)
    3464         cast_circle = dbtypes.get_typecast('circle')
     3486        self.assertIsNone(dbtypes.get_typecast('circle'))
    34653487        squared_circle = lambda v: 'Squared Circle: %s' % v
    34663488        dbtypes.set_typecast('circle', squared_circle)
     
    34723494            'Squared Circle: Impossible')
    34733495        dbtypes.reset_typecast('circle')
    3474         self.assertIs(dbtypes.get_typecast('circle'), cast_circle)
     3496        self.assertIsNone(dbtypes.get_typecast('circle'))
    34753497
    34763498    def testGetSetTypeCast(self):
     
    34863508        self.assertIs(get_typecast('bool'), pg.cast_bool)
    34873509        cast_circle = get_typecast('circle')
     3510        self.addCleanup(set_typecast, 'circle', cast_circle)
    34883511        squared_circle = lambda v: 'Squared Circle: %s' % v
    34893512        self.assertNotIn('circle', dbtypes)
     
    36073630
    36083631
     3632class TestDBClassAdapter(unittest.TestCase):
     3633    """Test the adapter object associatd with the DB class."""
     3634
     3635    def setUp(self):
     3636        self.db = DB()
     3637        self.adapter = self.db.adapter
     3638
     3639    def tearDown(self):
     3640        try:
     3641            self.db.close()
     3642        except pg.InternalError:
     3643            pass
     3644
     3645    def testGuessSimpleType(self):
     3646        f = self.adapter.guess_simple_type
     3647        self.assertEqual(f(pg.Bytea(b'test')), 'bytea')
     3648        self.assertEqual(f('string'), 'text')
     3649        self.assertEqual(f(b'string'), 'text')
     3650        self.assertEqual(f(True), 'bool')
     3651        self.assertEqual(f(3), 'int')
     3652        self.assertEqual(f(2.75), 'float')
     3653        self.assertEqual(f(Decimal('4.25')), 'num')
     3654        self.assertEqual(f(date(2016, 1, 30)), 'date')
     3655        self.assertEqual(f([1, 2, 3]), 'int[]')
     3656        self.assertEqual(f([[[123]]]), 'int[]')
     3657        self.assertEqual(f(['a', 'b', 'c']), 'text[]')
     3658        self.assertEqual(f([[['abc']]]), 'text[]')
     3659        self.assertEqual(f([False, True]), 'bool[]')
     3660        self.assertEqual(f([[[False]]]), 'bool[]')
     3661        r = f(('string', True, 3, 2.75, [1], [False]))
     3662        self.assertEqual(r, 'record')
     3663        self.assertEqual(list(r.attnames.values()),
     3664            ['text', 'bool', 'int', 'float', 'int[]', 'bool[]'])
     3665
     3666    def testAdaptQueryTypedList(self):
     3667        format_query = self.adapter.format_query
     3668        self.assertRaises(TypeError, format_query,
     3669            '%s,%s', (1, 2), ('int2',))
     3670        self.assertRaises(TypeError, format_query,
     3671            '%s,%s', (1,), ('int2', 'int2'))
     3672        values = (3, 7.5, 'hello', True)
     3673        types = ('int4', 'float4', 'text', 'bool')
     3674        sql, params = format_query("select %s,%s,%s,%s", values, types)
     3675        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3676        self.assertEqual(params, [3, 7.5, 'hello', 't'])
     3677        types = ('bool', 'bool', 'bool', 'bool')
     3678        sql, params = format_query("select %s,%s,%s,%s", values, types)
     3679        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3680        self.assertEqual(params, ['t', 't', 'f', 't'])
     3681        values = ('2016-01-30', 'current_date')
     3682        types = ('date', 'date')
     3683        sql, params = format_query("values(%s,%s)", values, types)
     3684        self.assertEqual(sql, 'values($1,current_date)')
     3685        self.assertEqual(params, ['2016-01-30'])
     3686        values = ([1, 2, 3], ['a', 'b', 'c'])
     3687        types = ('_int4', '_text')
     3688        sql, params = format_query("%s::int4[],%s::text[]", values, types)
     3689        self.assertEqual(sql, '$1::int4[],$2::text[]')
     3690        self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
     3691        types = ('_bool', '_bool')
     3692        sql, params = format_query("%s::bool[],%s::bool[]", values, types)
     3693        self.assertEqual(sql, '$1::bool[],$2::bool[]')
     3694        self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
     3695        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3696        t = self.adapter.simple_type
     3697        typ = t('record')
     3698        typ._get_attnames = lambda _self: pg.AttrDict([
     3699            ('i', t('int')), ('f', t('float')),
     3700            ('t', t('text')), ('b', t('bool')),
     3701            ('i3', t('int[]')), ('t3', t('text[]'))])
     3702        types = [typ]
     3703        sql, params = format_query('select %s', values, types)
     3704        self.assertEqual(sql, 'select $1')
     3705        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3706
     3707    def testAdaptQueryTypedDict(self):
     3708        format_query = self.adapter.format_query
     3709        self.assertRaises(TypeError, format_query,
     3710            '%s,%s', dict(i1=1, i2=2), dict(i1='int2'))
     3711        values = dict(i=3, f=7.5, t='hello', b=True)
     3712        types = dict(i='int4', f='float4',
     3713            t='text', b='bool')
     3714        sql, params = format_query(
     3715            "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
     3716        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3717        self.assertEqual(params, ['t', 7.5, 3, 'hello'])
     3718        types = dict(i='bool', f='bool',
     3719            t='bool', b='bool')
     3720        sql, params = format_query(
     3721            "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
     3722        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3723        self.assertEqual(params, ['t', 't', 't', 'f'])
     3724        values = dict(d1='2016-01-30', d2='current_date')
     3725        types = dict(d1='date', d2='date')
     3726        sql, params = format_query("values(%(d1)s,%(d2)s)", values, types)
     3727        self.assertEqual(sql, 'values($1,current_date)')
     3728        self.assertEqual(params, ['2016-01-30'])
     3729        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
     3730        types = dict(i='_int4', t='_text')
     3731        sql, params = format_query(
     3732            "%(i)s::int4[],%(t)s::text[]", values, types)
     3733        self.assertEqual(sql, '$1::int4[],$2::text[]')
     3734        self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
     3735        types = dict(i='_bool', t='_bool')
     3736        sql, params = format_query(
     3737            "%(i)s::bool[],%(t)s::bool[]", values, types)
     3738        self.assertEqual(sql, '$1::bool[],$2::bool[]')
     3739        self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
     3740        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3741        t = self.adapter.simple_type
     3742        typ = t('record')
     3743        typ._get_attnames = lambda _self: pg.AttrDict([
     3744            ('i', t('int')), ('f', t('float')),
     3745            ('t', t('text')), ('b', t('bool')),
     3746            ('i3', t('int[]')), ('t3', t('text[]'))])
     3747        types = dict(record=typ)
     3748        sql, params = format_query('select %(record)s', values, types)
     3749        self.assertEqual(sql, 'select $1')
     3750        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3751
     3752    def testAdaptQueryUntypedList(self):
     3753        format_query = self.adapter.format_query
     3754        values = (3, 7.5, 'hello', True)
     3755        sql, params = format_query("select %s,%s,%s,%s", values)
     3756        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3757        self.assertEqual(params, [3, 7.5, 'hello', 't'])
     3758        values = [date(2016, 1, 30), 'current_date']
     3759        sql, params = format_query("values(%s,%s)", values)
     3760        self.assertEqual(sql, 'values($1,$2)')
     3761        self.assertEqual(params, values)
     3762        values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
     3763        sql, params = format_query("%s,%s,%s", values)
     3764        self.assertEqual(sql, "$1,$2,$3")
     3765        self.assertEqual(params, ['{1,2,3}', '{a,b,c}', '{t,f,t}'])
     3766        values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
     3767            [[True, False], [False, True]])
     3768        sql, params = format_query("%s,%s,%s", values)
     3769        self.assertEqual(sql, "$1,$2,$3")
     3770        self.assertEqual(params, [
     3771            '{{1,2},{3,4}}', '{{a,b},{c,d}}', '{{t,f},{f,t}}'])
     3772        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3773        sql, params = format_query('select %s', values)
     3774        self.assertEqual(sql, 'select $1')
     3775        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3776
     3777    def testAdaptQueryUntypedDict(self):
     3778        format_query = self.adapter.format_query
     3779        values = dict(i=3, f=7.5, t='hello', b=True)
     3780        sql, params = format_query(
     3781            "select %(i)s,%(f)s,%(t)s,%(b)s", values)
     3782        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3783        self.assertEqual(params, ['t', 7.5, 3, 'hello'])
     3784        values = dict(d1='2016-01-30', d2='current_date')
     3785        sql, params = format_query("values(%(d1)s,%(d2)s)", values)
     3786        self.assertEqual(sql, 'values($1,$2)')
     3787        self.assertEqual(params, [values['d1'], values['d2']])
     3788        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
     3789        sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
     3790        self.assertEqual(sql, "$2,$3,$1")
     3791        self.assertEqual(params, ['{t,f,t}', '{1,2,3}', '{a,b,c}'])
     3792        values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
     3793            b=[[True, False], [False, True]])
     3794        sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
     3795        self.assertEqual(sql, "$2,$3,$1")
     3796        self.assertEqual(params, [
     3797            '{{t,f},{f,t}}', '{{1,2},{3,4}}', '{{a,b},{c,d}}'])
     3798        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3799        sql, params = format_query('select %(record)s', values)
     3800        self.assertEqual(sql, 'select $1')
     3801        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3802
     3803    def testAdaptQueryInlineList(self):
     3804        format_query = self.adapter.format_query
     3805        values = (3, 7.5, 'hello', True)
     3806        sql, params = format_query("select %s,%s,%s,%s", values, inline=True)
     3807        self.assertEqual(sql, "select 3,7.5,'hello',true")
     3808        self.assertEqual(params, [])
     3809        values = [date(2016, 1, 30), 'current_date']
     3810        sql, params = format_query("values(%s,%s)", values, inline=True)
     3811        self.assertEqual(sql, "values('2016-01-30','current_date')")
     3812        self.assertEqual(params, [])
     3813        values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
     3814        sql, params = format_query("%s,%s,%s", values, inline=True)
     3815        self.assertEqual(sql,
     3816            "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
     3817        self.assertEqual(params, [])
     3818        values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
     3819            [[True, False], [False, True]])
     3820        sql, params = format_query("%s,%s,%s", values, inline=True)
     3821        self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
     3822            "ARRAY[[true,false],[false,true]]")
     3823        self.assertEqual(params, [])
     3824        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3825        sql, params = format_query('select %s', values, inline=True)
     3826        self.assertEqual(sql,
     3827            "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
     3828        self.assertEqual(params, [])
     3829
     3830    def testAdaptQueryInlineDict(self):
     3831        format_query = self.adapter.format_query
     3832        values = dict(i=3, f=7.5, t='hello', b=True)
     3833        sql, params = format_query(
     3834            "select %(i)s,%(f)s,%(t)s,%(b)s", values, inline=True)
     3835        self.assertEqual(sql, "select 3,7.5,'hello',true")
     3836        self.assertEqual(params, [])
     3837        values = dict(d1='2016-01-30', d2='current_date')
     3838        sql, params = format_query(
     3839            "values(%(d1)s,%(d2)s)", values, inline=True)
     3840        self.assertEqual(sql, "values('2016-01-30','current_date')")
     3841        self.assertEqual(params, [])
     3842        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
     3843        sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
     3844        self.assertEqual(sql,
     3845            "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
     3846        self.assertEqual(params, [])
     3847        values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
     3848            b=[[True, False], [False, True]])
     3849        sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
     3850        self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
     3851            "ARRAY[[true,false],[false,true]]")
     3852        self.assertEqual(params, [])
     3853        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3854        sql, params = format_query('select %(record)s', values, inline=True)
     3855        self.assertEqual(sql,
     3856            "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
     3857        self.assertEqual(params, [])
     3858
     3859    def testAdaptQueryWithPgRepr(self):
     3860        format_query = self.adapter.format_query
     3861        self.assertRaises(TypeError, format_query,
     3862            '%s', object(), inline=True)
     3863        class TestObject:
     3864            def __pg_repr__(self):
     3865                return "'adapted'"
     3866        sql, params = format_query('select %s', [TestObject()], inline=True)
     3867        self.assertEqual(sql, "select 'adapted'")
     3868        self.assertEqual(params, [])
     3869        sql, params = format_query('select %s', [[TestObject()]], inline=True)
     3870        self.assertEqual(sql, "select ARRAY['adapted']")
     3871        self.assertEqual(params, [])
     3872
     3873
    36093874class TestSchemas(unittest.TestCase):
    36103875    """Test correct handling of schemas (namespaces)."""
  • trunk/tests/test_dbapi20.py

    r798 r799  
    828828        self.assertEqual(rows, values)
    829829
     830    def test_literal(self):
     831        con = self._connect()
     832        try:
     833            cur = con.cursor()
     834            value = "lower('Hello')"
     835            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
     836            row = cur.fetchone()
     837        finally:
     838            con.close()
     839        self.assertEqual(row, (value, 'hello'))
     840
     841
    830842    def test_json(self):
    831843        inval = {"employees":
Note: See TracChangeset for help on using the changeset viewer.