Changeset 781 for trunk/tests


Ignore:
Timestamp:
Jan 25, 2016, 3:44:52 PM (4 years ago)
Author:
cito
Message:

Add full support for PostgreSQL array types

At the core of this patch is a fast parser for the peculiar syntax of
literal array expressions in PostgreSQL that was added to the C module.
This is not trivial, because PostgreSQL arrays can be multidimensional
and the syntax is different from Python and SQL expressions.

The Python pg and pgdb modules make use of this parser so that they can
return database columns containing PostgreSQL arrays to Python as lists.
Also added quoting methods that allow passing PostgreSQL arrays as lists
to insert()/update() and execute/executemany(). These methods are simpler
and were implemented in Python but needed support from the regex module.

The patch also adds makes getresult() in pg automatically return bytea
values in unescaped form as bytes strings. Before, it was necessary to
call unescape_bytea manually. The pgdb module did this already.

The patch includes some more refactorings and simplifications regarding
the quoting and casting in pg and pgdb.

Some references to antique PostgreSQL types that are not used any more
in the supported PostgreSQL versions have been removed.

Also added documentation and tests for the new features.

Location:
trunk/tests
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_connection.py

    r770 r781  
    865865        self.assertEqual(self.c.query("select $1::text AS garbage", (garbage,)
    866866            ).dictresult(), [{'garbage': garbage}])
     867
     868
     869class TestQueryResultTypes(unittest.TestCase):
     870    """Test proper result types via a basic pg connection."""
     871
     872    def setUp(self):
     873        self.c = connect()
     874        self.c.query('set client_encoding=utf8')
     875        self.c.query("set datestyle='ISO,YMD'")
     876
     877    def tearDown(self):
     878        self.c.close()
     879
     880    def assert_proper_cast(self, value, pgtype, pytype):
     881        q = 'select $1::%s' % (pgtype,)
     882        r = self.c.query(q, (value,)).getresult()[0][0]
     883        self.assertIsInstance(r, pytype)
     884        if isinstance(value, (bytes, str)):
     885            if not value or '{':
     886                value = '"%s"' % value
     887        value = '{%s}' % value
     888        r = self.c.query(q + '[]', (value,)).getresult()[0][0]
     889        self.assertIsInstance(r, list)
     890        self.assertEqual(len(r), 1)
     891        self.assertIsInstance(r[0], pytype)
     892
     893    def testInt(self):
     894        self.assert_proper_cast(0, 'int', int)
     895        self.assert_proper_cast(0, 'smallint', int)
     896        self.assert_proper_cast(0, 'oid', int)
     897        self.assert_proper_cast(0, 'cid', int)
     898        self.assert_proper_cast(0, 'xid', int)
     899
     900    def testLong(self):
     901        self.assert_proper_cast(0, 'bigint', long)
     902
     903    def testFloat(self):
     904        self.assert_proper_cast(0, 'float', float)
     905        self.assert_proper_cast(0, 'real', float)
     906        self.assert_proper_cast(0, 'double', float)
     907        self.assert_proper_cast(0, 'double precision', float)
     908        self.assert_proper_cast('infinity', 'float', float)
     909
     910    def testFloat(self):
     911        decimal = pg.get_decimal()
     912        self.assert_proper_cast(decimal(0), 'numeric', decimal)
     913        self.assert_proper_cast(decimal(0), 'decimal', decimal)
     914
     915    def testMoney(self):
     916        decimal = pg.get_decimal()
     917        self.assert_proper_cast(decimal('0'), 'money', decimal)
     918
     919    def testBool(self):
     920        bool_type = bool if pg.get_bool() else str
     921        self.assert_proper_cast('f', 'bool', bool_type)
     922
     923    def testDate(self):
     924        self.assert_proper_cast('1956-01-31', 'date', str)
     925        self.assert_proper_cast('0', 'interval', str)
     926        self.assert_proper_cast('08:42', 'time', str)
     927        self.assert_proper_cast('08:42', 'timetz', str)
     928        self.assert_proper_cast('1956-01-31 08:42', 'timestamp', str)
     929        self.assert_proper_cast('1956-01-31 08:42', 'timestamptz', str)
     930
     931    def testText(self):
     932        self.assert_proper_cast('', 'text', str)
     933        self.assert_proper_cast('', 'char', str)
     934        self.assert_proper_cast('', 'bpchar', str)
     935        self.assert_proper_cast('', 'varchar', str)
     936
     937    def testBytea(self):
     938        self.assert_proper_cast('', 'bytea', bytes)
     939
     940    def testJson(self):
     941        self.assert_proper_cast('{}', 'json', dict)
    867942
    868943
  • trunk/tests/test_classic_dbwrapper.py

    r774 r781  
    375375class TestDBClass(unittest.TestCase):
    376376    """Test the methods of the DB class wrapped pg connection."""
     377
     378    maxDiff = 80 * 20
    377379
    378380    cls_set_up = False
     
    14381440        query = self.db.query
    14391441        self.createTable('test_table', 'n int', oids=True)
     1442        self.assertRaises(pg.ProgrammingError, insert, 'test_table', m=1)
    14401443        r = insert('test_table', n=1)
    14411444        self.assertIsInstance(r, dict)
     
    14981501        self.assertIsInstance(r, dict)
    14991502        self.assertEqual(r['n'], 6)
    1500         r = query(q).getresult()
    15011503        r = ' '.join(str(row[0]) for row in query(q).getresult())
    15021504        self.assertEqual(r, '6 7')
     
    27382740        s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
    27392741        r = self.db.escape_bytea(s)
    2740         query('insert into bytea_test values(3,$1)', (r,))
     2742        query('insert into bytea_test values(3, $1)', (r,))
    27412743        r = query('select * from bytea_test where n=3').getresult()
    27422744        self.assertEqual(len(r), 1)
     
    27452747        self.assertEqual(r[0], 3)
    27462748        r = r[1]
    2747         self.assertIsInstance(r, str)
    2748         r = self.db.unescape_bytea(r)
    27492749        self.assertIsInstance(r, bytes)
    27502750        self.assertEqual(r, s)
     
    27972797        self.assertEqual(r[0], 5)
    27982798        r = r[1]
    2799         self.assertIsInstance(r, str)
    2800         r = self.db.unescape_bytea(r)
    28012799        self.assertIsInstance(r, bytes)
    28022800        self.assertEqual(r, s)
     
    28952893        self.assertIsInstance(r['tags'], list)
    28962894        self.assertIsInstance(r['stock'], dict)
     2895        # insert JSON object as text
     2896        self.db.insert('json_test', n=2, data=json.dumps(data))
     2897        q = "select data from json_test where n in (1, 2) order by n"
     2898        r = self.db.query(q).getresult()
     2899        self.assertEqual(len(r), 2)
     2900        self.assertIsInstance(r[0][0], str if jsondecode is None else dict)
     2901        self.assertEqual(r[0][0], r[1][0])
    28972902
    28982903    def testInsertGetJsonb(self):
     
    29582963        self.assertIsInstance(r['tags'], list)
    29592964        self.assertIsInstance(r['stock'], dict)
     2965
     2966    def testArray(self):
     2967        self.createTable('arraytest',
     2968            'id smallint, i2 smallint[], i4 integer[], i8 bigint[],'
     2969            ' d numeric[], f4 real[], f8 double precision[], m money[],'
     2970            ' b bool[], v4 varchar(4)[], c4 char(4)[], t text[]')
     2971        r = self.db.get_attnames('arraytest')
     2972        self.assertEqual(r, dict(id='int', i2='int[]', i4='int[]', i8='int[]',
     2973            d='num[]', f4='float[]', f8='float[]', m='money[]',
     2974            b='bool[]', v4='text[]', c4='text[]', t='text[]'))
     2975        decimal = pg.get_decimal()
     2976        if decimal is Decimal:
     2977            long_decimal = decimal('123456789.123456789')
     2978            odd_money = decimal('1234567891234567.89')
     2979        else:
     2980            long_decimal = decimal('12345671234.5')
     2981            odd_money = decimal('1234567123.25')
     2982        t, f = (True, False) if pg.get_bool() else ('t', 'f')
     2983        data = dict(id=42, i2=[42, 1234, None, 0, -1],
     2984            i4=[42, 123456789, None, 0, 1, -1],
     2985            i8=[long(42), long(123456789123456789), None,
     2986                long(0), long(1), long(-1)],
     2987            d=[decimal(42), long_decimal, None,
     2988               decimal(0), decimal(1), decimal(-1), -long_decimal],
     2989            f4=[42.0, 1234.5, None, 0.0, 1.0, -1.0,
     2990                float('inf'), float('-inf')],
     2991            f8=[42.0, 12345671234.5, None, 0.0, 1.0, -1.0,
     2992                float('inf'), float('-inf')],
     2993            m=[decimal('42.00'), odd_money, None,
     2994               decimal('0.00'), decimal('1.00'), decimal('-1.00'), -odd_money],
     2995            b=[t, f, t, None, f, t, None, None, t],
     2996            v4=['abc', '"Hi"', '', None], c4=['abc ', '"Hi"', '    ', None],
     2997            t=['abc', 'Hello, World!', '"Hello, World!"', '', None])
     2998        r = data.copy()
     2999        self.db.insert('arraytest', r)
     3000        self.assertEqual(r, data)
     3001        self.db.insert('arraytest', r)
     3002        r = self.db.get('arraytest', 42, 'id')
     3003        self.assertEqual(r, data)
     3004        r = self.db.query('select * from arraytest limit 1').dictresult()[0]
     3005        self.assertEqual(r, data)
     3006
     3007    def testArrayInput(self):
     3008        insert = self.db.insert
     3009        self.createTable('arraytest', 'i int[], t text[]', oids=True)
     3010        r = dict(i=[1, 2, 3], t=['a', 'b', 'c'])
     3011        insert('arraytest', r)
     3012        self.assertEqual(r['i'], [1, 2, 3])
     3013        self.assertEqual(r['t'], ['a', 'b', 'c'])
     3014        r = dict(i='{1,2,3}', t='{a,b,c}')
     3015        self.db.insert('arraytest', r)
     3016        self.assertEqual(r['i'], [1, 2, 3])
     3017        self.assertEqual(r['t'], ['a', 'b', 'c'])
     3018        r = dict(i="[1, 2, 3]", t="['a', 'b', 'c']")
     3019        self.db.insert('arraytest', r)
     3020        self.assertEqual(r['i'], [1, 2, 3])
     3021        self.assertEqual(r['t'], ['a', 'b', 'c'])
     3022        r = dict(i="array[1, 2, 3]", t="array['a', 'b', 'c']")
     3023        self.db.insert('arraytest', r)
     3024        self.assertEqual(r['i'], [1, 2, 3])
     3025        self.assertEqual(r['t'], ['a', 'b', 'c'])
     3026        r = dict(i="ARRAY[1, 2, 3]", t="ARRAY['a', 'b', 'c']")
     3027        self.db.insert('arraytest', r)
     3028        self.assertEqual(r['i'], [1, 2, 3])
     3029        self.assertEqual(r['t'], ['a', 'b', 'c'])
     3030        r = dict(i="1, 2, 3", t="'a', 'b', 'c'")
     3031        self.assertRaises(ValueError, self.db.insert, 'arraytest', r)
     3032
     3033    def testArrayOfIds(self):
     3034        self.createTable('arraytest', 'c cid[], o oid[], x xid[]', oids=True)
     3035        r = self.db.get_attnames('arraytest')
     3036        self.assertEqual(r, dict(oid='int', c='int[]', o='int[]', x='int[]'))
     3037        data = dict(c=[11, 12, 13], o=[21, 22, 23], x=[31, 32, 33])
     3038        r = data.copy()
     3039        self.db.insert('arraytest', r)
     3040        qoid = 'oid(arraytest)'
     3041        oid = r.pop(qoid)
     3042        self.assertEqual(r, data)
     3043        r = {qoid: oid}
     3044        self.db.get('arraytest', r)
     3045        self.assertEqual(oid, r.pop(qoid))
     3046        self.assertEqual(r, data)
     3047
     3048    def testArrayOfText(self):
     3049        self.createTable('arraytest', 'data text[]', oids=True)
     3050        r = self.db.get_attnames('arraytest')
     3051        self.assertEqual(r['data'], 'text[]')
     3052        data = ['Hello, World!', '', None, '{a,b,c}', '"Hi!"',
     3053                'null', 'NULL', 'Null', 'nulL',
     3054                "It's all \\ kinds of\r nasty stuff!\n"]
     3055        r = dict(data=data)
     3056        self.db.insert('arraytest', r)
     3057        self.assertEqual(r['data'], data)
     3058        self.assertIsInstance(r['data'][1], str)
     3059        self.assertIsNone(r['data'][2])
     3060        r['data'] = None
     3061        self.db.get('arraytest', r)
     3062        self.assertEqual(r['data'], data)
     3063        self.assertIsInstance(r['data'][1], str)
     3064        self.assertIsNone(r['data'][2])
     3065
     3066    def testArrayOfBytea(self):
     3067        self.createTable('arraytest', 'data bytea[]', oids=True)
     3068        r = self.db.get_attnames('arraytest')
     3069        self.assertEqual(r['data'], 'bytea[]')
     3070        data = [b'Hello, World!', b'', None, b'{a,b,c}', b'"Hi!"',
     3071                b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"]
     3072        r = dict(data=data)
     3073        self.db.insert('arraytest', r)
     3074        self.assertEqual(r['data'], data)
     3075        self.assertIsInstance(r['data'][1], bytes)
     3076        self.assertIsNone(r['data'][2])
     3077        r['data'] = None
     3078        self.db.get('arraytest', r)
     3079        self.assertEqual(r['data'], data)
     3080        self.assertIsInstance(r['data'][1], bytes)
     3081        self.assertIsNone(r['data'][2])
     3082
     3083    def testArrayOfJson(self):
     3084        try:
     3085            self.createTable('arraytest', 'data json[]', oids=True)
     3086        except pg.ProgrammingError as error:
     3087            if self.db.server_version < 90200:
     3088                self.skipTest('database does not support json')
     3089            self.fail(str(error))
     3090        r = self.db.get_attnames('arraytest')
     3091        self.assertEqual(r['data'], 'json[]')
     3092        data = [dict(id=815, name='John Doe'), dict(id=816, name='Jane Roe')]
     3093        jsondecode = pg.get_jsondecode()
     3094        r = dict(data=data)
     3095        self.db.insert('arraytest', r)
     3096        if jsondecode is None:
     3097            r['data'] = [json.loads(d) for d in r['data']]
     3098        self.assertEqual(r['data'], data)
     3099        r['data'] = None
     3100        self.db.get('arraytest', r)
     3101        if jsondecode is None:
     3102            r['data'] = [json.loads(d) for d in r['data']]
     3103        self.assertEqual(r['data'], data)
     3104        r = dict(data=[json.dumps(d) for d in data])
     3105        self.db.insert('arraytest', r)
     3106        if jsondecode is None:
     3107            r['data'] = [json.loads(d) for d in r['data']]
     3108        self.assertEqual(r['data'], data)
     3109        r['data'] = None
     3110        self.db.get('arraytest', r)
     3111        # insert empty json values
     3112        r = dict(data=['', None])
     3113        self.db.insert('arraytest', r)
     3114        r = r['data']
     3115        self.assertIsInstance(r, list)
     3116        self.assertEqual(len(r), 2)
     3117        self.assertIsNone(r[0])
     3118        self.assertIsNone(r[1])
     3119
     3120    def testArrayOfJsonb(self):
     3121        try:
     3122            self.createTable('arraytest', 'data jsonb[]', oids=True)
     3123        except pg.ProgrammingError as error:
     3124            if self.db.server_version < 90400:
     3125                self.skipTest('database does not support jsonb')
     3126            self.fail(str(error))
     3127        r = self.db.get_attnames('arraytest')
     3128        self.assertEqual(r['data'], 'json[]')
     3129        data = [dict(id=815, name='John Doe'), dict(id=816, name='Jane Roe')]
     3130        jsondecode = pg.get_jsondecode()
     3131        r = dict(data=data)
     3132        self.db.insert('arraytest', r)
     3133        if jsondecode is None:
     3134            r['data'] = [json.loads(d) for d in r['data']]
     3135        self.assertEqual(r['data'], data)
     3136        r['data'] = None
     3137        self.db.get('arraytest', r)
     3138        if jsondecode is None:
     3139            r['data'] = [json.loads(d) for d in r['data']]
     3140        self.assertEqual(r['data'], data)
     3141        r = dict(data=[json.dumps(d) for d in data])
     3142        self.db.insert('arraytest', r)
     3143        if jsondecode is None:
     3144            r['data'] = [json.loads(d) for d in r['data']]
     3145        self.assertEqual(r['data'], data)
     3146        r['data'] = None
     3147        self.db.get('arraytest', r)
     3148        # insert empty json values
     3149        r = dict(data=['', None])
     3150        self.db.insert('arraytest', r)
     3151        r = r['data']
     3152        self.assertIsInstance(r, list)
     3153        self.assertEqual(len(r), 2)
     3154        self.assertIsNone(r[0])
     3155        self.assertIsNone(r[1])
     3156
     3157    def testDeepArray(self):
     3158        self.createTable('arraytest', 'data text[][][]', oids=True)
     3159        r = self.db.get_attnames('arraytest')
     3160        self.assertEqual(r['data'], 'text[]')
     3161        data = [[['Hello, World!', '{a,b,c}', 'back\\slash']]]
     3162        r = dict(data=data)
     3163        self.db.insert('arraytest', r)
     3164        self.assertEqual(r['data'], data)
     3165        r['data'] = None
     3166        self.db.get('arraytest', r)
     3167        self.assertEqual(r['data'], data)
    29603168
    29613169    def testNotificationHandler(self):
     
    32503458        self.assertEqual(self.get_output(), "")
    32513459
     3460    def testDebugMultipleArgs(self):
     3461        output = []
     3462        self.db.debug = output.append
     3463        args = ['Error', 42, {1: 'a', 2: 'b'}, [3, 5, 7]]
     3464        self.db._do_debug(*args)
     3465        self.assertEqual(output, ['\n'.join(str(arg) for arg in args)])
     3466        self.assertEqual(self.get_output(), "")
     3467
    32523468
    32533469if __name__ == '__main__':
  • trunk/tests/test_classic_functions.py

    r774 r781  
    112112        pg.set_defbase(d0)
    113113        self.assertEqual(pg.get_defbase(), d0)
     114
     115
     116class TestParseArray(unittest.TestCase):
     117    """Test the array parser."""
     118
     119    array_expressions = [
     120        ('', str, ValueError),
     121        ('{}', None, []),
     122        ('{}', str, []),
     123        ('   {   }   ', None, []),
     124        ('{', str, ValueError),
     125        ('{{}', str, ValueError),
     126        ('{}{', str, ValueError),
     127        ('[]', str, ValueError),
     128        ('()', str, ValueError),
     129        ('{[]}', str, ['[]']),
     130        ('{hello}', int, ValueError),
     131        ('{42}', int, [42]),
     132        ('{ 42 }', int, [42]),
     133        ('{42', int, ValueError),
     134        ('{ 42 ', int, ValueError),
     135        ('{hello}', str, ['hello']),
     136        ('{ hello }', str, ['hello']),
     137        ('{hi}   ', str, ['hi']),
     138        ('{hi}   ?', str, ValueError),
     139        ('{null}', str, [None]),
     140        (' { NULL } ', str, [None]),
     141        ('   {   NULL   }   ', str, [None]),
     142        (' { not null } ', str, ['not null']),
     143        (' { not NULL } ', str, ['not NULL']),
     144        (' {"null"} ', str, ['null']),
     145        (' {"NULL"} ', str, ['NULL']),
     146        ('{Hi!}', str, ['Hi!']),
     147        ('{"Hi!"}', str, ['Hi!']),
     148        ('{" Hi! "}', str, [' Hi! ']),
     149        ('{a"}', str, ValueError),
     150        ('{"b}', str, ValueError),
     151        ('{a"b}', str, ValueError),
     152        (r'{a\"b}', str, ['a"b']),
     153        (r'{a\,b}', str, ['a,b']),
     154        (r'{a\bc}', str, ['abc']),
     155        (r'{"a\bc"}', str, ['abc']),
     156        (r'{\a\b\c}', str, ['abc']),
     157        (r'{"\a\b\c"}', str, ['abc']),
     158        ('{"{}"}', str, ['{}']),
     159        (r'{\{\}}', str, ['{}']),
     160        ('{"{a,b,c}"}', str, ['{a,b,c}']),
     161        ("{'abc'}", str, ["'abc'"]),
     162        ('{"abc"}', str, ['abc']),
     163        (r'{\"abc\"}', str, ['"abc"']),
     164        (r"{\'abc\'}", str, ["'abc'"]),
     165        (r"{abc,d,efg}", str, ['abc', 'd', 'efg']),
     166        ('{Hello World!}', str, ['Hello World!']),
     167        ('{Hello, World!}', str, ['Hello', 'World!']),
     168        ('{Hello,\ World!}', str, ['Hello', ' World!']),
     169        ('{Hello\, World!}', str, ['Hello, World!']),
     170        ('{"Hello World!"}', str, ['Hello World!']),
     171        ('{this, should, be, null}', str, ['this', 'should', 'be', None]),
     172        ('{This, should, be, NULL}', str, ['This', 'should', 'be', None]),
     173        ('{3, 2, 1, null}', int, [3, 2, 1, None]),
     174        ('{3, 2, 1, NULL}', int, [3, 2, 1, None]),
     175        ('{3,17,51}', int, [3, 17, 51]),
     176        (' { 3 , 17 , 51 } ', int, [3, 17, 51]),
     177        ('{3,17,51}', str, ['3', '17', '51']),
     178        (' { 3 , 17 , 51 } ', str, ['3', '17', '51']),
     179        ('{1,"2",abc,"def"}', str, ['1', '2', 'abc', 'def']),
     180        ('{{}}', int, [[]]),
     181        ('{{},{}}', int, [[], []]),
     182        ('{ {} , {} , {} }', int, [[], [], []]),
     183        ('{ {} , {} , {} , }', int, ValueError),
     184        ('{{{1,2,3},{4,5,6}}}', int, [[[1, 2, 3], [4, 5, 6]]]),
     185        ('{{1,2,3},{4,5,6},{7,8,9}}', int, [[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
     186        ('{20000, 25000, 25000, 25000}', int, [20000, 25000, 25000, 25000]),
     187        ('{{{17,18,19},{14,15,16},{11,12,13}},'
     188         '{{27,28,29},{24,25,26},{21,22,23}},'
     189         '{{37,38,39},{34,35,36},{31,32,33}}}', int,
     190            [[[17, 18, 19], [14, 15, 16], [11, 12, 13]],
     191             [[27, 28, 29], [24, 25, 26], [21, 22, 23]],
     192             [[37, 38, 39], [34, 35, 36], [31, 32, 33]]]),
     193        ('{{"breakfast", "consulting"}, {"meeting", "lunch"}}', str,
     194            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
     195        ('[1:3]={1,2,3}', int, [1, 2, 3]),
     196        ('[-1:1]={1,2,3}', int, [1, 2, 3]),
     197        ('[-1:+1]={1,2,3}', int, [1, 2, 3]),
     198        ('[-3:-1]={1,2,3}', int, [1, 2, 3]),
     199        ('[+1:+3]={1,2,3}', int, [1, 2, 3]),
     200        ('[]={1,2,3}', int, ValueError),
     201        ('[1:]={1,2,3}', int, ValueError),
     202        ('[:3]={1,2,3}', int, ValueError),
     203        ('[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}',
     204            int, [[[1, 2, 3], [4, 5, 6]]]),
     205        ('  [1:1]  [-2:-1]  [3:5]  =  { { { 1 , 2 , 3 }, {4 , 5 , 6 } } }',
     206            int, [[[1, 2, 3], [4, 5, 6]]]),
     207        ('[1:1][3:5]={{1,2,3},{4,5,6}}', int, [[1, 2, 3], [4, 5, 6]]),
     208        ('[3:5]={{1,2,3},{4,5,6}}', int, ValueError),
     209        ('[1:1][-2:-1][3:5]={{1,2,3},{4,5,6}}', int, ValueError)]
     210
     211    def testParserParams(self):
     212        f = pg.cast_array
     213        self.assertRaises(TypeError, f)
     214        self.assertRaises(TypeError, f, None)
     215        self.assertRaises(TypeError, f, '{}', 1)
     216        self.assertRaises(TypeError, f, '{}', ',',)
     217        self.assertRaises(TypeError, f, '{}', None, None)
     218        self.assertRaises(TypeError, f, '{}', None, 1)
     219        self.assertRaises(TypeError, f, '{}', None, '')
     220        self.assertRaises(TypeError, f, '{}', None, ',;')
     221        self.assertEqual(f('{}'), [])
     222        self.assertEqual(f('{}', None), [])
     223        self.assertEqual(f('{}', None, ';'), [])
     224        self.assertEqual(f('{}', str), [])
     225        self.assertEqual(f('{}', str, ';'), [])
     226
     227    def testParserSimple(self):
     228        r = pg.cast_array('{a,b,c}')
     229        self.assertIsInstance(r, list)
     230        self.assertEqual(len(r), 3)
     231        self.assertEqual(r, ['a', 'b', 'c'])
     232
     233    def testParserNested(self):
     234        f = pg.cast_array
     235        r = f('{{a,b,c}}')
     236        self.assertIsInstance(r, list)
     237        self.assertEqual(len(r), 1)
     238        r = r[0]
     239        self.assertIsInstance(r, list)
     240        self.assertEqual(len(r), 3)
     241        self.assertEqual(r, ['a', 'b', 'c'])
     242        self.assertRaises(ValueError, f, '{a,{b,c}}')
     243        r = f('{{a,b},{c,d}}')
     244        self.assertIsInstance(r, list)
     245        self.assertEqual(len(r), 2)
     246        r = r[1]
     247        self.assertIsInstance(r, list)
     248        self.assertEqual(len(r), 2)
     249        self.assertEqual(r, ['c', 'd'])
     250        r = f('{{a},{b},{c}}')
     251        self.assertIsInstance(r, list)
     252        self.assertEqual(len(r), 3)
     253        r = r[1]
     254        self.assertIsInstance(r, list)
     255        self.assertEqual(len(r), 1)
     256        self.assertEqual(r[0], 'b')
     257        r = f('{{{{{{{abc}}}}}}}')
     258        for i in range(7):
     259            self.assertIsInstance(r, list)
     260            self.assertEqual(len(r), 1)
     261            r = r[0]
     262        self.assertEqual(r, 'abc')
     263
     264    def testParserTooDeeplyNested(self):
     265        f = pg.cast_array
     266        for n in 3, 5, 9, 12, 16, 32, 64, 256:
     267            r = '%sa,b,c%s' % ('{' * n, '}' * n)
     268            if n > 16:  # hard coded maximum depth
     269                self.assertRaises(ValueError, f, r)
     270            else:
     271                r = f(r)
     272                for i in range(n - 1):
     273                    self.assertIsInstance(r, list)
     274                    self.assertEqual(len(r), 1)
     275                    r = r[0]
     276                self.assertEqual(len(r), 3)
     277                self.assertEqual(r, ['a', 'b', 'c'])
     278
     279    def testParserCast(self):
     280        f = pg.cast_array
     281        self.assertEqual(f('{1}'), ['1'])
     282        self.assertEqual(f('{1}', None), ['1'])
     283        self.assertEqual(f('{1}', int), [1])
     284        self.assertEqual(f('{1}', str), ['1'])
     285        self.assertEqual(f('{a}'), ['a'])
     286        self.assertEqual(f('{a}', None), ['a'])
     287        self.assertRaises(ValueError, f, '{a}', int)
     288        self.assertEqual(f('{a}', str), ['a'])
     289        cast = lambda s: '%s is ok' % s
     290        self.assertEqual(f('{a}', cast), ['a is ok'])
     291
     292    def testParserDelim(self):
     293        f = pg.cast_array
     294        self.assertEqual(f('{1,2}'), ['1', '2'])
     295        self.assertEqual(f('{1,2}', delim=','), ['1', '2'])
     296        self.assertEqual(f('{1;2}'), ['1;2'])
     297        self.assertEqual(f('{1;2}', delim=';'), ['1', '2'])
     298        self.assertEqual(f('{1,2}', delim=';'), ['1,2'])
     299
     300    def testParserWithData(self):
     301        f = pg.cast_array
     302        for expression, cast, expected in self.array_expressions:
     303            if expected is ValueError:
     304                self.assertRaises(ValueError, f, expression, cast)
     305            else:
     306                self.assertEqual(f(expression, cast), expected)
     307
     308    def testParserWithoutCast(self):
     309        f = pg.cast_array
     310
     311        for expression, cast, expected in self.array_expressions:
     312            if cast is not str:
     313                continue
     314            if expected is ValueError:
     315                self.assertRaises(ValueError, f, expression)
     316            else:
     317                self.assertEqual(f(expression), expected)
     318
     319    def testParserWithDifferentDelimiter(self):
     320        f = pg.cast_array
     321
     322        def replace_comma(value):
     323            if isinstance(value, basestring):
     324                return value.replace(',', ';')
     325            elif isinstance(value, list):
     326                return [replace_comma(v) for v in value]
     327            else:
     328                return value
     329
     330        for expression, cast, expected in self.array_expressions:
     331            expression = replace_comma(expression)
     332            if expected is ValueError:
     333                self.assertRaises(ValueError, f, expression, cast)
     334            else:
     335                expected = replace_comma(expected)
     336                self.assertEqual(f(expression, cast, ';'), expected)
    114337
    115338
  • trunk/tests/test_dbapi20.py

    r774 r781  
    362362                "create table %s (n smallint, floattest float)" % table)
    363363            params = enumerate(values)
    364             cur.executemany("insert into %s values (%%s,%%s)" % table, params)
    365             cur.execute("select * from %s order by 1" % table)
     364            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
     365            cur.execute("select floattest from %s order by n" % table)
    366366            rows = cur.fetchall()
    367             self.assertEqual(cur.description[1].type_code, pgdb.FLOAT)
     367            self.assertEqual(cur.description[0].type_code, pgdb.FLOAT)
     368            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
    368369        finally:
    369370            con.close()
    370371        self.assertEqual(len(rows), len(values))
    371         rows = [row[1] for row in rows]
     372        rows = [row[0] for row in rows]
    372373        for inval, outval in zip(values, rows):
    373374            if inval in ('inf', 'Infinity'):
     
    398399                "create table %s (n smallint, ts timestamp)" % table)
    399400            params = enumerate(values)
    400             cur.executemany("insert into %s values (%%s,%%s)" % table, params)
    401             cur.execute("select * from %s order by 1" % table)
     401            cur.executemany("insert into %s values (%%d,%%s)" % table, params)
     402            cur.execute("select ts from %s order by n" % table)
    402403            rows = cur.fetchall()
    403             self.assertEqual(cur.description[1].type_code, pgdb.DATETIME)
     404            self.assertEqual(cur.description[0].type_code, pgdb.DATETIME)
     405            self.assertNotEqual(cur.description[0].type_code, pgdb.ARRAY)
    404406        finally:
    405407            con.close()
    406408        self.assertEqual(len(rows), len(values))
    407         rows = [row[1] for row in rows]
     409        rows = [row[0] for row in rows]
    408410        for inval, outval in zip(values, rows):
    409411            if isinstance(inval, datetime):
     
    411413            self.assertEqual(inval, outval)
    412414
    413     def test_list_binds_as_array(self):
    414         values = ([20000, 25000, 25000, 30000],
    415             [['breakfast', 'consulting'], ['meeting', 'lunch']])
    416         output = ('{20000,25000,25000,30000}',
    417             '{{breakfast,consulting},{meeting,lunch}}')
    418         table = self.table_prefix + 'booze'
    419         con = self._connect()
    420         try:
    421             cur = con.cursor()
    422             cur.execute("create table %s (i int[], t text[][])" % table)
    423             cur.execute("insert into %s values (%%s,%%s)" % table, values)
    424             cur.execute("select * from %s" % table)
    425             row = cur.fetchone()
    426         finally:
    427             con.close()
    428         self.assertEqual(row, output)
     415    def test_roundtrip_with_list(self):
     416        values = [(None, None), ([], []), ([None], [[None], ['null']]),
     417            ([1, 2, 3], [['a', 'b'], ['c', 'd']]),
     418            ([20000, 25000, 25000, 30000],
     419            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
     420            ([0, 1, -1], [['Hello, World!', '"Hi!"'], ['{x,y}', ' x y ']])]
     421        table = self.table_prefix + 'booze'
     422        con = self._connect()
     423        try:
     424            cur = con.cursor()
     425            cur.execute("create table %s"
     426                " (n smallint, i int[], t text[][])" % table)
     427            params = [(n, v[0], v[1]) for n, v in enumerate(values)]
     428            cur.execute("insert into %s values (%%d,%%s,%%s)" % table, params)
     429            cur.execute("select i, t from %s order by n" % table)
     430            self.assertEqual(cur.description[0].type_code, pgdb.ARRAY)
     431            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
     432            self.assertEqual(cur.description[0].type_code, pgdb.INTEGER)
     433            self.assertEqual(cur.description[1].type_code, pgdb.ARRAY)
     434            self.assertEqual(cur.description[1].type_code, pgdb.STRING)
     435            rows = cur.fetchall()
     436        finally:
     437            con.close()
     438        self.assertEqual(rows, values)
    429439
    430440    def test_tuple_binds_as_row(self):
    431         values = (1, 2.5, 'this is a test')
     441        values = [(1, 2.5, 'this is a test')]
    432442        output = '(1,2.5,"this is a test")'
    433443        con = self._connect()
    434444        try:
    435445            cur = con.cursor()
    436             cur.execute("select %s", [values])
     446            cur.execute("select %s", values)
    437447            outval = cur.fetchone()[0]
    438448        finally:
     
    451461                'create table "%s" (n smallint, b bit varying(7))' % table)
    452462            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
    453             cur.execute("select * from %s order by 1" % table)
     463            cur.execute("select * from %s" % table)
    454464            rows = cur.fetchall()
    455465        finally:
     
    564574            params = enumerate(values)
    565575            cur.executemany("insert into %s values (%%s,%%s)" % table, params)
    566             cur.execute("select * from %s order by 1" % table)
     576            cur.execute("select booltest from %s order by n" % table)
    567577            rows = cur.fetchall()
    568             self.assertEqual(cur.description[1].type_code, pgdb.BOOL)
    569         finally:
    570             con.close()
    571         rows = [row[1] for row in rows]
     578            self.assertEqual(cur.description[0].type_code, pgdb.BOOL)
     579        finally:
     580            con.close()
     581        rows = [row[0] for row in rows]
    572582        values[3] = values[5] = True
    573583        values[4] = values[6] = False
     
    587597            params = (pgdb.Json(inval),)
    588598            cur.execute("insert into %s values (%%s)" % table, params)
    589             cur.execute("select * from %s" % table)
     599            cur.execute("select jsontest from %s" % table)
    590600            outval = cur.fetchone()[0]
    591601            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
     
    607617            params = (pgdb.Json(inval),)
    608618            cur.execute("insert into %s values (%%s)" % table, params)
    609             cur.execute("select * from %s" % table)
     619            cur.execute("select jsonbtest from %s" % table)
    610620            outval = cur.fetchone()[0]
    611621            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
     
    755765        self.assertTrue(pgdb.TIME <= pgdb.DATETIME)
    756766        self.assertTrue(pgdb.DATETIME >= pgdb.DATE)
     767        self.assertEqual(pgdb.ARRAY, pgdb.ARRAY)
     768        self.assertNotEqual(pgdb.ARRAY, pgdb.STRING)
     769        self.assertEqual('_char', pgdb.ARRAY)
     770        self.assertNotEqual('char', pgdb.ARRAY)
    757771
    758772
Note: See TracChangeset for help on using the changeset viewer.