Ignore:
Timestamp:
Jan 30, 2016, 2:55:18 PM (3 years ago)
Author:
cito
Message:

Port type cache and typecasting from pgdb to pg

So far, the typecasting in the classic module was been only done by
the C extension module and was not extensible through typecasting
functions in Python. This has now been made extensible by adding
a cast hook to the C extension module which has been hooked up to
a new type cache object that holds information on the types and the
associated typecast functions. All of this works very similar to the
pgdb module now, except that the basic types are still handled by
the C extension module and the Python typecast functions are only
called via the hook for types which are not supported internally.

Also added tests and a chapter on the type cache in the documentation,
and cleaned up the error messages in the C extension module.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r793 r798  
    180180            'begin',
    181181            'cancel', 'clear', 'close', 'commit',
    182             'db', 'dbname', 'debug', 'decode_json', 'delete',
     182            'db', 'dbname', 'dbtypes',
     183            'debug', 'decode_json', 'delete',
    183184            'encode_json', 'end', 'endcopy', 'error',
    184185            'escape_bytea', 'escape_identifier',
     
    186187            'fileno',
    187188            'get', 'get_as_dict', 'get_as_list',
    188             'get_attnames', 'get_databases',
    189             'get_notice_receiver', 'get_parameter',
    190             'get_relations', 'get_tables',
     189            'get_attnames', 'get_cast_hook',
     190            'get_databases', 'get_notice_receiver',
     191            'get_parameter', 'get_relations', 'get_tables',
    191192            'getline', 'getlo', 'getnotify',
    192193            'has_table_privilege', 'host',
     
    200201            'release', 'reopen', 'reset', 'rollback',
    201202            'savepoint', 'server_version',
    202             'set_notice_receiver', 'set_parameter',
     203            'set_cast_hook', 'set_notice_receiver',
     204            'set_parameter',
    203205            'source', 'start', 'status',
    204206            'transaction', 'truncate',
     
    387389        db.query("drop table if exists test cascade")
    388390        db.query("create table test ("
    389             "i2 smallint, i4 integer, i8 bigint,"
    390             " d numeric, f4 real, f8 double precision, m money,"
    391             " v4 varchar(4), c4 char(4), t text)")
     391                 "i2 smallint, i4 integer, i8 bigint,"
     392                 " d numeric, f4 real, f8 double precision, m money,"
     393                 " v4 varchar(4), c4 char(4), t text)")
    392394        db.query("create or replace view test_view as"
    393             " select i4, v4 from test")
     395                 " select i4, v4 from test")
    394396        db.close()
    395397        cls.cls_set_up = True
     
    420422
    421423    def createTable(self, table, definition,
    422             temporary=True, oids=None, values=None):
     424                    temporary=True, oids=None, values=None):
    423425        query = self.db.query
    424426        if not '"' in table or '.' in table:
     
    470472        self.assertEqual(r, u"'that''s kÀse'")
    471473        self.assertEqual(f(r"It's fine to have a \ inside."),
    472             r" E'It''s fine to have a \\ inside.'")
     474                         r" E'It''s fine to have a \\ inside.'")
    473475        self.assertEqual(f('No "quotes" must be escaped.'),
    474             "'No \"quotes\" must be escaped.'")
     476                         "'No \"quotes\" must be escaped.'")
    475477
    476478    def testEscapeIdentifier(self):
     
    489491        self.assertEqual(r, u'"that\'s kÀse"')
    490492        self.assertEqual(f(r"It's fine to have a \ inside."),
    491             '"It\'s fine to have a \\ inside."')
     493                         '"It\'s fine to have a \\ inside."')
    492494        self.assertEqual(f('All "quotes" must be escaped.'),
    493             '"All ""quotes"" must be escaped."')
     495                         '"All ""quotes"" must be escaped."')
    494496
    495497    def testEscapeString(self):
     
    508510        self.assertEqual(r, u"that''s kÀse")
    509511        self.assertEqual(f(r"It's fine to have a \ inside."),
    510             r"It''s fine to have a \ inside.")
     512                         r"It''s fine to have a \ inside.")
    511513
    512514    def testEscapeBytea(self):
     
    545547        self.assertEqual(f(r'\\x706c61696e'), b'\\x706c61696e')
    546548        self.assertEqual(f(r'\\x746861742773206be47365'),
    547             b'\\x746861742773206be47365')
     549                         b'\\x746861742773206be47365')
    548550        self.assertEqual(f(r'\\x4f007073ff21'), b'\\x4f007073ff21')
    549551
     
    552554        self.assertIsNone(f('null'))
    553555        data = {
    554           "id": 1, "name": "Foo", "price": 1234.5,
    555           "new": True, "note": None,
    556           "tags": ["Bar", "Eek"],
    557           "stock": {"warehouse": 300, "retail": 20}}
     556            "id": 1, "name": "Foo", "price": 1234.5,
     557            "new": True, "note": None,
     558            "tags": ["Bar", "Eek"],
     559            "stock": {"warehouse": 300, "retail": 20}}
    558560        text = json.dumps(data)
    559561        r = f(text)
     
    571573        self.assertEqual(f(None), 'null')
    572574        data = {
    573           "id": 1, "name": "Foo", "price": 1234.5,
    574           "new": True, "note": None,
    575           "tags": ["Bar", "Eek"],
    576           "stock": {"warehouse": 300, "retail": 20}}
     575            "id": 1, "name": "Foo", "price": 1234.5,
     576            "new": True, "note": None,
     577            "tags": ["Bar", "Eek"],
     578            "stock": {"warehouse": 300, "retail": 20}}
    577579        text = json.dumps(data)
    578580        r = f(data)
     
    666668        self.assertEqual(g('standard_conforming_strings'), 'on')
    667669        self.assertRaises(ValueError, f, set([ 'default_with_oids',
    668             'standard_conforming_strings']), ['off', 'on'])
     670                                               'standard_conforming_strings']), ['off', 'on'])
    669671        f(set(['default_with_oids', 'standard_conforming_strings']),
    670             ['off', 'off'])
     672          ['off', 'off'])
    671673        self.assertEqual(g('default_with_oids'), 'off')
    672674        self.assertEqual(g('standard_conforming_strings'), 'off')
     
    797799        values = [(2, "World!"), (1, "Hello")]
    798800        self.createTable(table, "n smallint, t varchar",
    799             temporary=True, oids=True, values=values)
     801                         temporary=True, oids=True, values=values)
    800802        r = self.db.query('select t from "%s" order by n' % table).getresult()
    801803        r = ', '.join(row[0] for row in r)
     
    853855        q = "select * from test_table order by 1, 2"
    854856        self.assertEqual(query(q).getresult(),
    855             [(1, 2), (3, 4), (5, 6)])
     857                         [(1, 2), (3, 4), (5, 6)])
    856858        q = "select * from test_table where n1=$1 and n2=$2"
    857859        self.assertEqual(query(q, 3, 4).getresult(), [(3, 4)])
     
    861863        q = "select * from test_table order by 1, 2"
    862864        self.assertEqual(query(q).getresult(),
    863             [(1, 2), (3, 7), (5, 6)])
     865                         [(1, 2), (3, 7), (5, 6)])
    864866        q = "delete from test_table where n2!=$1"
    865867        r = query(q, 4)
     
    883885            self.createTable('%s1' % t, 'b smallint primary key')
    884886            self.createTable('%s2' % t,
    885                 'c smallint, d smallint primary key')
     887                             'c smallint, d smallint primary key')
    886888            self.createTable('%s3' % t,
    887                 'e smallint, f smallint, g smallint, h smallint, i smallint,'
    888                 ' primary key (f, h)')
     889                             'e smallint, f smallint, g smallint, h smallint, i smallint,'
     890                             ' primary key (f, h)')
    889891            self.createTable('%s4' % t,
    890                 'e smallint, f smallint, g smallint, h smallint, i smallint,'
    891                 ' primary key (h, f)')
     892                             'e smallint, f smallint, g smallint, h smallint, i smallint,'
     893                             ' primary key (h, f)')
    892894            self.createTable('%s5' % t,
    893                 'more_than_one_letter varchar primary key')
     895                             'more_than_one_letter varchar primary key')
    894896            self.createTable('%s6' % t,
    895                 '"with space" date primary key')
     897                             '"with space" date primary key')
    896898            self.createTable('%s7' % t,
    897                 'a_very_long_column_name varchar, "with space" date, "42" int,'
    898                 ' primary key (a_very_long_column_name, "with space", "42")')
     899                             'a_very_long_column_name varchar, "with space" date, "42" int,'
     900                             ' primary key (a_very_long_column_name, "with space", "42")')
    899901            self.assertRaises(KeyError, pkey, '%s0' % t)
    900902            self.assertEqual(pkey('%s1' % t), 'b')
     
    940942        get_tables = self.db.get_tables
    941943        tables = ('A very Special Name', 'A_MiXeD_quoted_NaMe',
    942             'Hello, Test World!', 'Zoro', 'a1', 'a2', 'a321',
    943             'averyveryveryveryveryveryveryreallyreallylongtablename',
    944             'b0', 'b3', 'x', 'xXx', 'xx', 'y', 'z')
     944                  'Hello, Test World!', 'Zoro', 'a1', 'a2', 'a321',
     945                  'averyveryveryveryveryveryveryreallyreallylongtablename',
     946                  'b0', 'b3', 'x', 'xXx', 'xx', 'y', 'z')
    945947        for t in tables:
    946948            self.db.query('drop table if exists "%s" cascade' % t)
     
    987989        get_attnames = self.db.get_attnames
    988990        self.assertRaises(pg.ProgrammingError,
    989             self.db.get_attnames, 'does_not_exist')
     991                          self.db.get_attnames, 'does_not_exist')
    990992        self.assertRaises(pg.ProgrammingError,
    991             self.db.get_attnames, 'has.too.many.dots')
     993                          self.db.get_attnames, 'has.too.many.dots')
    992994        r = get_attnames('test')
    993995        self.assertIsInstance(r, dict)
     
    10031005                v4='text', c4='text', t='text'))
    10041006        self.createTable('test_table',
    1005             'n int, alpha smallint, beta bool,'
    1006             ' gamma char(5), tau text, v varchar(3)')
     1007                         'n int, alpha smallint, beta bool,'
     1008                         ' gamma char(5), tau text, v varchar(3)')
    10071009        r = get_attnames('test_table')
    10081010        self.assertIsInstance(r, dict)
     
    10201022        table = 'test table for get_attnames()'
    10211023        self.createTable(table,
    1022             '"Prime!" smallint, "much space" integer, "Questions?" text')
     1024                         '"Prime!" smallint, "much space" integer, "Questions?" text')
    10231025        r = get_attnames(table)
    10241026        self.assertIsInstance(r, dict)
     
    10321034        table = 'yet another test table for get_attnames()'
    10331035        self.createTable(table,
    1034             'a smallint, b integer, c bigint,'
    1035             ' e numeric, f real, f2 double precision, m money,'
    1036             ' x smallint, y smallint, z smallint,'
    1037             ' Normal_NaMe smallint, "Special Name" smallint,'
    1038             ' t text, u char(2), v varchar(2),'
    1039             ' primary key (y, u)', oids=True)
     1036                         'a smallint, b integer, c bigint,'
     1037                         ' e numeric, f real, f2 double precision, m money,'
     1038                         ' x smallint, y smallint, z smallint,'
     1039                         ' Normal_NaMe smallint, "Special Name" smallint,'
     1040                         ' t text, u char(2), v varchar(2),'
     1041                         ' primary key (y, u)', oids=True)
    10401042        r = get_attnames(table)
    10411043        self.assertIsInstance(r, dict)
     
    10501052        else:
    10511053            self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
    1052                 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
    1053                 'normal_name': 'int', 'Special Name': 'int',
    1054                 'u': 'text', 't': 'text', 'v': 'text',
    1055                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
     1054                                 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
     1055                                 'normal_name': 'int', 'Special Name': 'int',
     1056                                 'u': 'text', 't': 'text', 'v': 'text',
     1057                                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
    10561058
    10571059    def testGetAttnamesWithRegtypes(self):
    10581060        get_attnames = self.db.get_attnames
    10591061        self.createTable('test_table',
    1060             ' n int, alpha smallint, beta bool,'
    1061             ' gamma char(5), tau text, v varchar(3)')
     1062                         ' n int, alpha smallint, beta bool,'
     1063                         ' gamma char(5), tau text, v varchar(3)')
    10621064        use_regtypes = self.db.use_regtypes
    10631065        regtypes = use_regtypes()
     
    10761078        get_attnames = self.db.get_attnames
    10771079        self.createTable('test_table',
    1078             ' n int, alpha smallint, beta bool,'
    1079             ' gamma char(5), tau text, v varchar(3)')
     1080                         ' n int, alpha smallint, beta bool,'
     1081                         ' gamma char(5), tau text, v varchar(3)')
    10801082        use_regtypes = self.db.use_regtypes
    10811083        regtypes = use_regtypes()
     
    11371139        table = 'test table for get_attnames'
    11381140        self.createTable(table,
    1139             ' n int, alpha smallint, v varchar(3),'
    1140             ' gamma char(5), tau text, beta bool')
     1141                         ' n int, alpha smallint, v varchar(3),'
     1142                         ' gamma char(5), tau text, beta bool')
    11411143        r = get_attnames(table)
    11421144        self.assertIsInstance(r, OrderedDict)
     
    11761178        table = 'test table for get_attnames'
    11771179        self.createTable(table,
    1178             ' n int, alpha smallint, v varchar(3),'
    1179             ' gamma char(5), tau text, beta bool')
     1180                         ' n int, alpha smallint, v varchar(3),'
     1181                         ' gamma char(5), tau text, beta bool')
    11801182        r = get_attnames(table)
    11811183        self.assertIsInstance(r, AttrDict)
     
    12131215        self.assertRaises(TypeError, get, table)
    12141216        self.createTable(table, 'n integer, t text',
    1215             values=enumerate('xyz', start=1))
     1217                         values=enumerate('xyz', start=1))
    12161218        self.assertRaises(pg.ProgrammingError, get, table, 2)
    12171219        r = get(table, 2, 'n')
     
    12661268        table = 'get_with_oid_test_table'
    12671269        self.createTable(table, 'n integer, t text', oids=True,
    1268             values=enumerate('xyz', start=1))
     1270                         values=enumerate('xyz', start=1))
    12691271        self.assertRaises(pg.ProgrammingError, get, table, 2)
    12701272        self.assertRaises(KeyError, get, table, {}, 'oid')
     
    13301332        table = 'get_test_table_1'
    13311333        self.createTable(table, 'n integer primary key, t text',
    1332             values=enumerate('abc', start=1))
     1334                         values=enumerate('abc', start=1))
    13331335        self.assertEqual(get(table, 2)['t'], 'b')
    13341336        self.assertEqual(get(table, 1, 'n')['t'], 'a')
     
    13411343        table = 'get_test_table_2'
    13421344        self.createTable(table,
    1343             'n integer, m integer, t text, primary key (n, m)',
    1344             values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
    1345                 for n in range(3) for m in range(2)])
     1345                         'n integer, m integer, t text, primary key (n, m)',
     1346                         values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     1347                                 for n in range(3) for m in range(2)])
    13461348        self.assertRaises(KeyError, get, table, 2)
    13471349        self.assertEqual(get(table, (1, 1))['t'], 'a')
     
    13621364        table = 'test table for get()'
    13631365        self.createTable(table, '"Prime!" smallint primary key,'
    1364             ' "much space" integer, "Questions?" text',
    1365             values=[(17, 1001, 'No!')])
     1366                                ' "much space" integer, "Questions?" text',
     1367                         values=[(17, 1001, 'No!')])
    13661368        r = get(table, 17)
    13671369        self.assertIsInstance(r, dict)
     
    13731375        self.db.query('delete from test where i4=14')
    13741376        self.db.query('insert into test (i4, v4) values('
    1375             "14, 'abc4')")
     1377                      "14, 'abc4')")
    13761378        r = self.db.get('test_view', 14, 'i4')
    13771379        self.assertIn('v4', r)
     
    13821384        query = self.db.query
    13831385        self.createTable('test_students',
    1384             'firstname varchar primary key, nickname varchar, grade char(2)',
    1385             values=[("D'Arcy", 'Darcey', 'A+'), ('Sheldon', 'Moonpie', 'A+'),
    1386                     ('Robert', 'Little Bobby Tables', 'D-')])
     1386                         'firstname varchar primary key, nickname varchar, grade char(2)',
     1387                         values=[("D'Arcy", 'Darcey', 'A+'), ('Sheldon', 'Moonpie', 'A+'),
     1388                                 ('Robert', 'Little Bobby Tables', 'D-')])
    13871389        r = get('test_students', 'Sheldon')
    13881390        self.assertEqual(r, dict(
     
    13981400        except pg.DatabaseError as error:
    13991401            self.assertEqual(str(error),
    1400                 'No such record in test_students\nwhere "firstname" = $1\n'
    1401                 'with $1="D\' Arcy"')
     1402                             'No such record in test_students\nwhere "firstname" = $1\n'
     1403                             'with $1="D\' Arcy"')
    14021404        try:
    14031405            get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
    14041406        except pg.DatabaseError as error:
    14051407            self.assertEqual(str(error),
    1406                 'No such record in test_students\nwhere "firstname" = $1\n'
    1407                 'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
     1408                             'No such record in test_students\nwhere "firstname" = $1\n'
     1409                             'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
    14081410        q = "select * from test_students order by 1 limit 4"
    14091411        r = query(q).getresult()
     
    14181420        table = 'insert_test_table'
    14191421        self.createTable(table,
    1420             'i2 smallint, i4 integer, i8 bigint,'
    1421             ' d numeric, f4 real, f8 double precision, m money,'
    1422             ' v4 varchar(4), c4 char(4), t text,'
    1423             ' b boolean, ts timestamp', oids=True)
     1422                         'i2 smallint, i4 integer, i8 bigint,'
     1423                         ' d numeric, f4 real, f8 double precision, m money,'
     1424                         ' v4 varchar(4), c4 char(4), t text,'
     1425                         ' b boolean, ts timestamp', oids=True)
    14241426        oid_table = 'oid(%s)' % table
    14251427        tests = [dict(i2=None, i4=None, i8=None),
    1426             (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
    1427             (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
    1428             dict(i2=42, i4=123456, i8=9876543210),
    1429             dict(i2=2 ** 15 - 1,
    1430                 i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
    1431             dict(d=None), (dict(d=''), dict(d=None)),
    1432             dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
    1433             dict(f4=None, f8=None), dict(f4=0, f8=0),
    1434             (dict(f4='', f8=''), dict(f4=None, f8=None)),
    1435             (dict(d=1234.5, f4=1234.5, f8=1234.5),
     1428                 (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
     1429                 (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
     1430                 dict(i2=42, i4=123456, i8=9876543210),
     1431                 dict(i2=2 ** 15 - 1,
     1432                      i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
     1433                 dict(d=None), (dict(d=''), dict(d=None)),
     1434                 dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
     1435                 dict(f4=None, f8=None), dict(f4=0, f8=0),
     1436                 (dict(f4='', f8=''), dict(f4=None, f8=None)),
     1437                 (dict(d=1234.5, f4=1234.5, f8=1234.5),
    14361438                  dict(d=Decimal('1234.5'))),
    1437             dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
    1438             dict(d=Decimal('123456789.9876543212345678987654321')),
    1439             dict(m=None), (dict(m=''), dict(m=None)),
    1440             dict(m=Decimal('-1234.56')),
    1441             (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
    1442             dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
    1443             (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
    1444             (dict(m=1234.5), dict(m=Decimal('1234.5'))),
    1445             (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
    1446             (dict(m=123456), dict(m=Decimal('123456'))),
    1447             (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
    1448             dict(b=None), (dict(b=''), dict(b=None)),
    1449             dict(b='f'), dict(b='t'),
    1450             (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
    1451             (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
    1452             (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
    1453             (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
    1454             (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
    1455             (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
    1456             dict(v4=None, c4=None, t=None),
    1457             (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
    1458             dict(v4='1234', c4='1234', t='1234' * 10),
    1459             dict(v4='abcd', c4='abcd', t='abcdefg'),
    1460             (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
    1461             dict(ts=None), (dict(ts=''), dict(ts=None)),
    1462             (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
    1463             dict(ts='2012-12-21 00:00:00'),
    1464             (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
    1465             dict(ts='2012-12-21 12:21:12'),
    1466             dict(ts='2013-01-05 12:13:14'),
    1467             dict(ts='current_timestamp')]
     1439                 dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
     1440                 dict(d=Decimal('123456789.9876543212345678987654321')),
     1441                 dict(m=None), (dict(m=''), dict(m=None)),
     1442                 dict(m=Decimal('-1234.56')),
     1443                 (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
     1444                 dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
     1445                 (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
     1446                 (dict(m=1234.5), dict(m=Decimal('1234.5'))),
     1447                 (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
     1448                 (dict(m=123456), dict(m=Decimal('123456'))),
     1449                 (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
     1450                 dict(b=None), (dict(b=''), dict(b=None)),
     1451                 dict(b='f'), dict(b='t'),
     1452                 (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
     1453                 (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
     1454                 (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
     1455                 (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
     1456                 (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
     1457                 (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
     1458                 dict(v4=None, c4=None, t=None),
     1459                 (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
     1460                 dict(v4='1234', c4='1234', t='1234' * 10),
     1461                 dict(v4='abcd', c4='abcd', t='abcdefg'),
     1462                 (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
     1463                 dict(ts=None), (dict(ts=''), dict(ts=None)),
     1464                 (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
     1465                 dict(ts='2012-12-21 00:00:00'),
     1466                 (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
     1467                 dict(ts='2012-12-21 12:21:12'),
     1468                 dict(ts='2013-01-05 12:13:14'),
     1469                 dict(ts='current_timestamp')]
    14681470        for test in tests:
    14691471            if isinstance(test, dict):
     
    14901492            self.assertIsInstance(oid, int)
    14911493            data = dict(item for item in data.items()
    1492                 if item[0] in expect)
     1494                        if item[0] in expect)
    14931495            ts = expect.get('ts')
    14941496            if ts == 'current_timestamp':
     
    15091511            self.assertEqual(data['oid'], oid)
    15101512            data = dict(item for item in data.items()
    1511                 if item[0] in expect)
     1513                        if item[0] in expect)
    15121514            self.assertEqual(data, expect)
    15131515            query('delete from "%s"' % table)
     
    15861588        table = 'test table for insert()'
    15871589        self.createTable(table, '"Prime!" smallint primary key,'
    1588             ' "much space" integer, "Questions?" text')
     1590                                ' "much space" integer, "Questions?" text')
    15891591        r = {'Prime!': 11, 'much space': 2002, 'Questions?': 'What?'}
    15901592        r = insert(table, r)
     
    16361638        query = self.db.query
    16371639        self.assertRaises(pg.ProgrammingError, update,
    1638             'test', i2=2, i4=4, i8=8)
     1640                          'test', i2=2, i4=4, i8=8)
    16391641        table = 'update_test_table'
    16401642        self.createTable(table, 'n integer, t text', oids=True,
    1641             values=enumerate('xyz', start=1))
     1643                         values=enumerate('xyz', start=1))
    16421644        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
    16431645        r = self.db.get(table, 2, 'n')
     
    16841686        query("insert into test_table values (1)")
    16851687        self.assertRaises(pg.ProgrammingError,
    1686             update, 'test_table', dict(oid=oid, n=4))
     1688                          update, 'test_table', dict(oid=oid, n=4))
    16871689        r = update('test_table', dict(n=4), oid=oid)
    16881690        self.assertEqual(r['n'], 4)
     
    17241726        query = self.db.query
    17251727        self.assertRaises(pg.ProgrammingError, update,
    1726             'test', i2=2, i4=4, i8=8)
     1728                          'test', i2=2, i4=4, i8=8)
    17271729        table = 'update_test_table'
    17281730        self.createTable(table, 'n integer primary key, t text', oids=False,
    1729             values=enumerate('xyz', start=1))
     1731                         values=enumerate('xyz', start=1))
    17301732        r = self.db.get(table, 2)
    17311733        r['t'] = 'u'
     
    17411743        table = 'update_test_table_1'
    17421744        self.createTable(table, 'n integer primary key, t text',
    1743             values=enumerate('abc', start=1))
     1745                         values=enumerate('abc', start=1))
    17441746        self.assertRaises(KeyError, update, table, dict(t='b'))
    17451747        s = dict(n=2, t='d')
     
    17641766        table = 'update_test_table_2'
    17651767        self.createTable(table,
    1766             'n integer, m integer, t text, primary key (n, m)',
    1767             values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
    1768                 for n in range(3) for m in range(2)])
     1768                         'n integer, m integer, t text, primary key (n, m)',
     1769                         values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     1770                                 for n in range(3) for m in range(2)])
    17691771        self.assertRaises(KeyError, update, table, dict(n=2, t='b'))
    17701772        self.assertEqual(update(table,
    1771             dict(n=2, m=2, t='x'))['t'], 'x')
     1773                                dict(n=2, m=2, t='x'))['t'], 'x')
    17721774        q = 'select t from "%s" where n=2 order by m' % table
    17731775        r = [r[0] for r in query(q).getresult()]
     
    17791781        table = 'test table for update()'
    17801782        self.createTable(table, '"Prime!" smallint primary key,'
    1781             ' "much space" integer, "Questions?" text',
    1782             values=[(13, 3003, 'Why!')])
     1783                                ' "much space" integer, "Questions?" text',
     1784                         values=[(13, 3003, 'Why!')])
    17831785        r = {'Prime!': 13, 'much space': 7007, 'Questions?': 'When?'}
    17841786        r = update(table, r)
     
    17981800        query = self.db.query
    17991801        self.assertRaises(pg.ProgrammingError, upsert,
    1800             'test', i2=2, i4=4, i8=8)
     1802                          'test', i2=2, i4=4, i8=8)
    18011803        table = 'upsert_test_table'
    18021804        self.createTable(table, 'n integer primary key, t text', oids=True)
     
    18721874        self.createTable('test_table', 'n int', oids=True, values=[1])
    18731875        self.assertRaises(pg.ProgrammingError,
    1874             upsert, 'test_table', dict(n=2))
     1876                          upsert, 'test_table', dict(n=2))
    18751877        r = get('test_table', 1, 'n')
    18761878        self.assertIsInstance(r, dict)
     
    18811883        oid = r[qoid]
    18821884        self.assertRaises(pg.ProgrammingError,
    1883             upsert, 'test_table', dict(n=2, oid=oid))
     1885                          upsert, 'test_table', dict(n=2, oid=oid))
    18841886        query("alter table test_table add column m int")
    18851887        query("alter table test_table add primary key (n)")
     
    19551957        table = 'upsert_test_table_2'
    19561958        self.createTable(table,
    1957             'n integer, m integer, t text, primary key (n, m)')
     1959                         'n integer, m integer, t text, primary key (n, m)')
    19581960        s = dict(n=1, m=2, t='x')
    19591961        try:
     
    20222024        table = 'test table for upsert()'
    20232025        self.createTable(table, '"Prime!" smallint primary key,'
    2024             ' "much space" integer, "Questions?" text')
     2026                                ' "much space" integer, "Questions?" text')
    20252027        s = {'Prime!': 31, 'much space': 9009, 'Questions?': 'Yes.'}
    20262028        try:
     
    20552057        table = 'clear_test_table'
    20562058        self.createTable(table,
    2057             'n integer, f float, b boolean, d date, t text', oids=True)
     2059                         'n integer, f float, b boolean, d date, t text', oids=True)
    20582060        r = clear(table)
    20592061        result = dict(n=0, f=0, b=f, d='', t='')
     
    20712073        table = 'test table for clear()'
    20722074        self.createTable(table, '"Prime!" smallint primary key,'
    2073             ' "much space" integer, "Questions?" text')
     2075                                ' "much space" integer, "Questions?" text')
    20742076        r = clear(table)
    20752077        self.assertIsInstance(r, dict)
     
    20822084        query = self.db.query
    20832085        self.assertRaises(pg.ProgrammingError, delete,
    2084             'test', dict(i2=2, i4=4, i8=8))
     2086                          'test', dict(i2=2, i4=4, i8=8))
    20852087        table = 'delete_test_table'
    20862088        self.createTable(table, 'n integer, t text', oids=True,
    2087             values=enumerate('xyz', start=1))
     2089                         values=enumerate('xyz', start=1))
    20882090        self.assertRaises(pg.ProgrammingError, self.db.get, table, 2)
    20892091        r = self.db.get(table, 1, 'n')
     
    22042206        table = 'delete_test_table_1'
    22052207        self.createTable(table, 'n integer primary key, t text',
    2206             values=enumerate('abc', start=1))
     2208                         values=enumerate('abc', start=1))
    22072209        self.assertRaises(KeyError, self.db.delete, table, dict(t='b'))
    22082210        self.assertEqual(self.db.delete(table, dict(n=2)), 1)
     
    22142216        table = 'delete_test_table_2'
    22152217        self.createTable(table,
    2216             'n integer, m integer, t text, primary key (n, m)',
    2217             values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
    2218                 for n in range(3) for m in range(2)])
     2218                         'n integer, m integer, t text, primary key (n, m)',
     2219                         values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     2220                                 for n in range(3) for m in range(2)])
    22192221        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    22202222        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
    22212223        r = [r[0] for r in query('select t from "%s" where n=2'
    2222             ' order by m' % table).getresult()]
     2224                                 ' order by m' % table).getresult()]
    22232225        self.assertEqual(r, ['c'])
    22242226        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 0)
    22252227        r = [r[0] for r in query('select t from "%s" where n=3'
    2226             ' order by m' % table).getresult()]
     2228                                 ' order by m' % table).getresult()]
    22272229        self.assertEqual(r, ['e', 'f'])
    22282230        self.assertEqual(self.db.delete(table, dict(n=3, m=1)), 1)
    22292231        r = [r[0] for r in query('select t from "%s" where n=3'
    2230             ' order by m' % table).getresult()]
     2232                                 ' order by m' % table).getresult()]
    22312233        self.assertEqual(r, ['f'])
    22322234
     
    22362238        table = 'test table for delete()'
    22372239        self.createTable(table, '"Prime!" smallint primary key,'
    2238             ' "much space" integer, "Questions?" text',
    2239             values=[(19, 5005, 'Yes!')])
     2240                                ' "much space" integer, "Questions?" text',
     2241                         values=[(19, 5005, 'Yes!')])
    22402242        r = {'Prime!': 17}
    22412243        r = delete(table, r)
     
    22532255        query = self.db.query
    22542256        self.createTable('test_parent',
    2255             'n smallint primary key', values=range(3))
     2257                         'n smallint primary key', values=range(3))
    22562258        self.createTable('test_child',
    2257             'n smallint primary key references test_parent', values=range(3))
     2259                         'n smallint primary key references test_parent', values=range(3))
    22582260        q = ("select (select count(*) from test_parent),"
    2259             " (select count(*) from test_child)")
     2261             " (select count(*) from test_child)")
    22602262        self.assertEqual(query(q).getresult()[0], (3, 3))
    22612263        self.assertRaises(pg.ProgrammingError,
    2262             delete, 'test_parent', None, n=2)
     2264                          delete, 'test_parent', None, n=2)
    22632265        self.assertRaises(pg.ProgrammingError,
    2264             delete, 'test_parent *', None, n=2)
     2266                          delete, 'test_parent *', None, n=2)
    22652267        r = delete('test_child', None, n=2)
    22662268        self.assertEqual(r, 1)
     
    22702272        self.assertEqual(query(q).getresult()[0], (2, 2))
    22712273        self.assertRaises(pg.ProgrammingError,
    2272             delete, 'test_parent', dict(n=0))
     2274                          delete, 'test_parent', dict(n=0))
    22732275        self.assertRaises(pg.ProgrammingError,
    2274             delete, 'test_parent *', dict(n=0))
     2276                          delete, 'test_parent *', dict(n=0))
    22752277        r = delete('test_child', dict(n=0))
    22762278        self.assertEqual(r, 1)
     
    22932295        query = self.db.query
    22942296        self.createTable('test_table', 'n smallint',
    2295             temporary=False, values=[1] * 3)
     2297                         temporary=False, values=[1] * 3)
    22962298        q = "select count(*) from test_table"
    22972299        r = query(q).getresult()[0][0]
     
    23132315                query("insert into test_table_2 values (2)")
    23142316            q = ("select (select count(*) from test_table),"
    2315                 " (select count(*) from test_table_2)")
     2317                 " (select count(*) from test_table_2)")
    23162318            r = query(q).getresult()[0]
    23172319            self.assertEqual(r, (3, 3))
     
    23502352        query = self.db.query
    23512353        self.createTable('test_parent', 'n smallint primary key',
    2352             values=range(3))
     2354                         values=range(3))
    23532355        self.createTable('test_child',
    2354             'n smallint primary key references test_parent (n)',
    2355             values=range(3))
     2356                         'n smallint primary key references test_parent (n)',
     2357                         values=range(3))
    23562358        q = ("select (select count(*) from test_parent),"
    2357             " (select count(*) from test_child)")
     2359             " (select count(*) from test_child)")
    23582360        r = query(q).getresult()[0]
    23592361        self.assertEqual(r, (3, 3))
     
    23932395            query("insert into test_child (n, m) values (2, 3)")
    23942396        q = ("select (select count(*) from test_parent),"
    2395             " (select count(*) from test_child)")
     2397             " (select count(*) from test_child)")
    23962398        r = query(q).getresult()[0]
    23972399        self.assertEqual(r, (6, 3))
     
    24272429                query("insert into test_child%s (n, m) values (2, 3)" % t)
    24282430        q = ("select (select count(*) from test_parent),"
    2429             " (select count(*) from test_child),"
    2430             " (select count(*) from test_parent_2),"
    2431             " (select count(*) from test_child_2)")
     2431             " (select count(*) from test_child),"
     2432             " (select count(*) from test_parent_2),"
     2433             " (select count(*) from test_child_2)")
    24322434        r = query(q).getresult()[0]
    24332435        self.assertEqual(r, (6, 3, 6, 3))
     
    24392441        self.assertEqual(r, (0, 0, 0, 0))
    24402442        self.assertRaises(ValueError, truncate,
    2441             ['test_parent*', 'test_child'], only=[True, False])
     2443                          ['test_parent*', 'test_child'], only=[True, False])
    24422444        truncate(['test_parent*', 'test_child'], only=[False, True])
    24432445
     
    24712473        named = hasattr(r, 'colname')
    24722474        names = [(1, 'Homer'), (2, 'Marge'),
    2473                 (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
     2475                 (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
    24742476        self.createTable(table,
    2475             'id smallint primary key, name varchar', values=names)
     2477                         'id smallint primary key, name varchar', values=names)
    24762478        r = get_as_list(table)
    24772479        self.assertIsInstance(r, list)
     
    25872589                  (3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
    25882590        self.createTable(table,
    2589             'id smallint primary key, rgb char(7), name varchar',
    2590             values=colors)
     2591                         'id smallint primary key, rgb char(7), name varchar',
     2592                         values=colors)
    25912593        # keyname must be string, list or tuple
    25922594        self.assertRaises(KeyError, get_as_dict, table, 3)
     
    26152617        self.assertIsInstance(r, OrderedDict)
    26162618        expected = OrderedDict((row[1], (row[0], row[2]))
    2617             for row in sorted(colors, key=itemgetter(1)))
     2619                               for row in sorted(colors, key=itemgetter(1)))
    26182620        self.assertEqual(r, expected)
    26192621        for key in r:
     
    26662668        self.assertIsInstance(r, OrderedDict)
    26672669        expected = OrderedDict((row[1], row[2])
    2668             for row in sorted(colors, key=itemgetter(1)))
     2670                               for row in sorted(colors, key=itemgetter(1)))
    26692671        self.assertEqual(r, expected)
    26702672        for key in r:
     
    26922694        r = get_as_dict(table, what=['name', 'id'],
    26932695                        where=['id > 1', 'id < 4', "rgb like '#b%'",
    2694                    "name not like 'A%'", "name not like '%t'"], scalar=True)
     2696                               "name not like 'A%'", "name not like '%t'"], scalar=True)
    26952697        self.assertEqual(r, expected)
    26962698        r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
     
    27732775        self.db.begin(mode='read only')
    27742776        self.assertRaises(pg.ProgrammingError,
    2775             query, "insert into test_table values (0)")
     2777                          query, "insert into test_table values (0)")
    27762778        self.db.rollback()
    27772779        self.db.start(mode='Read Only')
    27782780        self.assertRaises(pg.ProgrammingError,
    2779             query, "insert into test_table values (0)")
     2781                          query, "insert into test_table values (0)")
    27802782        self.db.abort()
    27812783
     
    29322934        # insert JSON object
    29332935        data = {
    2934           "id": 1, "name": "Foo", "price": 1234.5,
    2935           "new": True, "note": None,
    2936           "tags": ["Bar", "Eek"],
    2937           "stock": {"warehouse": 300, "retail": 20}}
     2936            "id": 1, "name": "Foo", "price": 1234.5,
     2937            "new": True, "note": None,
     2938            "tags": ["Bar", "Eek"],
     2939            "stock": {"warehouse": 300, "retail": 20}}
    29382940        r = self.db.insert('json_test', n=1, data=data)
    29392941        self.assertIsInstance(r, dict)
     
    29812983        try:
    29822984            self.createTable('jsonb_test',
    2983                 'n smallint primary key, data jsonb')
     2985                             'n smallint primary key, data jsonb')
    29842986        except pg.ProgrammingError as error:
    29852987            if self.db.server_version < 90400:
     
    30023004        # insert JSON object
    30033005        data = {
    3004           "id": 1, "name": "Foo", "price": 1234.5,
    3005           "new": True, "note": None,
    3006           "tags": ["Bar", "Eek"],
    3007           "stock": {"warehouse": 300, "retail": 20}}
     3006            "id": 1, "name": "Foo", "price": 1234.5,
     3007            "new": True, "note": None,
     3008            "tags": ["Bar", "Eek"],
     3009            "stock": {"warehouse": 300, "retail": 20}}
    30083010        r = self.db.insert('jsonb_test', n=1, data=data)
    30093011        self.assertIsInstance(r, dict)
     
    30433045    def testArray(self):
    30443046        self.createTable('arraytest',
    3045             'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
    3046             ' d numeric[], f4 real[], f8 double precision[], m money[],'
    3047             ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
     3047                         'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
     3048                         ' d numeric[], f4 real[], f8 double precision[], m money[],'
     3049                         ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
    30483050        r = self.db.get_attnames('arraytest')
    30493051        if self.regtypes:
     
    30673069        t, f = (True, False) if pg.get_bool() else ('t', 'f')
    30683070        data = dict(id=42, i2=[42, 1234, None, 0, -1],
    3069             i4=[42, 123456789, None, 0, 1, -1],
    3070             i8=[long(42), long(123456789123456789), None,
    3071                 long(0), long(1), long(-1)],
    3072             d=[decimal(42), long_decimal, None,
    3073                decimal(0), decimal(1), decimal(-1), -long_decimal],
    3074             f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
    3075                 float('inf'), float('-inf')],
    3076             f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
    3077                 float('inf'), float('-inf')],
    3078             m=[decimal('42.00'), odd_money, None,
    3079                decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
    3080             b=[t, f, t, None, f, t, None, None, t],
    3081             v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
    3082             t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
     3071                    i4=[42, 123456789, None, 0, 1, -1],
     3072                    i8=[long(42), long(123456789123456789), None,
     3073                        long(0), long(1), long(-1)],
     3074                    d=[decimal(42), long_decimal, None,
     3075                       decimal(0), decimal(1), decimal(-1), -long_decimal],
     3076                    f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
     3077                        float('inf'), float('-inf')],
     3078                    f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
     3079                        float('inf'), float('-inf')],
     3080                    m=[decimal('42.00'), odd_money, None,
     3081                       decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
     3082                    b=[t, f, t, None, f, t, None, None, t],
     3083                    v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
     3084                    t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
    30833085        r = data.copy()
    30843086        self.db.insert('arraytest', r)
     
    32533255        query = self.db.query
    32543256        query('create type test_person_type as'
    3255             ' (name varchar, age smallint, married bool,'
     3257              ' (name varchar, age smallint, married bool,'
    32563258              ' weight real, salary money)')
    32573259        self.addCleanup(query, 'drop type test_person_type')
    32583260        self.createTable('test_person', 'person test_person_type',
    3259             temporary=False, oids=True)
     3261                         temporary=False, oids=True)
    32603262        attnames = self.db.get_attnames('test_person')
    32613263        self.assertEqual(len(attnames), 2)
     
    32693271        if self.regtypes:
    32703272            self.assertEqual(person_typ.attnames,
    3271                 dict(name='character varying', age='smallint',
    3272                     married='boolean', weight='real', salary='money'))
     3273                             dict(name='character varying', age='smallint',
     3274                                  married='boolean', weight='real', salary='money'))
    32733275        else:
    32743276            self.assertEqual(person_typ.attnames,
    3275                 dict(name='text', age='int', married='bool',
    3276                     weight='float', salary='money'))
     3277                             dict(name='text', age='int', married='bool',
     3278                                  weight='float', salary='money'))
    32773279        decimal = pg.get_decimal()
    32783280        if pg.get_bool():
     
    33433345        query = self.db.query
    33443346        query('create type test_person_type as'
    3345             ' (name text, picture bytea)')
     3347              ' (name text, picture bytea)')
    33463348        self.addCleanup(query, 'drop type test_person_type')
    33473349        self.createTable('test_person', 'person test_person_type',
    3348             temporary=False, oids=True)
     3350                         temporary=False, oids=True)
    33493351        person_typ = self.db.get_attnames('test_person')['person']
    33503352        self.assertEqual(person_typ.attnames,
    3351             dict(name='text', picture='bytea'))
     3353                         dict(name='text', picture='bytea'))
    33523354        person = ('John Doe', b'O\x00ps\xff!')
    33533355        r = self.db.insert('test_person', None, person=person)
     
    33643366        try:
    33653367            query('create type test_person_type as'
    3366                 ' (name text, data json)')
     3368                  ' (name text, data json)')
    33673369        except pg.ProgrammingError as error:
    33683370            if self.db.server_version < 90200:
     
    33713373        self.addCleanup(query, 'drop type test_person_type')
    33723374        self.createTable('test_person', 'person test_person_type',
    3373             temporary=False, oids=True)
     3375                         temporary=False, oids=True)
    33743376        person_typ = self.db.get_attnames('test_person')['person']
    33753377        self.assertEqual(person_typ.attnames,
    3376             dict(name='text', data='json'))
     3378                         dict(name='text', data='json'))
    33773379        person = ('John Doe', dict(age=61, married=True, weight=99.5))
    33783380        r = self.db.insert('test_person', None, person=person)
     
    33903392        query = self.db.query
    33913393        query('create type test_person_type as'
    3392             ' (name varchar, age smallint)')
     3394              ' (name varchar, age smallint)')
    33933395        self.addCleanup(query, 'drop type test_person_type')
    33943396        self.createTable('test_person', 'person test_person_type',
    3395             temporary=False, oids=True)
     3397                         temporary=False, oids=True)
    33963398        person_typ = self.db.get_attnames('test_person')['person']
    33973399        if self.regtypes:
     
    34013403        if self.regtypes:
    34023404            self.assertEqual(person_typ.attnames,
    3403                 dict(name='character varying', age='smallint'))
     3405                             dict(name='character varying', age='smallint'))
    34043406        else:
    34053407            self.assertEqual(person_typ.attnames,
    3406                 dict(name='text', age='int'))
     3408                             dict(name='text', age='int'))
    34073409        person = pg._Literal("('John Doe', 61)")
    34083410        r = self.db.insert('test_person', None, person=person)
     
    34133415        self.assertEqual(p.age, 61)
    34143416        self.assertIsInstance(p.age, int)
     3417
     3418    def testDbTypesInfo(self):
     3419        dbtypes = self.db.dbtypes
     3420        self.assertIsInstance(dbtypes, dict)
     3421        self.assertNotIn('numeric', dbtypes)
     3422        typ = dbtypes['numeric']
     3423        self.assertIn('numeric', dbtypes)
     3424        self.assertEqual(typ, 'numeric' if self.regtypes else 'num')
     3425        self.assertEqual(typ.oid, 1700)
     3426        self.assertEqual(typ.pgtype, 'numeric')
     3427        self.assertEqual(typ.regtype, 'numeric')
     3428        self.assertEqual(typ.simple, 'num')
     3429        self.assertEqual(typ.typtype, 'b')
     3430        self.assertEqual(typ.category, 'N')
     3431        self.assertEqual(typ.delim, ',')
     3432        self.assertEqual(typ.relid, 0)
     3433        self.assertIs(dbtypes[1700], typ)
     3434        self.assertNotIn('pg_type', dbtypes)
     3435        typ = dbtypes['pg_type']
     3436        self.assertIn('pg_type', dbtypes)
     3437        self.assertEqual(typ, 'pg_type' if self.regtypes else 'record')
     3438        self.assertIsInstance(typ.oid, int)
     3439        self.assertEqual(typ.pgtype, 'pg_type')
     3440        self.assertEqual(typ.regtype, 'pg_type')
     3441        self.assertEqual(typ.simple, 'record')
     3442        self.assertEqual(typ.typtype, 'c')
     3443        self.assertEqual(typ.category, 'C')
     3444        self.assertEqual(typ.delim, ',')
     3445        self.assertNotEqual(typ.relid, 0)
     3446        attnames = typ.attnames
     3447        self.assertIsInstance(attnames, dict)
     3448        self.assertIs(attnames, dbtypes.get_attnames('pg_type'))
     3449        self.assertEqual(list(attnames)[0], 'typname')
     3450        typname = attnames['typname']
     3451        self.assertEqual(typname, 'name' if self.regtypes else 'text')
     3452        self.assertEqual(typname.typtype, 'b')  # base
     3453        self.assertEqual(typname.category, 'S')  # string
     3454        self.assertEqual(list(attnames)[3], 'typlen')
     3455        typlen = attnames['typlen']
     3456        self.assertEqual(typlen, 'smallint' if self.regtypes else 'int')
     3457        self.assertEqual(typlen.typtype, 'b')  # base
     3458        self.assertEqual(typlen.category, 'N')  # numeric
     3459
     3460    def testDbTypesTypecast(self):
     3461        dbtypes = self.db.dbtypes
     3462        self.assertIsInstance(dbtypes, dict)
     3463        self.assertNotIn('circle', dbtypes)
     3464        cast_circle = dbtypes.get_typecast('circle')
     3465        squared_circle = lambda v: 'Squared Circle: %s' % v
     3466        dbtypes.set_typecast('circle', squared_circle)
     3467        self.assertIs(dbtypes.get_typecast('circle'), squared_circle)
     3468        r = self.db.query("select '0,0,1'::circle").getresult()[0][0]
     3469        self.assertIn('circle', dbtypes)
     3470        self.assertEqual(r, 'Squared Circle: <(0,0),1>')
     3471        self.assertEqual(dbtypes.typecast('Impossible', 'circle'),
     3472            'Squared Circle: Impossible')
     3473        dbtypes.reset_typecast('circle')
     3474        self.assertIs(dbtypes.get_typecast('circle'), cast_circle)
     3475
     3476    def testGetSetTypeCast(self):
     3477        get_typecast = pg.get_typecast
     3478        set_typecast = pg.set_typecast
     3479        dbtypes = self.db.dbtypes
     3480        self.assertIsInstance(dbtypes, dict)
     3481        self.assertNotIn('int4', dbtypes)
     3482        self.assertNotIn('real', dbtypes)
     3483        self.assertNotIn('bool', dbtypes)
     3484        self.assertIs(get_typecast('int4'), int)
     3485        self.assertIs(get_typecast('float4'), float)
     3486        self.assertIs(get_typecast('bool'), pg.cast_bool)
     3487        cast_circle = get_typecast('circle')
     3488        squared_circle = lambda v: 'Squared Circle: %s' % v
     3489        self.assertNotIn('circle', dbtypes)
     3490        set_typecast('circle', squared_circle)
     3491        self.assertNotIn('circle', dbtypes)
     3492        self.assertIs(get_typecast('circle'), squared_circle)
     3493        r = self.db.query("select '0,0,1'::circle").getresult()[0][0]
     3494        self.assertIn('circle', dbtypes)
     3495        self.assertEqual(r, 'Squared Circle: <(0,0),1>')
     3496        set_typecast('circle', cast_circle)
     3497        self.assertIs(get_typecast('circle'), cast_circle)
    34153498
    34163499    def testNotificationHandler(self):
Note: See TracChangeset for help on using the changeset viewer.