Changeset 797 for trunk/tests


Ignore:
Timestamp:
Jan 29, 2016, 5:43:22 PM (4 years ago)
Author:
cito
Message:

Cache typecast functions and make them configurable

The typecast functions used by the pgdb module are now cached
using a local and a global Typecasts class. The local cache is
bound to the connection and knows how to cast composite types.

Also added functions that allow registering custom typecast
functions on the global and local level.

Also added a chapter on type adaptation and casting to the docs.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/tests/test_dbapi20.py

    r796 r797  
    291291            self.assertIsNone(d.null_ok)
    292292
    293     def test_type_cache(self):
    294         con = self._connect()
    295         cur = con.cursor()
    296         type_cache = con.type_cache
    297         self.assertNotIn('numeric', type_cache)
    298         type_info = type_cache['numeric']
    299         self.assertIn('numeric', type_cache)
    300         self.assertEqual(type_info, 'numeric')
    301         self.assertEqual(type_info.oid, 1700)
    302         self.assertEqual(type_info.type, 'b')  # base
    303         self.assertEqual(type_info.category, 'N')  # numeric
    304         self.assertEqual(type_info.delim, ',')
    305         self.assertIs(con.type_cache[1700], type_info)
    306         self.assertNotIn('pg_type', type_cache)
    307         type_info = type_cache['pg_type']
    308         self.assertIn('numeric', type_cache)
    309         self.assertEqual(type_info.type, 'c')  # composite
    310         self.assertEqual(type_info.category, 'C')  # composite
    311         cols = type_cache.columns('pg_type')
    312         self.assertEqual(cols[0].name, 'typname')
    313         typname = type_cache[cols[0].type]
    314         self.assertEqual(typname, 'name')
    315         self.assertEqual(typname.type, 'b')  # base
    316         self.assertEqual(typname.category, 'S')  # string
    317         self.assertEqual(cols[3].name, 'typlen')
    318         typlen = type_cache[cols[3].type]
    319         self.assertEqual(typlen, 'int2')
    320         self.assertEqual(typlen.type, 'b')  # base
    321         self.assertEqual(typlen.category, 'N')  # numeric
    322         cur.close()
    323         cur = con.cursor()
    324         type_cache = con.type_cache
    325         self.assertIn('numeric', type_cache)
    326         cur.close()
    327         con.close()
    328         con = self._connect()
    329         cur = con.cursor()
    330         type_cache = con.type_cache
    331         self.assertNotIn('pg_type', type_cache)
    332         self.assertEqual(type_cache.get('pg_type'), type_info)
    333         self.assertIn('pg_type', type_cache)
    334         self.assertIsNone(type_cache.get(
    335             self.table_prefix + '_surely_does_not_exist'))
    336         cur.close()
    337         con.close()
     293    def test_type_cache_info(self):
     294        con = self._connect()
     295        try:
     296            cur = con.cursor()
     297            type_cache = con.type_cache
     298            self.assertNotIn('numeric', type_cache)
     299            type_info = type_cache['numeric']
     300            self.assertIn('numeric', type_cache)
     301            self.assertEqual(type_info, 'numeric')
     302            self.assertEqual(type_info.oid, 1700)
     303            self.assertEqual(type_info.len, -1)
     304            self.assertEqual(type_info.type, 'b')  # base
     305            self.assertEqual(type_info.category, 'N')  # numeric
     306            self.assertEqual(type_info.delim, ',')
     307            self.assertEqual(type_info.relid, 0)
     308            self.assertIs(con.type_cache[1700], type_info)
     309            self.assertNotIn('pg_type', type_cache)
     310            type_info = type_cache['pg_type']
     311            self.assertIn('numeric', type_cache)
     312            self.assertEqual(type_info.type, 'c')  # composite
     313            self.assertEqual(type_info.category, 'C')  # composite
     314            cols = type_cache.get_fields('pg_type')
     315            self.assertEqual(cols[0].name, 'typname')
     316            typname = type_cache[cols[0].type]
     317            self.assertEqual(typname, 'name')
     318            self.assertEqual(typname.type, 'b')  # base
     319            self.assertEqual(typname.category, 'S')  # string
     320            self.assertEqual(cols[3].name, 'typlen')
     321            typlen = type_cache[cols[3].type]
     322            self.assertEqual(typlen, 'int2')
     323            self.assertEqual(typlen.type, 'b')  # base
     324            self.assertEqual(typlen.category, 'N')  # numeric
     325            cur.close()
     326            cur = con.cursor()
     327            type_cache = con.type_cache
     328            self.assertIn('numeric', type_cache)
     329            cur.close()
     330        finally:
     331            con.close()
     332        con = self._connect()
     333        try:
     334            cur = con.cursor()
     335            type_cache = con.type_cache
     336            self.assertNotIn('pg_type', type_cache)
     337            self.assertEqual(type_cache.get('pg_type'), type_info)
     338            self.assertIn('pg_type', type_cache)
     339            self.assertIsNone(type_cache.get(
     340                self.table_prefix + '_surely_does_not_exist'))
     341            cur.close()
     342        finally:
     343            con.close()
     344
     345    def test_type_cache_typecast(self):
     346        con = self._connect()
     347        try:
     348            cur = con.cursor()
     349            type_cache = con.type_cache
     350            self.assertIs(type_cache.get_typecast('int4'), int)
     351            cast_int = lambda v: 'int(%s)' % v
     352            type_cache.set_typecast('int4', cast_int)
     353            query = 'select 2::int2, 4::int4, 8::int8'
     354            cur.execute(query)
     355            i2, i4, i8 = cur.fetchone()
     356            self.assertEqual(i2, 2)
     357            self.assertEqual(i4, 'int(4)')
     358            self.assertEqual(i8, 8)
     359            self.assertEqual(type_cache.typecast('int4', 42), 'int(42)')
     360            type_cache.set_typecast(['int2', 'int8'], cast_int)
     361            cur.execute(query)
     362            i2, i4, i8 = cur.fetchone()
     363            self.assertEqual(i2, 'int(2)')
     364            self.assertEqual(i4, 'int(4)')
     365            self.assertEqual(i8, 'int(8)')
     366            type_cache.reset_typecast('int4')
     367            cur.execute(query)
     368            i2, i4, i8 = cur.fetchone()
     369            self.assertEqual(i2, 'int(2)')
     370            self.assertEqual(i4, 4)
     371            self.assertEqual(i8, 'int(8)')
     372            type_cache.reset_typecast(['int2', 'int8'])
     373            cur.execute(query)
     374            i2, i4, i8 = cur.fetchone()
     375            self.assertEqual(i2, 2)
     376            self.assertEqual(i4, 4)
     377            self.assertEqual(i8, 8)
     378            type_cache.set_typecast(['int2', 'int8'], cast_int)
     379            cur.execute(query)
     380            i2, i4, i8 = cur.fetchone()
     381            self.assertEqual(i2, 'int(2)')
     382            self.assertEqual(i4, 4)
     383            self.assertEqual(i8, 'int(8)')
     384            type_cache.reset_typecast()
     385            cur.execute(query)
     386            i2, i4, i8 = cur.fetchone()
     387            self.assertEqual(i2, 2)
     388            self.assertEqual(i4, 4)
     389            self.assertEqual(i8, 8)
     390            cur.close()
     391        finally:
     392            con.close()
    338393
    339394    def test_cursor_iteration(self):
     
    533588            self.assertEqual(type_code, pgdb.RECORD)
    534589            self.assertNotEqual(type_code, pgdb.ARRAY)
    535             columns = con.type_cache.columns(type_code)
     590            columns = con.type_cache.get_fields(type_code)
    536591            self.assertEqual(columns[0].name, 'name')
    537592            self.assertEqual(columns[1].name, 'age')
     
    598653        try:
    599654            cur = con.cursor()
    600             self.assertTrue(pgdb.decimal_type(int) is int)
    601             cur.execute('select 42')
    602             self.assertEqual(cur.description[0].type_code, pgdb.INTEGER)
     655            # change decimal type globally to int
     656            int_type = lambda v: int(float(v))
     657            self.assertTrue(pgdb.decimal_type(int_type) is int_type)
     658            cur.execute('select 4.25')
     659            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
    603660            value = cur.fetchone()[0]
    604661            self.assertTrue(isinstance(value, int))
    605             self.assertEqual(value, 42)
     662            self.assertEqual(value, 4)
     663            # change decimal type again to float
    606664            self.assertTrue(pgdb.decimal_type(float) is float)
    607665            cur.execute('select 4.25')
    608666            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
    609667            value = cur.fetchone()[0]
     668            # the connection still uses the old setting
     669            self.assertTrue(isinstance(value, int))
     670            # bust the cache for type functions for the connection
     671            con.type_cache.reset_typecast()
     672            cur.execute('select 4.25')
     673            self.assertEqual(cur.description[0].type_code, pgdb.NUMBER)
     674            value = cur.fetchone()[0]
     675            # now the connection uses the new setting
    610676            self.assertTrue(isinstance(value, float))
    611677            self.assertEqual(value, 4.25)
     
    614680            pgdb.decimal_type(decimal_type)
    615681        self.assertTrue(pgdb.decimal_type() is decimal_type)
     682
     683    def test_global_typecast(self):
     684        try:
     685            query = 'select 2::int2, 4::int4, 8::int8'
     686            self.assertIs(pgdb.get_typecast('int4'), int)
     687            cast_int = lambda v: 'int(%s)' % v
     688            pgdb.set_typecast('int4', cast_int)
     689            con = self._connect()
     690            try:
     691                i2, i4, i8 = con.cursor().execute(query).fetchone()
     692            finally:
     693                con.close()
     694            self.assertEqual(i2, 2)
     695            self.assertEqual(i4, 'int(4)')
     696            self.assertEqual(i8, 8)
     697            pgdb.set_typecast(['int2', 'int8'], cast_int)
     698            con = self._connect()
     699            try:
     700                i2, i4, i8 = con.cursor().execute(query).fetchone()
     701            finally:
     702                con.close()
     703            self.assertEqual(i2, 'int(2)')
     704            self.assertEqual(i4, 'int(4)')
     705            self.assertEqual(i8, 'int(8)')
     706            pgdb.reset_typecast('int4')
     707            con = self._connect()
     708            try:
     709                i2, i4, i8 = con.cursor().execute(query).fetchone()
     710            finally:
     711                con.close()
     712            self.assertEqual(i2, 'int(2)')
     713            self.assertEqual(i4, 4)
     714            self.assertEqual(i8, 'int(8)')
     715            pgdb.reset_typecast(['int2', 'int8'])
     716            con = self._connect()
     717            try:
     718                i2, i4, i8 = con.cursor().execute(query).fetchone()
     719            finally:
     720                con.close()
     721            self.assertEqual(i2, 2)
     722            self.assertEqual(i4, 4)
     723            self.assertEqual(i8, 8)
     724            pgdb.set_typecast(['int2', 'int8'], cast_int)
     725            con = self._connect()
     726            try:
     727                i2, i4, i8 = con.cursor().execute(query).fetchone()
     728            finally:
     729                con.close()
     730            self.assertEqual(i2, 'int(2)')
     731            self.assertEqual(i4, 4)
     732            self.assertEqual(i8, 'int(8)')
     733        finally:
     734            pgdb.reset_typecast()
     735        con = self._connect()
     736        try:
     737            i2, i4, i8 = con.cursor().execute(query).fetchone()
     738        finally:
     739            con.close()
     740        self.assertEqual(i2, 2)
     741        self.assertEqual(i4, 4)
     742        self.assertEqual(i8, 8)
    616743
    617744    def test_unicode_with_utf8(self):
Note: See TracChangeset for help on using the changeset viewer.