source: trunk/tests/test_classic_largeobj.py

Last change on this file was 969, checked in by cito, 3 months ago

Shebang should not be followed by a blank

It's a myth that it is needed because some old versions of Unix expect it.
However, some editors do not like a blank here.

  • Property svn:executable set to *
  • Property svn:keywords set to Id
File size: 14.5 KB
Line 
1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3
4"""Test the classic PyGreSQL interface.
5
6Sub-tests for large object support.
7
8Contributed by Christoph Zwerschke.
9
10These tests need a database to test against.
11"""
12
13try:
14    import unittest2 as unittest  # for Python < 2.7
15except ImportError:
16    import unittest
17import tempfile
18import os
19
20import pg  # the module under test
21
22# We need a database to test against.  If LOCAL_PyGreSQL.py exists we will
23# get our information from that.  Otherwise we use the defaults.
24dbname = 'unittest'
25dbhost = None
26dbport = 5432
27
28try:
29    from .LOCAL_PyGreSQL import *
30except (ImportError, ValueError):
31    try:
32        from LOCAL_PyGreSQL import *
33    except ImportError:
34        pass
35
36windows = os.name == 'nt'
37
38
39def connect():
40    """Create a basic pg connection to the test database."""
41    connection = pg.connect(dbname, dbhost, dbport)
42    connection.query("set client_min_messages=warning")
43    return connection
44
45
46class TestModuleConstants(unittest.TestCase):
47    """Test the existence of the documented module constants."""
48
49    def testLargeObjectIntConstants(self):
50        names = 'INV_READ INV_WRITE SEEK_SET SEEK_CUR SEEK_END'.split()
51        for name in names:
52            try:
53                value = getattr(pg, name)
54            except AttributeError:
55                self.fail('Module constant %s is missing' % name)
56            self.assertIsInstance(value, int)
57
58
59class TestCreatingLargeObjects(unittest.TestCase):
60    """Test creating large objects using a connection."""
61
62    def setUp(self):
63        self.c = connect()
64        self.c.query('begin')
65
66    def tearDown(self):
67        self.c.query('rollback')
68        self.c.close()
69
70    def assertIsLargeObject(self, obj):
71        self.assertIsNotNone(obj)
72        self.assertTrue(hasattr(obj, 'open'))
73        self.assertTrue(hasattr(obj, 'close'))
74        self.assertTrue(hasattr(obj, 'oid'))
75        self.assertTrue(hasattr(obj, 'pgcnx'))
76        self.assertTrue(hasattr(obj, 'error'))
77        self.assertIsInstance(obj.oid, int)
78        self.assertNotEqual(obj.oid, 0)
79        self.assertIs(obj.pgcnx, self.c)
80        self.assertIsInstance(obj.error, str)
81        self.assertFalse(obj.error)
82
83    def testLoCreate(self):
84        large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
85        try:
86            self.assertIsLargeObject(large_object)
87        finally:
88            del large_object
89
90    def testGetLo(self):
91        large_object = self.c.locreate(pg.INV_READ | pg.INV_WRITE)
92        try:
93            self.assertIsLargeObject(large_object)
94            oid = large_object.oid
95        finally:
96            del large_object
97        data = b'some data to be shared'
98        large_object = self.c.getlo(oid)
99        try:
100            self.assertIsLargeObject(large_object)
101            self.assertEqual(large_object.oid, oid)
102            large_object.open(pg.INV_WRITE)
103            large_object.write(data)
104            large_object.close()
105        finally:
106            del large_object
107        large_object = self.c.getlo(oid)
108        try:
109            self.assertIsLargeObject(large_object)
110            self.assertEqual(large_object.oid, oid)
111            large_object.open(pg.INV_READ)
112            r = large_object.read(80)
113            large_object.close()
114            large_object.unlink()
115        finally:
116            del large_object
117        self.assertIsInstance(r, bytes)
118        self.assertEqual(r, data)
119
120    def testLoImport(self):
121        if windows:
122            # NamedTemporaryFiles don't work well here
123            fname = 'temp_test_pg_largeobj_import.txt'
124            f = open(fname, 'wb')
125        else:
126            f = tempfile.NamedTemporaryFile()
127            fname = f.name
128        data = b'some data to be imported'
129        f.write(data)
130        if windows:
131            f.close()
132            f = open(fname, 'rb')
133        else:
134            f.flush()
135            f.seek(0)
136        large_object = self.c.loimport(f.name)
137        try:
138            f.close()
139            if windows:
140                os.remove(fname)
141            self.assertIsLargeObject(large_object)
142            large_object.open(pg.INV_READ)
143            large_object.seek(0, pg.SEEK_SET)
144            r = large_object.size()
145            self.assertIsInstance(r, int)
146            self.assertEqual(r, len(data))
147            r = large_object.read(80)
148            self.assertIsInstance(r, bytes)
149            self.assertEqual(r, data)
150            large_object.close()
151            large_object.unlink()
152        finally:
153            del large_object
154
155
156class TestLargeObjects(unittest.TestCase):
157    """Test the large object methods."""
158
159    def setUp(self):
160        self.pgcnx = connect()
161        self.pgcnx.query('begin')
162        self.obj = self.pgcnx.locreate(pg.INV_READ | pg.INV_WRITE)
163
164    def tearDown(self):
165        if self.obj.oid:
166            try:
167                self.obj.close()
168            except (SystemError, IOError):
169                pass
170            try:
171                self.obj.unlink()
172            except (SystemError, IOError):
173                pass
174        del self.obj
175        try:
176            self.pgcnx.query('rollback')
177        except SystemError:
178            pass
179        self.pgcnx.close()
180
181    def testClassName(self):
182        self.assertEqual(self.obj.__class__.__name__, 'LargeObject')
183
184    def testModuleName(self):
185        self.assertEqual(self.obj.__class__.__module__, 'pg')
186
187    def testOid(self):
188        self.assertIsInstance(self.obj.oid, int)
189        self.assertNotEqual(self.obj.oid, 0)
190
191    def testPgcn(self):
192        self.assertIs(self.obj.pgcnx, self.pgcnx)
193
194    def testError(self):
195        self.assertIsInstance(self.obj.error, str)
196        self.assertEqual(self.obj.error, '')
197
198    def testStr(self):
199        self.obj.open(pg.INV_WRITE)
200        data = b'some object to be printed'
201        self.obj.write(data)
202        oid = self.obj.oid
203        r = str(self.obj)
204        self.assertEqual(r, 'Opened large object, oid %d' % oid)
205        self.obj.close()
206        r = str(self.obj)
207        self.assertEqual(r, 'Closed large object, oid %d' % oid)
208
209    def testRepr(self):
210        r = repr(self.obj)
211        self.assertTrue(r.startswith('<pg.LargeObject object'), r)
212
213    def testOpen(self):
214        open = self.obj.open
215        # testing with invalid parameters
216        self.assertRaises(TypeError, open)
217        self.assertRaises(TypeError, open, pg.INV_READ, pg.INV_WRITE)
218        open(pg.INV_READ)
219        # object is already open
220        self.assertRaises(IOError, open, pg.INV_READ)
221
222    def testClose(self):
223        close = self.obj.close
224        # testing with invalid parameters
225        self.assertRaises(TypeError, close, pg.INV_READ)
226        # object is not yet open
227        self.assertRaises(IOError, close)
228        self.obj.open(pg.INV_READ)
229        close()
230        self.assertRaises(IOError, close)
231
232    def testRead(self):
233        read = self.obj.read
234        # testing with invalid parameters
235        self.assertRaises(TypeError, read)
236        self.assertRaises(ValueError, read, -1)
237        self.assertRaises(TypeError, read, 'invalid')
238        self.assertRaises(TypeError, read, 80, 'invalid')
239        # reading when object is not yet open
240        self.assertRaises(IOError, read, 80)
241        data = b'some data to be read'
242        self.obj.open(pg.INV_WRITE)
243        self.obj.write(data)
244        self.obj.close()
245        self.obj.open(pg.INV_READ)
246        r = read(80)
247        self.assertIsInstance(r, bytes)
248        self.assertEqual(r, data)
249        self.obj.close()
250        self.obj.open(pg.INV_READ)
251        r = read(8)
252        self.assertIsInstance(r, bytes)
253        self.assertEqual(r, data[:8])
254        self.obj.close()
255
256    def testWrite(self):
257        write = self.obj.write
258        # testing with invalid parameters
259        self.assertRaises(TypeError, write)
260        self.assertRaises(TypeError, write, -1)
261        self.assertRaises(TypeError, write, '', 'invalid')
262        # writing when object is not yet open
263        self.assertRaises(IOError, write, 'invalid')
264        data = b'some data to be written'
265        self.obj.open(pg.INV_WRITE)
266        write(data)
267        self.obj.close()
268        self.obj.open(pg.INV_READ)
269        r = self.obj.read(80)
270        self.assertIsInstance(r, bytes)
271        self.assertEqual(r, data)
272
273    def testWriteLatin1Bytes(self):
274        read = self.obj.read
275        self.obj.open(pg.INV_WRITE)
276        self.obj.write(u'kÀse'.encode('latin1'))
277        self.obj.close()
278        self.obj.open(pg.INV_READ)
279        r = read(80)
280        self.assertIsInstance(r, bytes)
281        self.assertEqual(r.decode('latin1'), u'kÀse')
282
283    def testWriteUtf8Bytes(self):
284        read = self.obj.read
285        self.obj.open(pg.INV_WRITE)
286        self.obj.write(u'kÀse'.encode('utf8'))
287        self.obj.close()
288        self.obj.open(pg.INV_READ)
289        r = read(80)
290        self.assertIsInstance(r, bytes)
291        self.assertEqual(r.decode('utf8'), u'kÀse')
292
293    def testWriteUtf8String(self):
294        read = self.obj.read
295        self.obj.open(pg.INV_WRITE)
296        self.obj.write('kÀse')
297        self.obj.close()
298        self.obj.open(pg.INV_READ)
299        r = read(80)
300        self.assertIsInstance(r, bytes)
301        self.assertEqual(r.decode('utf8'), u'kÀse')
302
303    def testSeek(self):
304        seek = self.obj.seek
305        # testing with invalid parameters
306        self.assertRaises(TypeError, seek)
307        self.assertRaises(TypeError, seek, 0)
308        self.assertRaises(TypeError, seek, 0, pg.SEEK_SET, pg.SEEK_END)
309        self.assertRaises(TypeError, seek, 'invalid', pg.SEEK_SET)
310        self.assertRaises(TypeError, seek, 0, 'invalid')
311        # seeking when object is not yet open
312        self.assertRaises(IOError, seek, 0, pg.SEEK_SET)
313        data = b'some data to be seeked'
314        self.obj.open(pg.INV_WRITE)
315        self.obj.write(data)
316        self.obj.close()
317        self.obj.open(pg.INV_READ)
318        seek(0, pg.SEEK_SET)
319        r = self.obj.read(9)
320        self.assertIsInstance(r, bytes)
321        self.assertEqual(r, b'some data')
322        seek(4, pg.SEEK_CUR)
323        r = self.obj.read(2)
324        self.assertIsInstance(r, bytes)
325        self.assertEqual(r, b'be')
326        seek(-10, pg.SEEK_CUR)
327        r = self.obj.read(4)
328        self.assertIsInstance(r, bytes)
329        self.assertEqual(r, b'data')
330        seek(0, pg.SEEK_SET)
331        r = self.obj.read(4)
332        self.assertIsInstance(r, bytes)
333        self.assertEqual(r, b'some')
334        seek(-6, pg.SEEK_END)
335        r = self.obj.read(4)
336        self.assertIsInstance(r, bytes)
337        self.assertEqual(r, b'seek')
338
339    def testTell(self):
340        tell = self.obj.tell
341        # testing with invalid parameters
342        self.assertRaises(TypeError, tell, 0)
343        # telling when object is not yet open
344        self.assertRaises(IOError, tell)
345        data = b'some story to be told'
346        self.obj.open(pg.INV_WRITE)
347        self.obj.write(data)
348        r = tell()
349        self.assertIsInstance(r, int)
350        self.assertEqual(r, len(data))
351        self.obj.close()
352        self.obj.open(pg.INV_READ)
353        r = tell()
354        self.assertIsInstance(r, int)
355        self.assertEqual(r, 0)
356        self.obj.seek(5, pg.SEEK_SET)
357        r = tell()
358        self.assertIsInstance(r, int)
359        self.assertEqual(r, 5)
360
361    def testUnlink(self):
362        unlink = self.obj.unlink
363        # testing with invalid parameters
364        self.assertRaises(TypeError, unlink, 0)
365        # unlinking when object is still open
366        self.obj.open(pg.INV_WRITE)
367        self.assertIsNotNone(self.obj.oid)
368        self.assertNotEqual(0, self.obj.oid)
369        self.assertRaises(IOError, unlink)
370        data = b'some data to be sold'
371        self.obj.write(data)
372        self.obj.close()
373        # unlinking after object has been closed
374        unlink()
375        self.assertIsNone(self.obj.oid)
376        # unlinking after object has been already unlinked
377        self.assertRaises(pg.IntegrityError, unlink)
378
379    def testUnlinkInexistent(self):
380        unlink = self.obj.unlink
381        self.obj.open(pg.INV_WRITE)
382        self.obj.close()
383        self.pgcnx.query('select lo_unlink(%d)' % self.obj.oid)
384        self.assertRaises(IOError, unlink)
385
386    def testSize(self):
387        size = self.obj.size
388        # testing with invalid parameters
389        self.assertRaises(TypeError, size, 0)
390        # sizing when object is not yet open
391        self.assertRaises(IOError, size)
392        # sizing an empty object
393        self.obj.open(pg.INV_READ)
394        r = size()
395        self.obj.close()
396        self.assertIsInstance(r, int)
397        self.assertEqual(r, 0)
398        # sizing after adding some data
399        data = b'some data to be sized'
400        self.obj.open(pg.INV_WRITE)
401        self.obj.write(data)
402        self.obj.close()
403        # sizing when current position is zero
404        self.obj.open(pg.INV_READ)
405        r = size()
406        self.obj.close()
407        self.assertIsInstance(r, int)
408        self.assertEqual(r, len(data))
409        self.obj.open(pg.INV_READ)
410        # sizing when current position is not zero
411        self.obj.seek(5, pg.SEEK_SET)
412        r = size()
413        self.obj.close()
414        self.assertIsInstance(r, int)
415        self.assertEqual(r, len(data))
416        # sizing after adding more data
417        data += b' and more data'
418        self.obj.open(pg.INV_WRITE)
419        self.obj.write(data)
420        self.obj.close()
421        self.obj.open(pg.INV_READ)
422        r = size()
423        self.obj.close()
424        self.assertIsInstance(r, int)
425        self.assertEqual(r, len(data))
426
427    def testExport(self):
428        export = self.obj.export
429        # testing with invalid parameters
430        self.assertRaises(TypeError, export)
431        self.assertRaises(TypeError, export, 0)
432        self.assertRaises(TypeError, export, 'invalid', 0)
433        if windows:
434            # NamedTemporaryFiles don't work well here
435            fname = 'temp_test_pg_largeobj_export.txt'
436            f = open(fname, 'wb')
437        else:
438            f = tempfile.NamedTemporaryFile()
439            fname = f.name
440        data = b'some data to be exported'
441        self.obj.open(pg.INV_WRITE)
442        self.obj.write(data)
443        # exporting when object is not yet closed
444        self.assertRaises(IOError, export, f.name)
445        self.obj.close()
446        export(fname)
447        if windows:
448            f.close()
449            f = open(fname, 'rb')
450        r = f.read()
451        f.close()
452        if windows:
453            os.remove(fname)
454        self.assertIsInstance(r, bytes)
455        self.assertEqual(r, data)
456
457    def testExportInExistent(self):
458        export = self.obj.export
459        f = tempfile.NamedTemporaryFile()
460        self.obj.open(pg.INV_WRITE)
461        self.obj.close()
462        self.pgcnx.query('select lo_unlink(%d)' % self.obj.oid)
463        self.assertRaises(IOError, export, f.name)
464        f.close()
465
466
467if __name__ == '__main__':
468    unittest.main()
Note: See TracBrowser for help on using the repository browser.