Changeset 799 for trunk/tests


Ignore:
Timestamp:
Jan 31, 2016, 1:45:58 PM (3 years ago)
Author:
cito
Message:

Improve adaptation and add query_formatted() method

Also added more tests and documentation.

Location:
trunk/tests
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r798 r799  
    2424
    2525from decimal import Decimal
     26from datetime import date
    2627from operator import itemgetter
    2728
     
    177178    def testAllDBAttributes(self):
    178179        attributes = [
    179             'abort',
     180            'abort', 'adapter',
    180181            'begin',
    181182            'cancel', 'clear', 'close', 'commit',
     
    198199            'parameter', 'pkey', 'port',
    199200            'protocol_version', 'putline',
    200             'query',
     201            'query', 'query_formatted',
    201202            'release', 'reopen', 'reset', 'rollback',
    202203            'savepoint', 'server_version',
     
    667668        self.assertEqual(g('default_with_oids'), 'on')
    668669        self.assertEqual(g('standard_conforming_strings'), 'on')
    669         self.assertRaises(ValueError, f, set([ 'default_with_oids',
    670                                                'standard_conforming_strings']), ['off', 'on'])
     670        self.assertRaises(ValueError, f, set(['default_with_oids',
     671            'standard_conforming_strings']), ['off', 'on'])
    671672        f(set(['default_with_oids', 'standard_conforming_strings']),
    672           ['off', 'off'])
     673            ['off', 'off'])
    673674        self.assertEqual(g('default_with_oids'), 'off')
    674675        self.assertEqual(g('standard_conforming_strings'), 'off')
     
    877878            self.assertEqual(error.sqlstate, '22012')
    878879
     880    def testQueryFormatted(self):
     881        f = self.db.query_formatted
     882        t = True if pg.get_bool() else 't'
     883        q = f("select %s::int, %s::real, %s::text, %s::bool",
     884              (3, 2.5, 'hello', True))
     885        r = q.getresult()[0]
     886        self.assertEqual(r, (3, 2.5, 'hello', t))
     887        q = f("select %s, %s, %s, %s", (3, 2.5, 'hello', True), inline=True)
     888        r = q.getresult()[0]
     889        self.assertEqual(r, (3, 2.5, 'hello', t))
     890
    879891    def testPkey(self):
    880892        query = self.db.query
     
    885897            self.createTable('%s1' % t, 'b smallint primary key')
    886898            self.createTable('%s2' % t,
    887                              'c smallint, d smallint primary key')
     899                'c smallint, d smallint primary key')
    888900            self.createTable('%s3' % t,
    889                              'e smallint, f smallint, g smallint, h smallint, i smallint,'
    890                              ' primary key (f, h)')
     901                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     902                ' primary key (f, h)')
    891903            self.createTable('%s4' % t,
    892                              'e smallint, f smallint, g smallint, h smallint, i smallint,'
    893                              ' primary key (h, f)')
     904                'e smallint, f smallint, g smallint, h smallint, i smallint,'
     905                ' primary key (h, f)')
    894906            self.createTable('%s5' % t,
    895                              'more_than_one_letter varchar primary key')
     907                'more_than_one_letter varchar primary key')
    896908            self.createTable('%s6' % t,
    897                              '"with space" date primary key')
     909                '"with space" date primary key')
    898910            self.createTable('%s7' % t,
    899                              'a_very_long_column_name varchar, "with space" date, "42" int,'
    900                              ' primary key (a_very_long_column_name, "with space", "42")')
     911                'a_very_long_column_name varchar, "with space" date, "42" int,'
     912                ' primary key (a_very_long_column_name, "with space", "42")')
    901913            self.assertRaises(KeyError, pkey, '%s0' % t)
    902914            self.assertEqual(pkey('%s1' % t), 'b')
     
    10221034        table = 'test table for get_attnames()'
    10231035        self.createTable(table,
    1024                          '"Prime!" smallint, "much space" integer, "Questions?" text')
     1036            '"Prime!" smallint, "much space" integer, "Questions?" text')
    10251037        r = get_attnames(table)
    10261038        self.assertIsInstance(r, dict)
     
    10521064        else:
    10531065            self.assertEqual(r, {'a': 'int', 'b': 'int', 'c': 'int',
    1054                                  'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
    1055                                  'normal_name': 'int', 'Special Name': 'int',
    1056                                  'u': 'text', 't': 'text', 'v': 'text',
    1057                                  'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
     1066                 'e': 'num', 'f': 'float', 'f2': 'float', 'm': 'money',
     1067                 'normal_name': 'int', 'Special Name': 'int',
     1068                 'u': 'text', 't': 'text', 'v': 'text',
     1069                 'y': 'int', 'x': 'int', 'z': 'int', 'oid': 'int'})
    10581070
    10591071    def testGetAttnamesWithRegtypes(self):
     
    14001412        except pg.DatabaseError as error:
    14011413            self.assertEqual(str(error),
    1402                              'No such record in test_students\nwhere "firstname" = $1\n'
    1403                              'with $1="D\' Arcy"')
     1414                'No such record in test_students\nwhere "firstname" = $1\n'
     1415                'with $1="D\' Arcy"')
    14041416        try:
    14051417            get('test_students', "Robert'); TRUNCATE TABLE test_students;--")
    14061418        except pg.DatabaseError as error:
    14071419            self.assertEqual(str(error),
    1408                              'No such record in test_students\nwhere "firstname" = $1\n'
    1409                              'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
     1420                'No such record in test_students\nwhere "firstname" = $1\n'
     1421                'with $1="Robert\'); TRUNCATE TABLE test_students;--"')
    14101422        q = "select * from test_students order by 1 limit 4"
    14111423        r = query(q).getresult()
     
    14201432        table = 'insert_test_table'
    14211433        self.createTable(table,
    1422                          'i2 smallint, i4 integer, i8 bigint,'
    1423                          ' d numeric, f4 real, f8 double precision, m money,'
    1424                          ' v4 varchar(4), c4 char(4), t text,'
    1425                          ' b boolean, ts timestamp', oids=True)
     1434            'i2 smallint, i4 integer, i8 bigint,'
     1435            ' d numeric, f4 real, f8 double precision, m money,'
     1436            ' v4 varchar(4), c4 char(4), t text,'
     1437            ' b boolean, ts timestamp', oids=True)
    14261438        oid_table = 'oid(%s)' % table
    14271439        tests = [dict(i2=None, i4=None, i8=None),
    1428                  (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
    1429                  (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
    1430                  dict(i2=42, i4=123456, i8=9876543210),
    1431                  dict(i2=2 ** 15 - 1,
    1432                       i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
    1433                  dict(d=None), (dict(d=''), dict(d=None)),
    1434                  dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
    1435                  dict(f4=None, f8=None), dict(f4=0, f8=0),
    1436                  (dict(f4='', f8=''), dict(f4=None, f8=None)),
    1437                  (dict(d=1234.5, f4=1234.5, f8=1234.5),
    1438                   dict(d=Decimal('1234.5'))),
    1439                  dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
    1440                  dict(d=Decimal('123456789.9876543212345678987654321')),
    1441                  dict(m=None), (dict(m=''), dict(m=None)),
    1442                  dict(m=Decimal('-1234.56')),
    1443                  (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
    1444                  dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
    1445                  (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
    1446                  (dict(m=1234.5), dict(m=Decimal('1234.5'))),
    1447                  (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
    1448                  (dict(m=123456), dict(m=Decimal('123456'))),
    1449                  (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
    1450                  dict(b=None), (dict(b=''), dict(b=None)),
    1451                  dict(b='f'), dict(b='t'),
    1452                  (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
    1453                  (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
    1454                  (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
    1455                  (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
    1456                  (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
    1457                  (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
    1458                  dict(v4=None, c4=None, t=None),
    1459                  (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
    1460                  dict(v4='1234', c4='1234', t='1234' * 10),
    1461                  dict(v4='abcd', c4='abcd', t='abcdefg'),
    1462                  (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
    1463                  dict(ts=None), (dict(ts=''), dict(ts=None)),
    1464                  (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
    1465                  dict(ts='2012-12-21 00:00:00'),
    1466                  (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
    1467                  dict(ts='2012-12-21 12:21:12'),
    1468                  dict(ts='2013-01-05 12:13:14'),
    1469                  dict(ts='current_timestamp')]
     1440             (dict(i2='', i4='', i8=''), dict(i2=None, i4=None, i8=None)),
     1441             (dict(i2=0, i4=0, i8=0), dict(i2=0, i4=0, i8=0)),
     1442             dict(i2=42, i4=123456, i8=9876543210),
     1443             dict(i2=2 ** 15 - 1,
     1444                  i4=int(2 ** 31 - 1), i8=long(2 ** 63 - 1)),
     1445             dict(d=None), (dict(d=''), dict(d=None)),
     1446             dict(d=Decimal(0)), (dict(d=0), dict(d=Decimal(0))),
     1447             dict(f4=None, f8=None), dict(f4=0, f8=0),
     1448             (dict(f4='', f8=''), dict(f4=None, f8=None)),
     1449             (dict(d=1234.5, f4=1234.5, f8=1234.5),
     1450              dict(d=Decimal('1234.5'))),
     1451             dict(d=Decimal('123.456789'), f4=12.375, f8=123.4921875),
     1452             dict(d=Decimal('123456789.9876543212345678987654321')),
     1453             dict(m=None), (dict(m=''), dict(m=None)),
     1454             dict(m=Decimal('-1234.56')),
     1455             (dict(m=('-1234.56')), dict(m=Decimal('-1234.56'))),
     1456             dict(m=Decimal('1234.56')), dict(m=Decimal('123456')),
     1457             (dict(m='1234.56'), dict(m=Decimal('1234.56'))),
     1458             (dict(m=1234.5), dict(m=Decimal('1234.5'))),
     1459             (dict(m=-1234.5), dict(m=Decimal('-1234.5'))),
     1460             (dict(m=123456), dict(m=Decimal('123456'))),
     1461             (dict(m='1234567.89'), dict(m=Decimal('1234567.89'))),
     1462             dict(b=None), (dict(b=''), dict(b=None)),
     1463             dict(b='f'), dict(b='t'),
     1464             (dict(b=0), dict(b='f')), (dict(b=1), dict(b='t')),
     1465             (dict(b=False), dict(b='f')), (dict(b=True), dict(b='t')),
     1466             (dict(b='0'), dict(b='f')), (dict(b='1'), dict(b='t')),
     1467             (dict(b='n'), dict(b='f')), (dict(b='y'), dict(b='t')),
     1468             (dict(b='no'), dict(b='f')), (dict(b='yes'), dict(b='t')),
     1469             (dict(b='off'), dict(b='f')), (dict(b='on'), dict(b='t')),
     1470             dict(v4=None, c4=None, t=None),
     1471             (dict(v4='', c4='', t=''), dict(c4=' ' * 4)),
     1472             dict(v4='1234', c4='1234', t='1234' * 10),
     1473             dict(v4='abcd', c4='abcd', t='abcdefg'),
     1474             (dict(v4='abc', c4='abc', t='abc'), dict(c4='abc ')),
     1475             dict(ts=None), (dict(ts=''), dict(ts=None)),
     1476             (dict(ts=0), dict(ts=None)), (dict(ts=False), dict(ts=None)),
     1477             dict(ts='2012-12-21 00:00:00'),
     1478             (dict(ts='2012-12-21'), dict(ts='2012-12-21 00:00:00')),
     1479             dict(ts='2012-12-21 12:21:12'),
     1480             dict(ts='2013-01-05 12:13:14'),
     1481             dict(ts='current_timestamp')]
    14701482        for test in tests:
    14711483            if isinstance(test, dict):
     
    20572069        table = 'clear_test_table'
    20582070        self.createTable(table,
    2059                          'n integer, f float, b boolean, d date, t text', oids=True)
     2071            'n integer, f float, b boolean, d date, t text', oids=True)
    20602072        r = clear(table)
    20612073        result = dict(n=0, f=0, b=f, d='', t='')
     
    20732085        table = 'test table for clear()'
    20742086        self.createTable(table, '"Prime!" smallint primary key,'
    2075                                 ' "much space" integer, "Questions?" text')
     2087            ' "much space" integer, "Questions?" text')
    20762088        r = clear(table)
    20772089        self.assertIsInstance(r, dict)
     
    22162228        table = 'delete_test_table_2'
    22172229        self.createTable(table,
    2218                          'n integer, m integer, t text, primary key (n, m)',
    2219                          values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
    2220                                  for n in range(3) for m in range(2)])
     2230             'n integer, m integer, t text, primary key (n, m)',
     2231             values=[(n + 1, m + 1, chr(ord('a') + 2 * n + m))
     2232                     for n in range(3) for m in range(2)])
    22212233        self.assertRaises(KeyError, self.db.delete, table, dict(n=2, t='b'))
    22222234        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 1)
    22232235        r = [r[0] for r in query('select t from "%s" where n=2'
    2224                                  ' order by m' % table).getresult()]
     2236            ' order by m' % table).getresult()]
    22252237        self.assertEqual(r, ['c'])
    22262238        self.assertEqual(self.db.delete(table, dict(n=2, m=2)), 0)
    22272239        r = [r[0] for r in query('select t from "%s" where n=3'
    2228                                  ' order by m' % table).getresult()]
     2240             ' order by m' % table).getresult()]
    22292241        self.assertEqual(r, ['e', 'f'])
    22302242        self.assertEqual(self.db.delete(table, dict(n=3, m=1)), 1)
    22312243        r = [r[0] for r in query('select t from "%s" where n=3'
    2232                                  ' order by m' % table).getresult()]
     2244             ' order by m' % table).getresult()]
    22332245        self.assertEqual(r, ['f'])
    22342246
     
    22382250        table = 'test table for delete()'
    22392251        self.createTable(table, '"Prime!" smallint primary key,'
    2240                                 ' "much space" integer, "Questions?" text',
    2241                          values=[(19, 5005, 'Yes!')])
     2252            ' "much space" integer, "Questions?" text',
     2253            values=[(19, 5005, 'Yes!')])
    22422254        r = {'Prime!': 17}
    22432255        r = delete(table, r)
     
    22552267        query = self.db.query
    22562268        self.createTable('test_parent',
    2257                          'n smallint primary key', values=range(3))
     2269            'n smallint primary key', values=range(3))
    22582270        self.createTable('test_child',
    2259                          'n smallint primary key references test_parent', values=range(3))
     2271            'n smallint primary key references test_parent', values=range(3))
    22602272        q = ("select (select count(*) from test_parent),"
    22612273             " (select count(*) from test_child)")
     
    24412453        self.assertEqual(r, (0, 0, 0, 0))
    24422454        self.assertRaises(ValueError, truncate,
    2443                           ['test_parent*', 'test_child'], only=[True, False])
     2455            ['test_parent*', 'test_child'], only=[True, False])
    24442456        truncate(['test_parent*', 'test_child'], only=[False, True])
    24452457
     
    24752487                 (3, 'Bart'), (4, 'Lisa'), (5, 'Maggie')]
    24762488        self.createTable(table,
    2477                          'id smallint primary key, name varchar', values=names)
     2489            'id smallint primary key, name varchar', values=names)
    24782490        r = get_as_list(table)
    24792491        self.assertIsInstance(r, list)
     
    25042516        self.assertEqual(r, [('Maggie',), ('Marge',)])
    25052517        r = get_as_list(table, what='name',
    2506                         where=["name like 'Ma%'", "name like '%r%'"])
     2518            where=["name like 'Ma%'", "name like '%r%'"])
    25072519        self.assertIsInstance(r, list)
    25082520        self.assertEqual(r, [('Marge',)])
     
    25892601                  (3, '#b2ffff', 'Celeste'), (4, '#c19a6b', 'Desert')]
    25902602        self.createTable(table,
    2591                          'id smallint primary key, rgb char(7), name varchar',
    2592                          values=colors)
     2603            'id smallint primary key, rgb char(7), name varchar',
     2604            values=colors)
    25932605        # keyname must be string, list or tuple
    25942606        self.assertRaises(KeyError, get_as_dict, table, 3)
     
    26682680        self.assertIsInstance(r, OrderedDict)
    26692681        expected = OrderedDict((row[1], row[2])
    2670                                for row in sorted(colors, key=itemgetter(1)))
     2682            for row in sorted(colors, key=itemgetter(1)))
    26712683        self.assertEqual(r, expected)
    26722684        for key in r:
     
    26792691            self.assertEqual(r.keys(), expected.keys())
    26802692        r = get_as_dict(table, what='id, name',
    2681                         where="rgb like '#b%'", scalar=True)
     2693            where="rgb like '#b%'", scalar=True)
    26822694        self.assertIsInstance(r, OrderedDict)
    26832695        expected = OrderedDict((row[0], row[2]) for row in colors[1:3])
     
    26932705        expected = r
    26942706        r = get_as_dict(table, what=['name', 'id'],
    2695                         where=['id > 1', 'id < 4', "rgb like '#b%'",
    2696                                "name not like 'A%'", "name not like '%t'"], scalar=True)
     2707            where=['id > 1', 'id < 4', "rgb like '#b%'",
     2708                   "name not like 'A%'", "name not like '%t'"], scalar=True)
    26972709        self.assertEqual(r, expected)
    26982710        r = get_as_dict(table, what='name, id', limit=2, offset=1, scalar=True)
    26992711        self.assertEqual(r, expected)
    27002712        r = get_as_dict(table, keyname=('id',), what=('name', 'id'),
    2701                         where=('id > 1', 'id < 4'), order=('id',), scalar=True)
     2713            where=('id > 1', 'id < 4'), order=('id',), scalar=True)
    27022714        self.assertEqual(r, expected)
    27032715        r = get_as_dict(table, limit=1)
     
    30453057    def testArray(self):
    30463058        self.createTable('arraytest',
    3047                          'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
    3048                          ' d numeric[], f4 real[], f8 double precision[], m money[],'
    3049                          ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
     3059            'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
     3060            ' d numeric[], f4 real[], f8 double precision[], m money[],'
     3061            ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
    30503062        r = self.db.get_attnames('arraytest')
    30513063        if self.regtypes:
     
    30693081        t, f = (True, False) if pg.get_bool() else ('t', 'f')
    30703082        data = dict(id=42, i2=[42, 1234, None, 0, -1],
    3071                     i4=[42, 123456789, None, 0, 1, -1],
    3072                     i8=[long(42), long(123456789123456789), None,
    3073                         long(0), long(1), long(-1)],
    3074                     d=[decimal(42), long_decimal, None,
    3075                        decimal(0), decimal(1), decimal(-1), -long_decimal],
    3076                     f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
    3077                         float('inf'), float('-inf')],
    3078                     f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
    3079                         float('inf'), float('-inf')],
    3080                     m=[decimal('42.00'), odd_money, None,
    3081                        decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
    3082                     b=[t, f, t, None, f, t, None, None, t],
    3083                     v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
    3084                     t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
     3083            i4=[42, 123456789, None, 0, 1, -1],
     3084            i8=[long(42), long(123456789123456789), None,
     3085                long(0), long(1), long(-1)],
     3086            d=[decimal(42), long_decimal, None,
     3087               decimal(0), decimal(1), decimal(-1), -long_decimal],
     3088            f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
     3089                float('inf'), float('-inf')],
     3090            f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
     3091                float('inf'), float('-inf')],
     3092            m=[decimal('42.00'), odd_money, None,
     3093               decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
     3094            b=[t, f, t, None, f, t, None, None, t],
     3095            v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
     3096            t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
    30853097        r = data.copy()
    30863098        self.db.insert('arraytest', r)
     
    31033115        self.assertEqual(r['i'], [1, 2, 3])
    31043116        self.assertEqual(r['t'], ['a', 'b', 'c'])
    3105         L = pg._Literal
     3117        L = pg.Literal
    31063118        r = dict(i=L("ARRAY[1, 2, 3]"), t=L("ARRAY['a', 'b', 'c']"))
    31073119        self.db.insert('arraytest', r)
     
    32713283        if self.regtypes:
    32723284            self.assertEqual(person_typ.attnames,
    3273                              dict(name='character varying', age='smallint',
    3274                                   married='boolean', weight='real', salary='money'))
     3285                dict(name='character varying', age='smallint',
     3286                    married='boolean', weight='real', salary='money'))
    32753287        else:
    32763288            self.assertEqual(person_typ.attnames,
    3277                              dict(name='text', age='int', married='bool',
    3278                                   weight='float', salary='money'))
     3289                dict(name='text', age='int', married='bool',
     3290                    weight='float', salary='money'))
    32793291        decimal = pg.get_decimal()
    32803292        if pg.get_bool():
     
    34073419            self.assertEqual(person_typ.attnames,
    34083420                             dict(name='text', age='int'))
    3409         person = pg._Literal("('John Doe', 61)")
     3421        person = pg.Literal("('John Doe', 61)")
    34103422        r = self.db.insert('test_person', None, person=person)
    34113423        p = r['person']
     
    34613473        dbtypes = self.db.dbtypes
    34623474        self.assertIsInstance(dbtypes, dict)
     3475        self.assertNotIn('int4', dbtypes)
     3476        self.assertIs(dbtypes.get_typecast('int4'), int)
     3477        dbtypes.set_typecast('int4', float)
     3478        self.assertIs(dbtypes.get_typecast('int4'), float)
     3479        dbtypes.reset_typecast('int4')
     3480        self.assertIs(dbtypes.get_typecast('int4'), int)
     3481        dbtypes.set_typecast('int4', float)
     3482        self.assertIs(dbtypes.get_typecast('int4'), float)
     3483        dbtypes.reset_typecast()
     3484        self.assertIs(dbtypes.get_typecast('int4'), int)
    34633485        self.assertNotIn('circle', dbtypes)
    3464         cast_circle = dbtypes.get_typecast('circle')
     3486        self.assertIsNone(dbtypes.get_typecast('circle'))
    34653487        squared_circle = lambda v: 'Squared Circle: %s' % v
    34663488        dbtypes.set_typecast('circle', squared_circle)
     
    34723494            'Squared Circle: Impossible')
    34733495        dbtypes.reset_typecast('circle')
    3474         self.assertIs(dbtypes.get_typecast('circle'), cast_circle)
     3496        self.assertIsNone(dbtypes.get_typecast('circle'))
    34753497
    34763498    def testGetSetTypeCast(self):
     
    34863508        self.assertIs(get_typecast('bool'), pg.cast_bool)
    34873509        cast_circle = get_typecast('circle')
     3510        self.addCleanup(set_typecast, 'circle', cast_circle)
    34883511        squared_circle = lambda v: 'Squared Circle: %s' % v
    34893512        self.assertNotIn('circle', dbtypes)
     
    36073630
    36083631
     3632class TestDBClassAdapter(unittest.TestCase):
     3633    """Test the adapter object associatd with the DB class."""
     3634
     3635    def setUp(self):
     3636        self.db = DB()
     3637        self.adapter = self.db.adapter
     3638
     3639    def tearDown(self):
     3640        try:
     3641            self.db.close()
     3642        except pg.InternalError:
     3643            pass
     3644
     3645    def testGuessSimpleType(self):
     3646        f = self.adapter.guess_simple_type
     3647        self.assertEqual(f(pg.Bytea(b'test')), 'bytea')
     3648        self.assertEqual(f('string'), 'text')
     3649        self.assertEqual(f(b'string'), 'text')
     3650        self.assertEqual(f(True), 'bool')
     3651        self.assertEqual(f(3), 'int')
     3652        self.assertEqual(f(2.75), 'float')
     3653        self.assertEqual(f(Decimal('4.25')), 'num')
     3654        self.assertEqual(f(date(2016, 1, 30)), 'date')
     3655        self.assertEqual(f([1, 2, 3]), 'int[]')
     3656        self.assertEqual(f([[[123]]]), 'int[]')
     3657        self.assertEqual(f(['a', 'b', 'c']), 'text[]')
     3658        self.assertEqual(f([[['abc']]]), 'text[]')
     3659        self.assertEqual(f([False, True]), 'bool[]')
     3660        self.assertEqual(f([[[False]]]), 'bool[]')
     3661        r = f(('string', True, 3, 2.75, [1], [False]))
     3662        self.assertEqual(r, 'record')
     3663        self.assertEqual(list(r.attnames.values()),
     3664            ['text', 'bool', 'int', 'float', 'int[]', 'bool[]'])
     3665
     3666    def testAdaptQueryTypedList(self):
     3667        format_query = self.adapter.format_query
     3668        self.assertRaises(TypeError, format_query,
     3669            '%s,%s', (1, 2), ('int2',))
     3670        self.assertRaises(TypeError, format_query,
     3671            '%s,%s', (1,), ('int2', 'int2'))
     3672        values = (3, 7.5, 'hello', True)
     3673        types = ('int4', 'float4', 'text', 'bool')
     3674        sql, params = format_query("select %s,%s,%s,%s", values, types)
     3675        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3676        self.assertEqual(params, [3, 7.5, 'hello', 't'])
     3677        types = ('bool', 'bool', 'bool', 'bool')
     3678        sql, params = format_query("select %s,%s,%s,%s", values, types)
     3679        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3680        self.assertEqual(params, ['t', 't', 'f', 't'])
     3681        values = ('2016-01-30', 'current_date')
     3682        types = ('date', 'date')
     3683        sql, params = format_query("values(%s,%s)", values, types)
     3684        self.assertEqual(sql, 'values($1,current_date)')
     3685        self.assertEqual(params, ['2016-01-30'])
     3686        values = ([1, 2, 3], ['a', 'b', 'c'])
     3687        types = ('_int4', '_text')
     3688        sql, params = format_query("%s::int4[],%s::text[]", values, types)
     3689        self.assertEqual(sql, '$1::int4[],$2::text[]')
     3690        self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
     3691        types = ('_bool', '_bool')
     3692        sql, params = format_query("%s::bool[],%s::bool[]", values, types)
     3693        self.assertEqual(sql, '$1::bool[],$2::bool[]')
     3694        self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
     3695        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3696        t = self.adapter.simple_type
     3697        typ = t('record')
     3698        typ._get_attnames = lambda _self: pg.AttrDict([
     3699            ('i', t('int')), ('f', t('float')),
     3700            ('t', t('text')), ('b', t('bool')),
     3701            ('i3', t('int[]')), ('t3', t('text[]'))])
     3702        types = [typ]
     3703        sql, params = format_query('select %s', values, types)
     3704        self.assertEqual(sql, 'select $1')
     3705        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3706
     3707    def testAdaptQueryTypedDict(self):
     3708        format_query = self.adapter.format_query
     3709        self.assertRaises(TypeError, format_query,
     3710            '%s,%s', dict(i1=1, i2=2), dict(i1='int2'))
     3711        values = dict(i=3, f=7.5, t='hello', b=True)
     3712        types = dict(i='int4', f='float4',
     3713            t='text', b='bool')
     3714        sql, params = format_query(
     3715            "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
     3716        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3717        self.assertEqual(params, ['t', 7.5, 3, 'hello'])
     3718        types = dict(i='bool', f='bool',
     3719            t='bool', b='bool')
     3720        sql, params = format_query(
     3721            "select %(i)s,%(f)s,%(t)s,%(b)s", values, types)
     3722        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3723        self.assertEqual(params, ['t', 't', 't', 'f'])
     3724        values = dict(d1='2016-01-30', d2='current_date')
     3725        types = dict(d1='date', d2='date')
     3726        sql, params = format_query("values(%(d1)s,%(d2)s)", values, types)
     3727        self.assertEqual(sql, 'values($1,current_date)')
     3728        self.assertEqual(params, ['2016-01-30'])
     3729        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
     3730        types = dict(i='_int4', t='_text')
     3731        sql, params = format_query(
     3732            "%(i)s::int4[],%(t)s::text[]", values, types)
     3733        self.assertEqual(sql, '$1::int4[],$2::text[]')
     3734        self.assertEqual(params, ['{1,2,3}', '{a,b,c}'])
     3735        types = dict(i='_bool', t='_bool')
     3736        sql, params = format_query(
     3737            "%(i)s::bool[],%(t)s::bool[]", values, types)
     3738        self.assertEqual(sql, '$1::bool[],$2::bool[]')
     3739        self.assertEqual(params, ['{t,t,t}', '{f,f,f}'])
     3740        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3741        t = self.adapter.simple_type
     3742        typ = t('record')
     3743        typ._get_attnames = lambda _self: pg.AttrDict([
     3744            ('i', t('int')), ('f', t('float')),
     3745            ('t', t('text')), ('b', t('bool')),
     3746            ('i3', t('int[]')), ('t3', t('text[]'))])
     3747        types = dict(record=typ)
     3748        sql, params = format_query('select %(record)s', values, types)
     3749        self.assertEqual(sql, 'select $1')
     3750        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3751
     3752    def testAdaptQueryUntypedList(self):
     3753        format_query = self.adapter.format_query
     3754        values = (3, 7.5, 'hello', True)
     3755        sql, params = format_query("select %s,%s,%s,%s", values)
     3756        self.assertEqual(sql, 'select $1,$2,$3,$4')
     3757        self.assertEqual(params, [3, 7.5, 'hello', 't'])
     3758        values = [date(2016, 1, 30), 'current_date']
     3759        sql, params = format_query("values(%s,%s)", values)
     3760        self.assertEqual(sql, 'values($1,$2)')
     3761        self.assertEqual(params, values)
     3762        values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
     3763        sql, params = format_query("%s,%s,%s", values)
     3764        self.assertEqual(sql, "$1,$2,$3")
     3765        self.assertEqual(params, ['{1,2,3}', '{a,b,c}', '{t,f,t}'])
     3766        values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
     3767            [[True, False], [False, True]])
     3768        sql, params = format_query("%s,%s,%s", values)
     3769        self.assertEqual(sql, "$1,$2,$3")
     3770        self.assertEqual(params, [
     3771            '{{1,2},{3,4}}', '{{a,b},{c,d}}', '{{t,f},{f,t}}'])
     3772        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3773        sql, params = format_query('select %s', values)
     3774        self.assertEqual(sql, 'select $1')
     3775        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3776
     3777    def testAdaptQueryUntypedDict(self):
     3778        format_query = self.adapter.format_query
     3779        values = dict(i=3, f=7.5, t='hello', b=True)
     3780        sql, params = format_query(
     3781            "select %(i)s,%(f)s,%(t)s,%(b)s", values)
     3782        self.assertEqual(sql, 'select $3,$2,$4,$1')
     3783        self.assertEqual(params, ['t', 7.5, 3, 'hello'])
     3784        values = dict(d1='2016-01-30', d2='current_date')
     3785        sql, params = format_query("values(%(d1)s,%(d2)s)", values)
     3786        self.assertEqual(sql, 'values($1,$2)')
     3787        self.assertEqual(params, [values['d1'], values['d2']])
     3788        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
     3789        sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
     3790        self.assertEqual(sql, "$2,$3,$1")
     3791        self.assertEqual(params, ['{t,f,t}', '{1,2,3}', '{a,b,c}'])
     3792        values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
     3793            b=[[True, False], [False, True]])
     3794        sql, params = format_query("%(i)s,%(t)s,%(b)s", values)
     3795        self.assertEqual(sql, "$2,$3,$1")
     3796        self.assertEqual(params, [
     3797            '{{t,f},{f,t}}', '{{1,2},{3,4}}', '{{a,b},{c,d}}'])
     3798        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3799        sql, params = format_query('select %(record)s', values)
     3800        self.assertEqual(sql, 'select $1')
     3801        self.assertEqual(params, ['(3,7.5,hello,t,{123},{abc})'])
     3802
     3803    def testAdaptQueryInlineList(self):
     3804        format_query = self.adapter.format_query
     3805        values = (3, 7.5, 'hello', True)
     3806        sql, params = format_query("select %s,%s,%s,%s", values, inline=True)
     3807        self.assertEqual(sql, "select 3,7.5,'hello',true")
     3808        self.assertEqual(params, [])
     3809        values = [date(2016, 1, 30), 'current_date']
     3810        sql, params = format_query("values(%s,%s)", values, inline=True)
     3811        self.assertEqual(sql, "values('2016-01-30','current_date')")
     3812        self.assertEqual(params, [])
     3813        values = ([1, 2, 3], ['a', 'b', 'c'], [True, False, True])
     3814        sql, params = format_query("%s,%s,%s", values, inline=True)
     3815        self.assertEqual(sql,
     3816            "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
     3817        self.assertEqual(params, [])
     3818        values = ([[1, 2], [3, 4]], [['a', 'b'], ['c', 'd']],
     3819            [[True, False], [False, True]])
     3820        sql, params = format_query("%s,%s,%s", values, inline=True)
     3821        self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
     3822            "ARRAY[[true,false],[false,true]]")
     3823        self.assertEqual(params, [])
     3824        values = [(3, 7.5, 'hello', True, [123], ['abc'])]
     3825        sql, params = format_query('select %s', values, inline=True)
     3826        self.assertEqual(sql,
     3827            "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
     3828        self.assertEqual(params, [])
     3829
     3830    def testAdaptQueryInlineDict(self):
     3831        format_query = self.adapter.format_query
     3832        values = dict(i=3, f=7.5, t='hello', b=True)
     3833        sql, params = format_query(
     3834            "select %(i)s,%(f)s,%(t)s,%(b)s", values, inline=True)
     3835        self.assertEqual(sql, "select 3,7.5,'hello',true")
     3836        self.assertEqual(params, [])
     3837        values = dict(d1='2016-01-30', d2='current_date')
     3838        sql, params = format_query(
     3839            "values(%(d1)s,%(d2)s)", values, inline=True)
     3840        self.assertEqual(sql, "values('2016-01-30','current_date')")
     3841        self.assertEqual(params, [])
     3842        values = dict(i=[1, 2, 3], t=['a', 'b', 'c'], b=[True, False, True])
     3843        sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
     3844        self.assertEqual(sql,
     3845            "ARRAY[1,2,3],ARRAY['a','b','c'],ARRAY[true,false,true]")
     3846        self.assertEqual(params, [])
     3847        values = dict(i=[[1, 2], [3, 4]], t=[['a', 'b'], ['c', 'd']],
     3848            b=[[True, False], [False, True]])
     3849        sql, params = format_query("%(i)s,%(t)s,%(b)s", values, inline=True)
     3850        self.assertEqual(sql, "ARRAY[[1,2],[3,4]],ARRAY[['a','b'],['c','d']],"
     3851            "ARRAY[[true,false],[false,true]]")
     3852        self.assertEqual(params, [])
     3853        values = dict(record=(3, 7.5, 'hello', True, [123], ['abc']))
     3854        sql, params = format_query('select %(record)s', values, inline=True)
     3855        self.assertEqual(sql,
     3856            "select (3,7.5,'hello',true,ARRAY[123],ARRAY['abc'])")
     3857        self.assertEqual(params, [])
     3858
     3859    def testAdaptQueryWithPgRepr(self):
     3860        format_query = self.adapter.format_query
     3861        self.assertRaises(TypeError, format_query,
     3862            '%s', object(), inline=True)
     3863        class TestObject:
     3864            def __pg_repr__(self):
     3865                return "'adapted'"
     3866        sql, params = format_query('select %s', [TestObject()], inline=True)
     3867        self.assertEqual(sql, "select 'adapted'")
     3868        self.assertEqual(params, [])
     3869        sql, params = format_query('select %s', [[TestObject()]], inline=True)
     3870        self.assertEqual(sql, "select ARRAY['adapted']")
     3871        self.assertEqual(params, [])
     3872
     3873
    36093874class TestSchemas(unittest.TestCase):
    36103875    """Test correct handling of schemas (namespaces)."""
  • trunk/tests/test_dbapi20.py

    r798 r799  
    828828        self.assertEqual(rows, values)
    829829
     830    def test_literal(self):
     831        con = self._connect()
     832        try:
     833            cur = con.cursor()
     834            value = "lower('Hello')"
     835            cur.execute("select %s, %s", (value, pgdb.Literal(value)))
     836            row = cur.fetchone()
     837        finally:
     838            con.close()
     839        self.assertEqual(row, (value, 'hello'))
     840
     841
    830842    def test_json(self):
    831843        inval = {"employees":
Note: See TracChangeset for help on using the changeset viewer.