Changeset 774 for trunk/tests


Ignore:
Timestamp:
Jan 21, 2016, 1:49:28 PM (4 years ago)
Author:
cito
Message:

Add support for JSON and JSONB to pg and pgdb

This adds all necessary functions to make PyGreSQL automatically
convert between JSON columns and Python objects representing them.

The documentation has also been updated, see there for the details.

Also, tuples automatically bind to ROW expressions in pgdb now.

Location:
trunk/tests
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_classic_dbwrapper.py

    r770 r774  
    1919import sys
    2020import tempfile
     21import json
    2122
    2223import pg  # the module under test
     
    179180            'begin',
    180181            'cancel', 'clear', 'close', 'commit',
    181             'db', 'dbname', 'debug', 'delete',
    182             'end', 'endcopy', 'error',
     182            'db', 'dbname', 'debug', 'decode_json', 'delete',
     183            'encode_json', 'end', 'endcopy', 'error',
    183184            'escape_bytea', 'escape_identifier',
    184185            'escape_literal', 'escape_string',
     
    284285        self.assertEqual(self.db.unescape_bytea(''), b'')
    285286
     287    def testMethodDecodeJson(self):
     288        self.assertEqual(self.db.decode_json('{}'), {})
     289
     290    def testMethodEncodeJson(self):
     291        self.assertEqual(self.db.encode_json({}), '{}')
     292
    286293    def testMethodQuery(self):
    287294        query = self.db.query
     
    532539            b'\\x746861742773206be47365')
    533540        self.assertEqual(f(r'\\x4f007073ff21'), b'\\x4f007073ff21')
     541
     542    def testDecodeJson(self):
     543        f = self.db.decode_json
     544        self.assertIsNone(f('null'))
     545        data = {
     546          "id": 1, "name": "Foo", "price": 1234.5,
     547          "new": True, "note": None,
     548          "tags": ["Bar", "Eek"],
     549          "stock": {"warehouse": 300, "retail": 20}}
     550        text = json.dumps(data)
     551        r = f(text)
     552        self.assertIsInstance(r, dict)
     553        self.assertEqual(r, data)
     554        self.assertIsInstance(r['id'], int)
     555        self.assertIsInstance(r['name'], unicode)
     556        self.assertIsInstance(r['price'], float)
     557        self.assertIsInstance(r['new'], bool)
     558        self.assertIsInstance(r['tags'], list)
     559        self.assertIsInstance(r['stock'], dict)
     560
     561    def testEncodeJson(self):
     562        f = self.db.encode_json
     563        self.assertEqual(f(None), 'null')
     564        data = {
     565          "id": 1, "name": "Foo", "price": 1234.5,
     566          "new": True, "note": None,
     567          "tags": ["Bar", "Eek"],
     568          "stock": {"warehouse": 300, "retail": 20}}
     569        text = json.dumps(data)
     570        r = f(data)
     571        self.assertIsInstance(r, str)
     572        self.assertEqual(r, text)
    534573
    535574    def testGetParameter(self):
     
    27722811
    27732812    def testUpsertBytea(self):
    2774         query = self.db.query
    27752813        self.createTable('bytea_test', 'n smallint primary key, data bytea')
    27762814        s = b"It's all \\ kinds \x00 of\r nasty \xff stuff!\n"
     
    27952833        self.assertIn('data', r)
    27962834        self.assertIsNone(r['data'], bytes)
     2835
     2836    def testInsertGetJson(self):
     2837        try:
     2838            self.createTable('json_test', 'n smallint primary key, data json')
     2839        except pg.ProgrammingError as error:
     2840            if self.db.server_version < 90200:
     2841                self.skipTest('database does not support json')
     2842            self.fail(str(error))
     2843        jsondecode = pg.get_jsondecode()
     2844        # insert null value
     2845        r = self.db.insert('json_test', n=0, data=None)
     2846        self.assertIsInstance(r, dict)
     2847        self.assertIn('n', r)
     2848        self.assertEqual(r['n'], 0)
     2849        self.assertIn('data', r)
     2850        self.assertIsNone(r['data'])
     2851        r = self.db.get('json_test', 0)
     2852        self.assertIsInstance(r, dict)
     2853        self.assertIn('n', r)
     2854        self.assertEqual(r['n'], 0)
     2855        self.assertIn('data', r)
     2856        self.assertIsNone(r['data'])
     2857        # insert JSON object
     2858        data = {
     2859          "id": 1, "name": "Foo", "price": 1234.5,
     2860          "new": True, "note": None,
     2861          "tags": ["Bar", "Eek"],
     2862          "stock": {"warehouse": 300, "retail": 20}}
     2863        r = self.db.insert('json_test', n=1, data=data)
     2864        self.assertIsInstance(r, dict)
     2865        self.assertIn('n', r)
     2866        self.assertEqual(r['n'], 1)
     2867        self.assertIn('data', r)
     2868        r = r['data']
     2869        if jsondecode is None:
     2870            self.assertIsInstance(r, str)
     2871            r = json.loads(r)
     2872        self.assertIsInstance(r, dict)
     2873        self.assertEqual(r, data)
     2874        self.assertIsInstance(r['id'], int)
     2875        self.assertIsInstance(r['name'], unicode)
     2876        self.assertIsInstance(r['price'], float)
     2877        self.assertIsInstance(r['new'], bool)
     2878        self.assertIsInstance(r['tags'], list)
     2879        self.assertIsInstance(r['stock'], dict)
     2880        r = self.db.get('json_test', 1)
     2881        self.assertIsInstance(r, dict)
     2882        self.assertIn('n', r)
     2883        self.assertEqual(r['n'], 1)
     2884        self.assertIn('data', r)
     2885        r = r['data']
     2886        if jsondecode is None:
     2887            self.assertIsInstance(r, str)
     2888            r = json.loads(r)
     2889        self.assertIsInstance(r, dict)
     2890        self.assertEqual(r, data)
     2891        self.assertIsInstance(r['id'], int)
     2892        self.assertIsInstance(r['name'], unicode)
     2893        self.assertIsInstance(r['price'], float)
     2894        self.assertIsInstance(r['new'], bool)
     2895        self.assertIsInstance(r['tags'], list)
     2896        self.assertIsInstance(r['stock'], dict)
     2897
     2898    def testInsertGetJsonb(self):
     2899        try:
     2900            self.createTable('jsonb_test',
     2901                'n smallint primary key, data jsonb')
     2902        except pg.ProgrammingError as error:
     2903            if self.db.server_version < 90400:
     2904                self.skipTest('database does not support jsonb')
     2905            self.fail(str(error))
     2906        jsondecode = pg.get_jsondecode()
     2907        # insert null value
     2908        r = self.db.insert('jsonb_test', n=0, data=None)
     2909        self.assertIsInstance(r, dict)
     2910        self.assertIn('n', r)
     2911        self.assertEqual(r['n'], 0)
     2912        self.assertIn('data', r)
     2913        self.assertIsNone(r['data'])
     2914        r = self.db.get('jsonb_test', 0)
     2915        self.assertIsInstance(r, dict)
     2916        self.assertIn('n', r)
     2917        self.assertEqual(r['n'], 0)
     2918        self.assertIn('data', r)
     2919        self.assertIsNone(r['data'])
     2920        # insert JSON object
     2921        data = {
     2922          "id": 1, "name": "Foo", "price": 1234.5,
     2923          "new": True, "note": None,
     2924          "tags": ["Bar", "Eek"],
     2925          "stock": {"warehouse": 300, "retail": 20}}
     2926        r = self.db.insert('jsonb_test', n=1, data=data)
     2927        self.assertIsInstance(r, dict)
     2928        self.assertIn('n', r)
     2929        self.assertEqual(r['n'], 1)
     2930        self.assertIn('data', r)
     2931        r = r['data']
     2932        if jsondecode is None:
     2933            self.assertIsInstance(r, str)
     2934            r = json.loads(r)
     2935        self.assertIsInstance(r, dict)
     2936        self.assertEqual(r, data)
     2937        self.assertIsInstance(r['id'], int)
     2938        self.assertIsInstance(r['name'], unicode)
     2939        self.assertIsInstance(r['price'], float)
     2940        self.assertIsInstance(r['new'], bool)
     2941        self.assertIsInstance(r['tags'], list)
     2942        self.assertIsInstance(r['stock'], dict)
     2943        r = self.db.get('jsonb_test', 1)
     2944        self.assertIsInstance(r, dict)
     2945        self.assertIn('n', r)
     2946        self.assertEqual(r['n'], 1)
     2947        self.assertIn('data', r)
     2948        r = r['data']
     2949        if jsondecode is None:
     2950            self.assertIsInstance(r, str)
     2951            r = json.loads(r)
     2952        self.assertIsInstance(r, dict)
     2953        self.assertEqual(r, data)
     2954        self.assertIsInstance(r['id'], int)
     2955        self.assertIsInstance(r['name'], unicode)
     2956        self.assertIsInstance(r['price'], float)
     2957        self.assertIsInstance(r['new'], bool)
     2958        self.assertIsInstance(r['tags'], list)
     2959        self.assertIsInstance(r['stock'], dict)
    27972960
    27982961    def testNotificationHandler(self):
     
    28833046        not_bool = not pg.get_bool()
    28843047        cls.set_option('bool', not_bool)
    2885         unnamed_result = lambda q: q.getresult()
    2886         cls.set_option('namedresult', unnamed_result)
     3048        cls.set_option('namedresult', None)
     3049        cls.set_option('jsondecode', None)
    28873050        super(TestDBClassNonStdOpts, cls).setUpClass()
    28883051
     
    28903053    def tearDownClass(cls):
    28913054        super(TestDBClassNonStdOpts, cls).tearDownClass()
     3055        cls.reset_option('jsondecode')
    28923056        cls.reset_option('namedresult')
    28933057        cls.reset_option('bool')
  • trunk/tests/test_classic_functions.py

    r770 r774  
    1616    import unittest
    1717
     18import json
    1819import re
    1920
     
    182183    def testSetDecimalPoint(self):
    183184        point = pg.get_decimal_point()
    184         pg.set_decimal_point('*')
    185         r = pg.get_decimal_point()
    186         pg.set_decimal_point(point)
    187         self.assertIsInstance(r, str)
    188         self.assertEqual(r, '*')
     185        try:
     186            pg.set_decimal_point('*')
     187            r = pg.get_decimal_point()
     188            self.assertIsInstance(r, str)
     189            self.assertEqual(r, '*')
     190        finally:
     191            pg.set_decimal_point(point)
    189192        r = pg.get_decimal_point()
    190193        self.assertIsInstance(r, str)
     
    197200    def testSetDecimal(self):
    198201        decimal_class = pg.Decimal
    199         pg.set_decimal(int)
    200         r = pg.get_decimal()
    201         pg.set_decimal(decimal_class)
    202         self.assertIs(r, int)
     202        try:
     203            pg.set_decimal(int)
     204            r = pg.get_decimal()
     205            self.assertIs(r, int)
     206        finally:
     207            pg.set_decimal(decimal_class)
    203208        r = pg.get_decimal()
    204209        self.assertIs(r, decimal_class)
     
    211216    def testSetBool(self):
    212217        use_bool = pg.get_bool()
    213         pg.set_bool(True)
    214         r = pg.get_bool()
    215         pg.set_bool(use_bool)
    216         self.assertIsInstance(r, bool)
    217         self.assertIs(r, True)
    218         pg.set_bool(False)
    219         r = pg.get_bool()
    220         pg.set_bool(use_bool)
    221         self.assertIsInstance(r, bool)
    222         self.assertIs(r, False)
     218        try:
     219            pg.set_bool(True)
     220            r = pg.get_bool()
     221            pg.set_bool(use_bool)
     222            self.assertIsInstance(r, bool)
     223            self.assertIs(r, True)
     224            pg.set_bool(False)
     225            r = pg.get_bool()
     226            self.assertIsInstance(r, bool)
     227            self.assertIs(r, False)
     228        finally:
     229            pg.set_bool(use_bool)
    223230        r = pg.get_bool()
    224231        self.assertIsInstance(r, bool)
     
    232239    def testSetNamedresult(self):
    233240        namedresult = pg.get_namedresult()
    234         f = lambda q: q.getresult()
    235         pg.set_namedresult(f)
    236         r = pg.get_namedresult()
    237         pg.set_namedresult(namedresult)
    238         self.assertIs(r, f)
     241        try:
     242            pg.set_namedresult(None)
     243            r = pg.get_namedresult()
     244            self.assertIsNone(r)
     245            f = lambda q: q.getresult()
     246            pg.set_namedresult(f)
     247            r = pg.get_namedresult()
     248            self.assertIs(r, f)
     249            self.assertRaises(TypeError, pg.set_namedresult, 'invalid')
     250        finally:
     251            pg.set_namedresult(namedresult)
    239252        r = pg.get_namedresult()
    240253        self.assertIs(r, namedresult)
     254
     255    def testGetJsondecode(self):
     256        r = pg.get_jsondecode()
     257        self.assertTrue(callable(r))
     258        self.assertIs(r, json.loads)
     259
     260    def testSetJsondecode(self):
     261        jsondecode = pg.get_jsondecode()
     262        try:
     263            pg.set_jsondecode(None)
     264            r = pg.get_jsondecode()
     265            self.assertIsNone(r)
     266            pg.set_jsondecode(str)
     267            r = pg.get_jsondecode()
     268            self.assertIs(r, str)
     269            self.assertRaises(TypeError, pg.set_jsondecode, 'invalid')
     270        finally:
     271            pg.set_jsondecode(jsondecode)
     272        r = pg.get_jsondecode()
     273        self.assertIs(r, jsondecode)
    241274
    242275
  • trunk/tests/test_dbapi20.py

    r772 r774  
    365365            cur.execute("select * from %s order by 1" % table)
    366366            rows = cur.fetchall()
     367            self.assertEqual(cur.description[1].type_code, pgdb.FLOAT)
    367368        finally:
    368369            con.close()
     
    400401            cur.execute("select * from %s order by 1" % table)
    401402            rows = cur.fetchall()
     403            self.assertEqual(cur.description[1].type_code, pgdb.DATETIME)
    402404        finally:
    403405            con.close()
     
    409411            self.assertEqual(inval, outval)
    410412
    411     def test_array(self):
     413    def test_list_binds_as_array(self):
    412414        values = ([20000, 25000, 25000, 30000],
    413415            [['breakfast', 'consulting'], ['meeting', 'lunch']])
     
    425427            con.close()
    426428        self.assertEqual(row, output)
     429
     430    def test_tuple_binds_as_row(self):
     431        values = (1, 2.5, 'this is a test')
     432        output = '(1,2.5,"this is a test")'
     433        con = self._connect()
     434        try:
     435            cur = con.cursor()
     436            cur.execute("select %s", [values])
     437            outval = cur.fetchone()[0]
     438        finally:
     439            con.close()
     440        self.assertEqual(outval, output)
    427441
    428442    def test_custom_type(self):
     
    459473            self.assertTrue(pgdb.decimal_type(int) is int)
    460474            cur.execute('select 42')
     475            self.assertEqual(cur.description[0].type_code, pgdb.INTEGER)
    461476            value = cur.fetchone()[0]
    462477            self.assertTrue(isinstance(value, int))
     
    464479            self.assertTrue(pgdb.decimal_type(float) is float)
    465480            cur.execute('select 4.25')
     481            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
    466482            value = cur.fetchone()[0]
    467483            self.assertTrue(isinstance(value, float))
     
    550566            cur.execute("select * from %s order by 1" % table)
    551567            rows = cur.fetchall()
     568            self.assertEqual(cur.description[1].type_code, pgdb.BOOL)
    552569        finally:
    553570            con.close()
     
    557574        self.assertEqual(rows, values)
    558575
     576    def test_json(self):
     577        inval = {"employees":
     578            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
     579        table = self.table_prefix + 'booze'
     580        con = self._connect()
     581        try:
     582            cur = con.cursor()
     583            try:
     584                cur.execute("create table %s (jsontest json)" % table)
     585            except pgdb.ProgrammingError:
     586                self.skipTest('database does not support json')
     587            params = (pgdb.Json(inval),)
     588            cur.execute("insert into %s values (%%s)" % table, params)
     589            cur.execute("select * from %s" % table)
     590            outval = cur.fetchone()[0]
     591            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
     592        finally:
     593            con.close()
     594        self.assertEqual(inval, outval)
     595
     596    def test_jsonb(self):
     597        inval = {"employees":
     598            [{"firstName": "John", "lastName": "Doe", "age": 61}]}
     599        table = self.table_prefix + 'booze'
     600        con = self._connect()
     601        try:
     602            cur = con.cursor()
     603            try:
     604                cur.execute("create table %s (jsonbtest jsonb)" % table)
     605            except pgdb.ProgrammingError:
     606                self.skipTest('database does not support jsonb')
     607            params = (pgdb.Json(inval),)
     608            cur.execute("insert into %s values (%%s)" % table, params)
     609            cur.execute("select * from %s" % table)
     610            outval = cur.fetchone()[0]
     611            self.assertEqual(cur.description[0].type_code, pgdb.JSON)
     612        finally:
     613            con.close()
     614        self.assertEqual(inval, outval)
     615
    559616    def test_execute_edge_cases(self):
    560617        con = self._connect()
     
    564621            cur.executemany(sql, [])
    565622            sql = 'select %d + 1'
    566             cur.execute(sql, [(1,)])  # deprecated use of execute()
    567             self.assertEqual(cur.fetchone()[0], 2)
     623            cur.execute(sql, [(1,), (2,)])  # deprecated use of execute()
     624            self.assertEqual(cur.fetchone()[0], 3)
    568625            sql = 'select 1/0'  # cannot be executed
    569626            self.assertRaises(pgdb.ProgrammingError, cur.execute, sql)
Note: See TracChangeset for help on using the changeset viewer.