Changeset 628


Ignore:
Timestamp:
Nov 25, 2015, 12:48:37 PM (4 years ago)
Author:
cito
Message:

Accept non-ascii queries passed as unicode

So far, queries needed to be properly encoded. Now the query method
can do this automatically if it gets a query passed in as unicode.

Also, added missing code and tests for retrieving unicode values
with dictresult() under Python 3.

Location:
trunk/module
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/module/TEST_PyGreSQL_classic_connection.py

    r627 r628  
    542542
    543543
     544class TestUnicodeQueries(unittest.TestCase):
     545    """"Test unicode strings as queries via a basic pg connection."""
     546
     547    def setUp(self):
     548        self.c = connect()
     549        self.c.query('set client_encoding=utf8')
     550
     551    def tearDown(self):
     552        self.c.close()
     553
     554    def testGetresulAscii(self):
     555        result = u'Hello, world!'
     556        q = u"select '%s'" % result
     557        v = self.c.query(q).getresult()[0][0]
     558        self.assertIsInstance(v, str)
     559        self.assertEqual(v, result)
     560
     561    def testDictresulAscii(self):
     562        result = u'Hello, world!'
     563        q = u"select '%s' as greeting" % result
     564        v = self.c.query(q).dictresult()[0]['greeting']
     565        self.assertIsInstance(v, str)
     566        self.assertEqual(v, result)
     567
     568    def testGetresultUtf8(self):
     569        result = u'Hello, wörld & ЌОр!'
     570        q = u"select '%s'" % result
     571        if not unicode_strings:
     572            result = result.encode('utf8')
     573        # pass the query as unicode
     574        try:
     575            v = self.c.query(q).getresult()[0][0]
     576        except pg.ProgrammingError:
     577            self.skipTest("database does not support utf8")
     578        self.assertIsInstance(v, str)
     579        self.assertEqual(v, result)
     580        q = q.encode('utf8')
     581        # pass the query as bytes
     582        v = self.c.query(q).getresult()[0][0]
     583        self.assertIsInstance(v, str)
     584        self.assertEqual(v, result)
     585
     586    def testDictresultUtf8(self):
     587        result = u'Hello, wörld & ЌОр!'
     588        q = u"select '%s' as greeting" % result
     589        if not unicode_strings:
     590            result = result.encode('utf8')
     591        try:
     592            v = self.c.query(q).dictresult()[0]['greeting']
     593        except pg.ProgrammingError:
     594            self.skipTest("database does not support utf8")
     595        self.assertIsInstance(v, str)
     596        self.assertEqual(v, result)
     597        q = q.encode('utf8')
     598        v = self.c.query(q).dictresult()[0]['greeting']
     599        self.assertIsInstance(v, str)
     600        self.assertEqual(v, result)
     601
     602    def testDictresultLatin1(self):
     603        try:
     604            self.c.query('set client_encoding=latin1')
     605        except pg.ProgrammingError:
     606            self.skipTest("database does not support latin1")
     607        result = u'Hello, wörld!'
     608        q = u"select '%s'" % result
     609        if not unicode_strings:
     610            result = result.encode('latin1')
     611        v = self.c.query(q).getresult()[0][0]
     612        self.assertIsInstance(v, str)
     613        self.assertEqual(v, result)
     614        q = q.encode('latin1')
     615        v = self.c.query(q).getresult()[0][0]
     616        self.assertIsInstance(v, str)
     617        self.assertEqual(v, result)
     618
     619    def testDictresultLatin1(self):
     620        try:
     621            self.c.query('set client_encoding=latin1')
     622        except pg.ProgrammingError:
     623            self.skipTest("database does not support latin1")
     624        result = u'Hello, wörld!'
     625        q = u"select '%s' as greeting" % result
     626        if not unicode_strings:
     627            result = result.encode('latin1')
     628        v = self.c.query(q).dictresult()[0]['greeting']
     629        self.assertIsInstance(v, str)
     630        self.assertEqual(v, result)
     631        q = q.encode('latin1')
     632        v = self.c.query(q).dictresult()[0]['greeting']
     633        self.assertIsInstance(v, str)
     634        self.assertEqual(v, result)
     635
     636    def testGetresultCyrillic(self):
     637        try:
     638            self.c.query('set client_encoding=iso_8859_5')
     639        except pg.ProgrammingError:
     640            self.skipTest("database does not support cyrillic")
     641        result = u'Hello, ЌОр!'
     642        q = u"select '%s'" % result
     643        if not unicode_strings:
     644            result = result.encode('cyrillic')
     645        v = self.c.query(q).getresult()[0][0]
     646        self.assertIsInstance(v, str)
     647        self.assertEqual(v, result)
     648        q = q.encode('cyrillic')
     649        v = self.c.query(q).getresult()[0][0]
     650        self.assertIsInstance(v, str)
     651        self.assertEqual(v, result)
     652
     653    def testDictresultCyrillic(self):
     654        try:
     655            self.c.query('set client_encoding=iso_8859_5')
     656        except pg.ProgrammingError:
     657            self.skipTest("database does not support cyrillic")
     658        result = u'Hello, ЌОр!'
     659        q = u"select '%s' as greeting" % result
     660        if not unicode_strings:
     661            result = result.encode('cyrillic')
     662        v = self.c.query(q).dictresult()[0]['greeting']
     663        self.assertIsInstance(v, str)
     664        self.assertEqual(v, result)
     665        q = q.encode('cyrillic')
     666        v = self.c.query(q).dictresult()[0]['greeting']
     667        self.assertIsInstance(v, str)
     668        self.assertEqual(v, result)
     669
     670    def testGetresultLatin9(self):
     671        try:
     672            self.c.query('set client_encoding=latin9')
     673        except pg.ProgrammingError:
     674            self.skipTest("database does not support latin9")
     675        result = u'smœrebrœd with praÅŸská Å¡unka (pay in ¢, £, €, or Â¥)'
     676        q = u"select '%s'" % result
     677        if not unicode_strings:
     678            result = result.encode('latin9')
     679        v = self.c.query(q).getresult()[0][0]
     680        self.assertIsInstance(v, str)
     681        self.assertEqual(v, result)
     682        q = q.encode('latin9')
     683        v = self.c.query(q).getresult()[0][0]
     684        self.assertIsInstance(v, str)
     685        self.assertEqual(v, result)
     686
     687    def testDictresultLatin9(self):
     688        try:
     689            self.c.query('set client_encoding=latin9')
     690        except pg.ProgrammingError:
     691            self.skipTest("database does not support latin9")
     692        result = u'smœrebrœd with praÅŸská Å¡unka (pay in ¢, £, €, or Â¥)'
     693        q = u"select '%s' as menu" % result
     694        if not unicode_strings:
     695            result = result.encode('latin9')
     696        v = self.c.query(q).dictresult()[0]['menu']
     697        self.assertIsInstance(v, str)
     698        self.assertEqual(v, result)
     699        q = q.encode('latin9')
     700        v = self.c.query(q).dictresult()[0]['menu']
     701        self.assertIsInstance(v, str)
     702        self.assertEqual(v, result)
     703
     704
    544705class TestParamQueries(unittest.TestCase):
    545706    """"Test queries with parameters via a basic pg connection."""
     
    694855        self.assertEqual(self.c.query("select $1::text AS garbage", (garbage,)
    695856            ).dictresult(), [{'garbage': garbage}])
    696 
    697     def testUnicodeQuery(self):
    698         query = self.c.query
    699         self.assertEqual(query(u"select 1+1").getresult(), [(2,)])
    700         if unicode_strings:
    701             self.assertEqual(query("select 'Hello, wörld!'").getresult(),
    702                 [('Hello, wörld!',)])
    703         else:
    704             self.assertRaises(TypeError, query, u"select 'Hello, wörld!'")
    705857
    706858
  • trunk/module/pgmodule.c

    r619 r628  
    10741074connQuery(connObject *self, PyObject *args)
    10751075{
    1076         char            *query;
     1076        PyObject        *query_obj;
    10771077        PyObject        *oargs = NULL;
     1078        char            *query = NULL;
    10781079        PGresult        *result;
    10791080        queryObject *npgobj;
     
    10901091
    10911092        /* get query args */
    1092         if (!PyArg_ParseTuple(args, "s|O", &query, &oargs))
    1093         {
    1094                 PyErr_SetString(PyExc_TypeError, "query(sql, [args]), with sql (string).");
     1093        if (!PyArg_ParseTuple(args, "O|O", &query_obj, &oargs))
     1094        {
     1095                return NULL;
     1096        }
     1097
     1098        encoding = PQclientEncoding(self->cnx);
     1099        if (encoding != pg_encoding_utf8 && encoding != pg_encoding_latin1
     1100                        && encoding != pg_encoding_ascii)
     1101                /* should be translated to Python here */
     1102                encoding_name = pg_encoding_to_char(encoding);
     1103
     1104        if (PyBytes_Check(query_obj))
     1105        {
     1106                query = PyBytes_AsString(query_obj);
     1107        }
     1108        else if (PyUnicode_Check(query_obj))
     1109        {
     1110                if (encoding == pg_encoding_utf8)
     1111                        query_obj = PyUnicode_AsUTF8String(query_obj);
     1112                else if (encoding == pg_encoding_latin1)
     1113                        query_obj = PyUnicode_AsLatin1String(query_obj);
     1114                else if (encoding == pg_encoding_ascii)
     1115                        query_obj = PyUnicode_AsASCIIString(query_obj);
     1116                else
     1117                        query_obj = PyUnicode_AsEncodedString(query_obj,
     1118                                encoding_name, "strict");
     1119                if (!query_obj) return NULL; /* pass the UnicodeEncodeError */
     1120                query = PyBytes_AsString(query_obj);
     1121        }
     1122        if (!query) {
     1123                PyErr_SetString(PyExc_TypeError,
     1124                        "query command must be a string.");
    10951125                return NULL;
    10961126        }
     
    11111141                nparms = (int)PySequence_Size(oargs);
    11121142        }
    1113 
    1114         encoding = PQclientEncoding(self->cnx);
    1115         if (encoding != pg_encoding_utf8 && encoding != pg_encoding_latin1
    1116                         && encoding != pg_encoding_ascii)
    1117                 /* should be translated to Python here */
    1118                 encoding_name = pg_encoding_to_char(encoding);
    11191143
    11201144        /* gets result */
     
    32863310                                                /* convert to decimal only if decimal point is set */
    32873311                                                if (!decimal_point) goto default_case;
     3312
    32883313                                                for (k = 0;
    32893314                                                         *s && k < sizeof(cashbuf) / sizeof(cashbuf[0]) - 1;
     
    33773402                                n,
    33783403                           *typ;
     3404#if IS_PY3
     3405        int                     encoding;
     3406        const char *encoding_name=NULL;
     3407#endif
    33793408
    33803409        /* checks args (args == NULL for an internal call) */
     
    33853414                return NULL;
    33863415        }
     3416
     3417#if IS_PY3
     3418        encoding = self->encoding;
     3419        if (encoding != pg_encoding_utf8 && encoding != pg_encoding_latin1
     3420                        && encoding != pg_encoding_ascii)
     3421                /* should be translated to Python here */
     3422                encoding_name = pg_encoding_to_char(encoding);
     3423#endif
    33873424
    33883425        /* stores result in list */
     
    34353472                                                break;
    34363473
    3437                                         case 5:  /* pgmoney */
     3474                                        case 5:  /* money */
    34383475                                                /* convert to decimal only if decimal point is set */
    34393476                                                if (!decimal_point) goto default_case;
     
    34743511                                        default:
    34753512                                        default_case:
    3476                                                 val = PyStr_FromString(s);
     3513#if IS_PY3
     3514                                                if (encoding == pg_encoding_utf8)
     3515                                                        val = PyUnicode_DecodeUTF8(s, strlen(s), "strict");
     3516                                                else if (encoding == pg_encoding_latin1)
     3517                                                        val = PyUnicode_DecodeLatin1(s, strlen(s), "strict");
     3518                                                else if (encoding == pg_encoding_ascii)
     3519                                                        val = PyUnicode_DecodeASCII(s, strlen(s), "strict");
     3520                                                else
     3521                                                        val = PyUnicode_Decode(s, strlen(s),
     3522                                                                encoding_name, "strict");
     3523                                                if (!val)
     3524                                                        val = PyBytes_FromString(s);
     3525#else
     3526                                                val = PyBytes_FromString(s);
     3527#endif
    34773528                                                break;
    34783529                                }
Note: See TracChangeset for help on using the changeset viewer.