source: trunk/tests/test_classic_functions.py @ 781

Last change on this file since 781 was 781, checked in by cito, 4 years ago

Add full support for PostgreSQL array types

At the core of this patch is a fast parser for the peculiar syntax of
literal array expressions in PostgreSQL that was added to the C module.
This is not trivial, because PostgreSQL arrays can be multidimensional
and the syntax is different from Python and SQL expressions.

The Python pg and pgdb modules make use of this parser so that they can
return database columns containing PostgreSQL arrays to Python as lists.
Also added quoting methods that allow passing PostgreSQL arrays as lists
to insert()/update() and execute/executemany(). These methods are simpler
and were implemented in Python but needed support from the regex module.

The patch also adds makes getresult() in pg automatically return bytea
values in unescaped form as bytes strings. Before, it was necessary to
call unescape_bytea manually. The pgdb module did this already.

The patch includes some more refactorings and simplifications regarding
the quoting and casting in pg and pgdb.

Some references to antique PostgreSQL types that are not used any more
in the supported PostgreSQL versions have been removed.

Also added documentation and tests for the new features.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 17.1 KB
Line 
1#! /usr/bin/python
2# -*- coding: utf-8 -*-
3
4"""Test the classic PyGreSQL interface.
5
6Sub-tests for the module functions and constants.
7
8Contributed by Christoph Zwerschke.
9
10These tests do not need a database to test against.
11"""
12
13try:
14    import unittest2 as unittest  # for Python < 2.7
15except ImportError:
16    import unittest
17
18import json
19import re
20
21import pg  # the module under test
22
23try:
24    long
25except NameError:  # Python >= 3.0
26    long = int
27
28try:
29    unicode
30except NameError:  # Python >= 3.0
31    unicode = str
32
33
34class TestHasConnect(unittest.TestCase):
35    """Test existence of basic pg module functions."""
36
37    def testhasPgError(self):
38        self.assertTrue(issubclass(pg.Error, Exception))
39
40    def testhasPgWarning(self):
41        self.assertTrue(issubclass(pg.Warning, Exception))
42
43    def testhasPgInterfaceError(self):
44        self.assertTrue(issubclass(pg.InterfaceError, pg.Error))
45
46    def testhasPgDatabaseError(self):
47        self.assertTrue(issubclass(pg.DatabaseError, pg.Error))
48
49    def testhasPgInternalError(self):
50        self.assertTrue(issubclass(pg.InternalError, pg.DatabaseError))
51
52    def testhasPgOperationalError(self):
53        self.assertTrue(issubclass(pg.OperationalError, pg.DatabaseError))
54
55    def testhasPgProgrammingError(self):
56        self.assertTrue(issubclass(pg.ProgrammingError, pg.DatabaseError))
57
58    def testhasPgIntegrityError(self):
59        self.assertTrue(issubclass(pg.IntegrityError, pg.DatabaseError))
60
61    def testhasPgDataError(self):
62        self.assertTrue(issubclass(pg.DataError, pg.DatabaseError))
63
64    def testhasPgNotSupportedError(self):
65        self.assertTrue(issubclass(pg.NotSupportedError, pg.DatabaseError))
66
67    def testhasConnect(self):
68        self.assertTrue(callable(pg.connect))
69
70    def testhasEscapeString(self):
71        self.assertTrue(callable(pg.escape_string))
72
73    def testhasEscapeBytea(self):
74        self.assertTrue(callable(pg.escape_bytea))
75
76    def testhasUnescapeBytea(self):
77        self.assertTrue(callable(pg.unescape_bytea))
78
79    def testDefHost(self):
80        d0 = pg.get_defhost()
81        d1 = 'pgtesthost'
82        pg.set_defhost(d1)
83        self.assertEqual(pg.get_defhost(), d1)
84        pg.set_defhost(d0)
85        self.assertEqual(pg.get_defhost(), d0)
86
87    def testDefPort(self):
88        d0 = pg.get_defport()
89        d1 = 1234
90        pg.set_defport(d1)
91        self.assertEqual(pg.get_defport(), d1)
92        if d0 is None:
93            d0 = -1
94        pg.set_defport(d0)
95        if d0 == -1:
96            d0 = None
97        self.assertEqual(pg.get_defport(), d0)
98
99    def testDefOpt(self):
100        d0 = pg.get_defopt()
101        d1 = '-h pgtesthost -p 1234'
102        pg.set_defopt(d1)
103        self.assertEqual(pg.get_defopt(), d1)
104        pg.set_defopt(d0)
105        self.assertEqual(pg.get_defopt(), d0)
106
107    def testDefBase(self):
108        d0 = pg.get_defbase()
109        d1 = 'pgtestdb'
110        pg.set_defbase(d1)
111        self.assertEqual(pg.get_defbase(), d1)
112        pg.set_defbase(d0)
113        self.assertEqual(pg.get_defbase(), d0)
114
115
116class TestParseArray(unittest.TestCase):
117    """Test the array parser."""
118
119    array_expressions = [
120        ('', str, ValueError),
121        ('{}', None, []),
122        ('{}', str, []),
123        ('   {   }   ', None, []),
124        ('{', str, ValueError),
125        ('{{}', str, ValueError),
126        ('{}{', str, ValueError),
127        ('[]', str, ValueError),
128        ('()', str, ValueError),
129        ('{[]}', str, ['[]']),
130        ('{hello}', int, ValueError),
131        ('{42}', int, [42]),
132        ('{ 42 }', int, [42]),
133        ('{42', int, ValueError),
134        ('{ 42 ', int, ValueError),
135        ('{hello}', str, ['hello']),
136        ('{ hello }', str, ['hello']),
137        ('{hi}   ', str, ['hi']),
138        ('{hi}   ?', str, ValueError),
139        ('{null}', str, [None]),
140        (' { NULL } ', str, [None]),
141        ('   {   NULL   }   ', str, [None]),
142        (' { not null } ', str, ['not null']),
143        (' { not NULL } ', str, ['not NULL']),
144        (' {"null"} ', str, ['null']),
145        (' {"NULL"} ', str, ['NULL']),
146        ('{Hi!}', str, ['Hi!']),
147        ('{"Hi!"}', str, ['Hi!']),
148        ('{" Hi! "}', str, [' Hi! ']),
149        ('{a"}', str, ValueError),
150        ('{"b}', str, ValueError),
151        ('{a"b}', str, ValueError),
152        (r'{a\"b}', str, ['a"b']),
153        (r'{a\,b}', str, ['a,b']),
154        (r'{a\bc}', str, ['abc']),
155        (r'{"a\bc"}', str, ['abc']),
156        (r'{\a\b\c}', str, ['abc']),
157        (r'{"\a\b\c"}', str, ['abc']),
158        ('{"{}"}', str, ['{}']),
159        (r'{\{\}}', str, ['{}']),
160        ('{"{a,b,c}"}', str, ['{a,b,c}']),
161        ("{'abc'}", str, ["'abc'"]),
162        ('{"abc"}', str, ['abc']),
163        (r'{\"abc\"}', str, ['"abc"']),
164        (r"{\'abc\'}", str, ["'abc'"]),
165        (r"{abc,d,efg}", str, ['abc', 'd', 'efg']),
166        ('{Hello World!}', str, ['Hello World!']),
167        ('{Hello, World!}', str, ['Hello', 'World!']),
168        ('{Hello,\ World!}', str, ['Hello', ' World!']),
169        ('{Hello\, World!}', str, ['Hello, World!']),
170        ('{"Hello World!"}', str, ['Hello World!']),
171        ('{this, should, be, null}', str, ['this', 'should', 'be', None]),
172        ('{This, should, be, NULL}', str, ['This', 'should', 'be', None]),
173        ('{3, 2, 1, null}', int, [3, 2, 1, None]),
174        ('{3, 2, 1, NULL}', int, [3, 2, 1, None]),
175        ('{3,17,51}', int, [3, 17, 51]),
176        (' { 3 , 17 , 51 } ', int, [3, 17, 51]),
177        ('{3,17,51}', str, ['3', '17', '51']),
178        (' { 3 , 17 , 51 } ', str, ['3', '17', '51']),
179        ('{1,"2",abc,"def"}', str, ['1', '2', 'abc', 'def']),
180        ('{{}}', int, [[]]),
181        ('{{},{}}', int, [[], []]),
182        ('{ {} , {} , {} }', int, [[], [], []]),
183        ('{ {} , {} , {} , }', int, ValueError),
184        ('{{{1,2,3},{4,5,6}}}', int, [[[1, 2, 3], [4, 5, 6]]]),
185        ('{{1,2,3},{4,5,6},{7,8,9}}', int, [[1, 2, 3], [4, 5, 6], [7, 8, 9]]),
186        ('{20000, 25000, 25000, 25000}', int, [20000, 25000, 25000, 25000]),
187        ('{{{17,18,19},{14,15,16},{11,12,13}},'
188         '{{27,28,29},{24,25,26},{21,22,23}},'
189         '{{37,38,39},{34,35,36},{31,32,33}}}', int,
190            [[[17, 18, 19], [14, 15, 16], [11, 12, 13]],
191             [[27, 28, 29], [24, 25, 26], [21, 22, 23]],
192             [[37, 38, 39], [34, 35, 36], [31, 32, 33]]]),
193        ('{{"breakfast", "consulting"}, {"meeting", "lunch"}}', str,
194            [['breakfast', 'consulting'], ['meeting', 'lunch']]),
195        ('[1:3]={1,2,3}', int, [1, 2, 3]),
196        ('[-1:1]={1,2,3}', int, [1, 2, 3]),
197        ('[-1:+1]={1,2,3}', int, [1, 2, 3]),
198        ('[-3:-1]={1,2,3}', int, [1, 2, 3]),
199        ('[+1:+3]={1,2,3}', int, [1, 2, 3]),
200        ('[]={1,2,3}', int, ValueError),
201        ('[1:]={1,2,3}', int, ValueError),
202        ('[:3]={1,2,3}', int, ValueError),
203        ('[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}',
204            int, [[[1, 2, 3], [4, 5, 6]]]),
205        ('  [1:1]  [-2:-1]  [3:5]  =  { { { 1 , 2 , 3 }, {4 , 5 , 6 } } }',
206            int, [[[1, 2, 3], [4, 5, 6]]]),
207        ('[1:1][3:5]={{1,2,3},{4,5,6}}', int, [[1, 2, 3], [4, 5, 6]]),
208        ('[3:5]={{1,2,3},{4,5,6}}', int, ValueError),
209        ('[1:1][-2:-1][3:5]={{1,2,3},{4,5,6}}', int, ValueError)]
210
211    def testParserParams(self):
212        f = pg.cast_array
213        self.assertRaises(TypeError, f)
214        self.assertRaises(TypeError, f, None)
215        self.assertRaises(TypeError, f, '{}', 1)
216        self.assertRaises(TypeError, f, '{}', ',',)
217        self.assertRaises(TypeError, f, '{}', None, None)
218        self.assertRaises(TypeError, f, '{}', None, 1)
219        self.assertRaises(TypeError, f, '{}', None, '')
220        self.assertRaises(TypeError, f, '{}', None, ',;')
221        self.assertEqual(f('{}'), [])
222        self.assertEqual(f('{}', None), [])
223        self.assertEqual(f('{}', None, ';'), [])
224        self.assertEqual(f('{}', str), [])
225        self.assertEqual(f('{}', str, ';'), [])
226
227    def testParserSimple(self):
228        r = pg.cast_array('{a,b,c}')
229        self.assertIsInstance(r, list)
230        self.assertEqual(len(r), 3)
231        self.assertEqual(r, ['a', 'b', 'c'])
232
233    def testParserNested(self):
234        f = pg.cast_array
235        r = f('{{a,b,c}}')
236        self.assertIsInstance(r, list)
237        self.assertEqual(len(r), 1)
238        r = r[0]
239        self.assertIsInstance(r, list)
240        self.assertEqual(len(r), 3)
241        self.assertEqual(r, ['a', 'b', 'c'])
242        self.assertRaises(ValueError, f, '{a,{b,c}}')
243        r = f('{{a,b},{c,d}}')
244        self.assertIsInstance(r, list)
245        self.assertEqual(len(r), 2)
246        r = r[1]
247        self.assertIsInstance(r, list)
248        self.assertEqual(len(r), 2)
249        self.assertEqual(r, ['c', 'd'])
250        r = f('{{a},{b},{c}}')
251        self.assertIsInstance(r, list)
252        self.assertEqual(len(r), 3)
253        r = r[1]
254        self.assertIsInstance(r, list)
255        self.assertEqual(len(r), 1)
256        self.assertEqual(r[0], 'b')
257        r = f('{{{{{{{abc}}}}}}}')
258        for i in range(7):
259            self.assertIsInstance(r, list)
260            self.assertEqual(len(r), 1)
261            r = r[0]
262        self.assertEqual(r, 'abc')
263
264    def testParserTooDeeplyNested(self):
265        f = pg.cast_array
266        for n in 3, 5, 9, 12, 16, 32, 64, 256:
267            r = '%sa,b,c%s' % ('{' * n, '}' * n)
268            if n > 16:  # hard coded maximum depth
269                self.assertRaises(ValueError, f, r)
270            else:
271                r = f(r)
272                for i in range(n - 1):
273                    self.assertIsInstance(r, list)
274                    self.assertEqual(len(r), 1)
275                    r = r[0]
276                self.assertEqual(len(r), 3)
277                self.assertEqual(r, ['a', 'b', 'c'])
278
279    def testParserCast(self):
280        f = pg.cast_array
281        self.assertEqual(f('{1}'), ['1'])
282        self.assertEqual(f('{1}', None), ['1'])
283        self.assertEqual(f('{1}', int), [1])
284        self.assertEqual(f('{1}', str), ['1'])
285        self.assertEqual(f('{a}'), ['a'])
286        self.assertEqual(f('{a}', None), ['a'])
287        self.assertRaises(ValueError, f, '{a}', int)
288        self.assertEqual(f('{a}', str), ['a'])
289        cast = lambda s: '%s is ok' % s
290        self.assertEqual(f('{a}', cast), ['a is ok'])
291
292    def testParserDelim(self):
293        f = pg.cast_array
294        self.assertEqual(f('{1,2}'), ['1', '2'])
295        self.assertEqual(f('{1,2}', delim=','), ['1', '2'])
296        self.assertEqual(f('{1;2}'), ['1;2'])
297        self.assertEqual(f('{1;2}', delim=';'), ['1', '2'])
298        self.assertEqual(f('{1,2}', delim=';'), ['1,2'])
299
300    def testParserWithData(self):
301        f = pg.cast_array
302        for expression, cast, expected in self.array_expressions:
303            if expected is ValueError:
304                self.assertRaises(ValueError, f, expression, cast)
305            else:
306                self.assertEqual(f(expression, cast), expected)
307
308    def testParserWithoutCast(self):
309        f = pg.cast_array
310
311        for expression, cast, expected in self.array_expressions:
312            if cast is not str:
313                continue
314            if expected is ValueError:
315                self.assertRaises(ValueError, f, expression)
316            else:
317                self.assertEqual(f(expression), expected)
318
319    def testParserWithDifferentDelimiter(self):
320        f = pg.cast_array
321
322        def replace_comma(value):
323            if isinstance(value, basestring):
324                return value.replace(',', ';')
325            elif isinstance(value, list):
326                return [replace_comma(v) for v in value]
327            else:
328                return value
329
330        for expression, cast, expected in self.array_expressions:
331            expression = replace_comma(expression)
332            if expected is ValueError:
333                self.assertRaises(ValueError, f, expression, cast)
334            else:
335                expected = replace_comma(expected)
336                self.assertEqual(f(expression, cast, ';'), expected)
337
338
339class TestEscapeFunctions(unittest.TestCase):
340    """Test pg escape and unescape functions.
341
342    The libpq interface memorizes some parameters of the last opened
343    connection that influence the result of these functions.
344    Therefore we cannot do rigid tests of these functions here.
345    We leave this for the test module that runs with a database.
346
347    """
348
349    def testEscapeString(self):
350        f = pg.escape_string
351        r = f(b'plain')
352        self.assertIsInstance(r, bytes)
353        self.assertEqual(r, b'plain')
354        r = f(u'plain')
355        self.assertIsInstance(r, unicode)
356        self.assertEqual(r, u'plain')
357        r = f("that's cheese")
358        self.assertIsInstance(r, str)
359        self.assertEqual(r, "that''s cheese")
360
361    def testEscapeBytea(self):
362        f = pg.escape_bytea
363        r = f(b'plain')
364        self.assertIsInstance(r, bytes)
365        self.assertEqual(r, b'plain')
366        r = f(u'plain')
367        self.assertIsInstance(r, unicode)
368        self.assertEqual(r, u'plain')
369        r = f("that's cheese")
370        self.assertIsInstance(r, str)
371        self.assertEqual(r, "that''s cheese")
372
373    def testUnescapeBytea(self):
374        f = pg.unescape_bytea
375        r = f(b'plain')
376        self.assertIsInstance(r, bytes)
377        self.assertEqual(r, b'plain')
378        r = f(u'plain')
379        self.assertIsInstance(r, bytes)
380        self.assertEqual(r, b'plain')
381        r = f(b"das is' k\\303\\244se")
382        self.assertIsInstance(r, bytes)
383        self.assertEqual(r, u"das is' kÀse".encode('utf-8'))
384        r = f(u"das is' k\\303\\244se")
385        self.assertIsInstance(r, bytes)
386        self.assertEqual(r, u"das is' kÀse".encode('utf-8'))
387        r = f(b'O\\000ps\\377!')
388        self.assertEqual(r, b'O\x00ps\xff!')
389        r = f(u'O\\000ps\\377!')
390        self.assertEqual(r, b'O\x00ps\xff!')
391
392
393class TestConfigFunctions(unittest.TestCase):
394    """Test the functions for changing default settings.
395
396    The effect of most of these cannot be tested here, because that
397    needs a database connection.  So we merely test their existence here.
398
399    """
400
401    def testGetDecimalPoint(self):
402        r = pg.get_decimal_point()
403        self.assertIsInstance(r, str)
404        self.assertEqual(r, '.')
405
406    def testSetDecimalPoint(self):
407        point = pg.get_decimal_point()
408        try:
409            pg.set_decimal_point('*')
410            r = pg.get_decimal_point()
411            self.assertIsInstance(r, str)
412            self.assertEqual(r, '*')
413        finally:
414            pg.set_decimal_point(point)
415        r = pg.get_decimal_point()
416        self.assertIsInstance(r, str)
417        self.assertEqual(r, point)
418
419    def testGetDecimal(self):
420        r = pg.get_decimal()
421        self.assertIs(r, pg.Decimal)
422
423    def testSetDecimal(self):
424        decimal_class = pg.Decimal
425        try:
426            pg.set_decimal(int)
427            r = pg.get_decimal()
428            self.assertIs(r, int)
429        finally:
430            pg.set_decimal(decimal_class)
431        r = pg.get_decimal()
432        self.assertIs(r, decimal_class)
433
434    def testGetBool(self):
435        r = pg.get_bool()
436        self.assertIsInstance(r, bool)
437        self.assertIs(r, False)
438
439    def testSetBool(self):
440        use_bool = pg.get_bool()
441        try:
442            pg.set_bool(True)
443            r = pg.get_bool()
444            pg.set_bool(use_bool)
445            self.assertIsInstance(r, bool)
446            self.assertIs(r, True)
447            pg.set_bool(False)
448            r = pg.get_bool()
449            self.assertIsInstance(r, bool)
450            self.assertIs(r, False)
451        finally:
452            pg.set_bool(use_bool)
453        r = pg.get_bool()
454        self.assertIsInstance(r, bool)
455        self.assertIs(r, use_bool)
456
457    def testGetNamedresult(self):
458        r = pg.get_namedresult()
459        self.assertTrue(callable(r))
460        self.assertIs(r, pg._namedresult)
461
462    def testSetNamedresult(self):
463        namedresult = pg.get_namedresult()
464        try:
465            pg.set_namedresult(None)
466            r = pg.get_namedresult()
467            self.assertIsNone(r)
468            f = lambda q: q.getresult()
469            pg.set_namedresult(f)
470            r = pg.get_namedresult()
471            self.assertIs(r, f)
472            self.assertRaises(TypeError, pg.set_namedresult, 'invalid')
473        finally:
474            pg.set_namedresult(namedresult)
475        r = pg.get_namedresult()
476        self.assertIs(r, namedresult)
477
478    def testGetJsondecode(self):
479        r = pg.get_jsondecode()
480        self.assertTrue(callable(r))
481        self.assertIs(r, json.loads)
482
483    def testSetJsondecode(self):
484        jsondecode = pg.get_jsondecode()
485        try:
486            pg.set_jsondecode(None)
487            r = pg.get_jsondecode()
488            self.assertIsNone(r)
489            pg.set_jsondecode(str)
490            r = pg.get_jsondecode()
491            self.assertIs(r, str)
492            self.assertRaises(TypeError, pg.set_jsondecode, 'invalid')
493        finally:
494            pg.set_jsondecode(jsondecode)
495        r = pg.get_jsondecode()
496        self.assertIs(r, jsondecode)
497
498
499class TestModuleConstants(unittest.TestCase):
500    """Test the existence of the documented module constants."""
501
502    def testVersion(self):
503        v = pg.version
504        self.assertIsInstance(v, str)
505        # make sure the version conforms to PEP440
506        re_version = r"""^
507            (\d[\.\d]*(?<= \d))
508            ((?:[abc]|rc)\d+)?
509            (?:(\.post\d+))?
510            (?:(\.dev\d+))?
511            (?:(\+(?![.])[a-zA-Z0-9\.]*[a-zA-Z0-9]))?
512            $"""
513        match = re.match(re_version, v, re.X)
514        self.assertIsNotNone(match)
515
516
517if __name__ == '__main__':
518    unittest.main()
Note: See TracBrowser for help on using the repository browser.