Changeset 693 for trunk


Ignore:
Timestamp:
Jan 4, 2016, 5:51:09 PM (4 years ago)
Author:
cito
Message:

Support copy_from() and copy_to() in pgdb

Location:
trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/docs/changelog.rst

    r692 r693  
    1717- The DB-API 2 module now supports the callproc() cursor method. Note
    1818  that output parameters are currently not replaced in the return value.
     19- The DB-API 2 module no supports copy operations between data streams
     20  on the client and database tables via the COPY command of PostgreSQL.
     21  The cursor method copy_from() can be used to copy data from the database
     22  to the client, and the cursor method copy_to() can be used to copy data
     23  from the client to the database.
    1924- The 7-tuples returned by the description attribute of a pgdb cursor
    2025  are now named tuples, i.e. their elements can be also accessed by name.
  • trunk/docs/pgdb.rst

    r692 r693  
    268268    :meth:`Cursor.execute` or :meth:`Cursor.executemany` call produced
    269269    (for DQL statements like SELECT) or affected (for DML statements like
    270     UPDATE or INSERT ). The attribute is -1 in case no such method call has
    271     been performed on the cursor or the rowcount of the last operation
    272     cannot be determined by the interface.
     270    UPDATE or INSERT). It is also set by the :meth:`Cursor.copy_from` and
     271    :meth':`Cursor.copy_to` methods. The attribute is -1 in case no such
     272    method call has been performed on the cursor or the rowcount of the
     273    last operation cannot be determined by the interface.
    273274
    274275close -- close the cursor
     
    436437
    437438   The following methods and attributes are not part of the DB-API 2 standard.
     439
     440.. method:: Cursor.copy_from(stream, table, [format], [sep], [null], [size], [columns])
     441
     442        Copy data from an input stream to the specified table
     443
     444    :param stream: the input stream
     445        (must be a file-like object, a string or an iterable returning strings)
     446    :param str table: the name of a database table
     447    :param str format: the format of the data in the input stream,
     448        can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
     449    :param str sep: a single character separator
     450        (the default is ``'\t'`` for text and ``','`` for csv)
     451    :param str null: the textual representation of the ``NULL`` value,
     452        can also be an empty string (the default is ``'\\N'``)
     453    :param int size: the size of the buffer when reading file-like objects
     454    :param list column: an optional list of column names
     455    :returns: the cursor, so you can chain commands
     456
     457    :raises TypeError: parameters with wrong types
     458    :raises ValueError: invalid parameters
     459    :raises IOError: error when executing the copy operation
     460
     461This method can be used to copy data from an input stream on the client side
     462to a database table on the server side using the ``COPY FROM`` command.
     463The input stream can be provided in form of a file-like object (which must
     464have a ``read()`` method), a string, or an iterable returning one row or
     465multiple rows of input data on each iteration.
     466
     467The format must be text, csv or binary. The sep option sets the column
     468separator (delimiter) used in the non binary formats. The null option sets
     469the textual representation of ``NULL`` in the input.
     470
     471The size option sets the size of the buffer used when reading data from
     472file-like objects.
     473
     474The copy operation can be restricted to a subset of columns. If no columns are
     475specified, all of them will be copied.
     476
     477.. method:: Cursor.copy_to(stream, table, [format], [sep], [null], [decode], [columns])
     478
     479        Copy data from the specified table to an output stream
     480
     481    :param stream: the output stream (must be a file-like object or ``None``)
     482    :param str table: the name of a database table or a ``SELECT`` query
     483    :param str format: the format of the data in the input stream,
     484        can be ``'text'`` (the default), ``'csv'``, or ``'binary'``
     485    :param str sep: a single character separator
     486        (the default is ``'\t'`` for text and ``','`` for csv)
     487    :param str null: the textual representation of the ``NULL`` value,
     488        can also be an empty string (the default is ``'\\N'``)
     489    :param bool decode: whether decoded strings shall be returned
     490        for non-binary formats (the default is True in Python 3)
     491    :param list column: an optional list of column names
     492    :returns: a generator if stream is set to ``None``, otherwise the cursor
     493
     494    :raises TypeError: parameters with wrong types
     495    :raises ValueError: invalid parameters
     496    :raises IOError: error when executing the copy operation
     497
     498This method can be used to copy data from a database table on the server side
     499to an output stream on the client side using the ``COPY TO`` command.
     500
     501The output stream can be provided in form of a file-like object (which must
     502have a ``write()`` method). Alternatively, if ``None`` is passed as the
     503output stream, the method will return a generator yielding one row of output
     504data on each iteration.
     505
     506Output will be returned as byte strings unless you set decode to true.
     507
     508Note that you can also use a ``SELECT`` query instead of the table name.
     509
     510The format must be text, csv or binary. The sep option sets the column
     511separator (delimiter) used in the non binary formats. The null option sets
     512the textual representation of ``NULL`` in the output.
     513
     514The copy operation can be restricted to a subset of columns. If no columns are
     515specified, all of them will be copied.
    438516
    439517.. method:: Cursor.row_factory(row)
  • trunk/module/pgdb.py

    r692 r693  
    9090    basestring = (str, bytes)
    9191
     92from collections import Iterable
    9293try:
    9394    from collections import OrderedDict
     
    433434        return parameters
    434435
     436    def copy_from(self, stream, table,
     437            format=None, sep=None, null=None, size=None, columns=None):
     438        """Copy data from an input stream to the specified table.
     439
     440        The input stream can be a file-like object with a read() method or
     441        it can also be an iterable returning a row or multiple rows of input
     442        on each iteration.
     443
     444        The format must be text, csv or binary. The sep option sets the
     445        column separator (delimiter) used in the non binary formats.
     446        The null option sets the textual representation of NULL in the input.
     447
     448        The size option sets the size of the buffer used when reading data
     449        from file-like objects.
     450
     451        The copy operation can be restricted to a subset of columns. If no
     452        columns are specified, all of them will be copied.
     453
     454        """
     455        binary_format = format == 'binary'
     456        try:
     457            read = stream.read
     458        except AttributeError:
     459            if size:
     460                raise ValueError("size must only be set for file-like objects")
     461            if binary_format:
     462                input_type = bytes
     463                type_name = 'byte strings'
     464            else:
     465                input_type = basestring
     466                type_name = 'strings'
     467
     468            if isinstance(stream, basestring):
     469                if not isinstance(stream, input_type):
     470                    raise ValueError("the input must be %s" % type_name)
     471                if not binary_format:
     472                    if isinstance(stream, str):
     473                        if not stream.endswith('\n'):
     474                            stream += '\n'
     475                    else:
     476                        if not stream.endswith(b'\n'):
     477                            stream += b'\n'
     478
     479                def chunks():
     480                    yield stream
     481
     482            elif isinstance(stream, Iterable):
     483
     484                def chunks():
     485                    for chunk in stream:
     486                        if not isinstance(chunk, input_type):
     487                            raise ValueError(
     488                                "input stream must consist of %s" % type_name)
     489                        if isinstance(chunk, str):
     490                            if not chunk.endswith('\n'):
     491                                chunk += '\n'
     492                        else:
     493                            if not chunk.endswith(b'\n'):
     494                                chunk += b'\n'
     495                        yield chunk
     496
     497            else:
     498                raise TypeError("need an input stream to copy from")
     499        else:
     500            if size is None:
     501                size = 8192
     502            if size > 0:
     503                if not isinstance(size, int):
     504                    raise TypeError("the size option must be an integer")
     505
     506                def chunks():
     507                    while True:
     508                        buffer = read(size)
     509                        yield buffer
     510                        if not buffer or len(buffer) < size:
     511                            break
     512
     513            else:
     514
     515                def chunks():
     516                    yield read()
     517
     518        if not table or not isinstance(table, basestring):
     519            raise TypeError("need a table to copy to")
     520        if table.lower().startswith('select'):
     521                raise ValueError("must specify a table, not a query")
     522        else:
     523            table = '"%s"' % (table,)
     524        operation = ['copy %s' % (table,)]
     525        options = []
     526        params = []
     527        if format is not None:
     528            if not isinstance(format, basestring):
     529                raise TypeError("the format options be a string")
     530            if format not in ('text', 'csv', 'binary'):
     531                raise ValueError("invalid format")
     532            options.append('format %s' % (format,))
     533        if sep is not None:
     534            if not isinstance(sep, basestring):
     535                raise TypeError("the sep option must be a string")
     536            if format == 'binary':
     537                raise ValueError("sep is not allowed with binary format")
     538            if len(sep) != 1:
     539                raise ValueError("sep must be a single one-byte character")
     540            options.append('delimiter %s')
     541            params.append(sep)
     542        if null is not None:
     543            if not isinstance(null, basestring):
     544                raise TypeError("the null option must be a string")
     545            options.append('null %s')
     546            params.append(null)
     547        if columns:
     548            if not isinstance(columns, basestring):
     549                columns = ','.join('"%s"' % (col,) for col in columns)
     550            operation.append('(%s)' % (columns,))
     551        operation.append("from stdin")
     552        if options:
     553            operation.append('(%s)' % ','.join(options))
     554        operation = ' '.join(operation)
     555
     556        putdata = self._src.putdata
     557        self.execute(operation, params)
     558
     559        try:
     560            for chunk in chunks():
     561                putdata(chunk)
     562        except BaseException as error:
     563            self.rowcount = -1
     564            # the following call will re-raise the error
     565            putdata(error)
     566        else:
     567            self.rowcount = putdata(None)
     568
     569        # return the cursor object, so you can chain operations
     570        return self
     571
     572    def copy_to(self, stream, table,
     573            format=None, sep=None, null=None, decode=None, columns=None):
     574        """Copy data from the specified table to an output stream.
     575
     576        The output stream can be a file-like object with a write() method or
     577        it can also be None, in which case the method will return a generator
     578        yielding a row on each iteration.
     579
     580        Output will be returned as byte strings unless you set decode to true.
     581
     582        Note that you can also use a select query instead of the table name.
     583
     584        The format must be text, csv or binary. The sep option sets the
     585        column separator (delimiter) used in the non binary formats.
     586        The null option sets the textual representation of NULL in the output.
     587
     588        The copy operation can be restricted to a subset of columns. If no
     589        columns are specified, all of them will be copied.
     590
     591        """
     592        binary_format = format == 'binary'
     593        if stream is not None:
     594            try:
     595                write = stream.write
     596            except AttributeError:
     597                raise TypeError("need an output stream to copy to")
     598        if not table or not isinstance(table, basestring):
     599            raise TypeError("need a table to copy to")
     600        if table.lower().startswith('select'):
     601            if columns:
     602                raise ValueError("columns must be specified in the query")
     603            table = '(%s)' % (table,)
     604        else:
     605            table = '"%s"' % (table,)
     606        operation = ['copy %s' % (table,)]
     607        options = []
     608        params = []
     609        if format is not None:
     610            if not isinstance(format, basestring):
     611                raise TypeError("the format options be a string")
     612            if format not in ('text', 'csv', 'binary'):
     613                raise ValueError("invalid format")
     614            options.append('format %s' % (format,))
     615        if sep is not None:
     616            if not isinstance(sep, basestring):
     617                raise TypeError("the sep option must be a string")
     618            if binary_format:
     619                raise ValueError("sep is not allowed with binary format")
     620            if len(sep) != 1:
     621                raise ValueError("sep must be a single one-byte character")
     622            options.append('delimiter %s')
     623            params.append(sep)
     624        if null is not None:
     625            if not isinstance(null, basestring):
     626                raise TypeError("the null option must be a string")
     627            options.append('null %s')
     628            params.append(null)
     629        if decode is None:
     630            if format == 'binary':
     631                decode = False
     632            else:
     633                decode = str is unicode
     634        else:
     635            if not isinstance(decode, (int, bool)):
     636                raise TypeError("the decode option must be a boolean")
     637            if decode and binary_format:
     638                raise ValueError("decode is not allowed with binary format")
     639        if columns:
     640            if not isinstance(columns, basestring):
     641                columns = ','.join('"%s"' % (col,) for col in columns)
     642            operation.append('(%s)' % (columns,))
     643
     644        operation.append("to stdout")
     645        if options:
     646            operation.append('(%s)' % ','.join(options))
     647        operation = ' '.join(operation)
     648
     649        getdata = self._src.getdata
     650        self.execute(operation, params)
     651
     652        def copy():
     653            while True:
     654                row = getdata(decode)
     655                if isinstance(row, int):
     656                    if self.rowcount != row:
     657                        self.rowcount = row
     658                    break
     659                self.rowcount += 1
     660                yield row
     661
     662        if stream is None:
     663            # no input stream, return the generator
     664            return copy()
     665
     666        # write the rows to the file-like input stream
     667        for row in copy():
     668            write(row)
     669
     670        # return the cursor object, so you can chain operations
     671        return self
     672
    435673    def __next__(self):
    436674        """Return the next row (support for the iteration protocol)."""
  • trunk/module/pgmodule.c

    r680 r693  
    28602860}
    28612861
     2862/* put copy data */
     2863static char sourcePutData__doc__[] =
     2864"getdata(buffer) -- send data to server during copy from stdin.";
     2865
     2866static PyObject *
     2867sourcePutData(sourceObject *self, PyObject *args)
     2868{
     2869        PyObject   *buffer_obj; /* the buffer object that was passed in */
     2870        char       *buffer; /* the buffer as encoded string */
     2871        Py_ssize_t      nbytes; /* length of string */
     2872        char       *errormsg = NULL; /* error message */
     2873        int                     res; /* direct result of the operation */
     2874        PyObject   *ret; /* return value */
     2875
     2876        /* checks validity */
     2877        if (!check_source_obj(self, CHECK_CNX))
     2878                return NULL;
     2879
     2880        /* make sure that the connection object is valid */
     2881        if (!self->pgcnx->cnx)
     2882                return NULL;
     2883
     2884        if (!PyArg_ParseTuple(args, "O", &buffer_obj))
     2885                return NULL;
     2886
     2887        if (buffer_obj == Py_None) {
     2888                /* pass None for terminating the operation */
     2889                buffer = errormsg = NULL;
     2890                buffer_obj = NULL;
     2891        }
     2892        else if (PyBytes_Check(buffer_obj))
     2893        {
     2894                /* or pass a byte string */
     2895                PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
     2896                buffer_obj = NULL;
     2897        }
     2898        else if (PyUnicode_Check(buffer_obj))
     2899        {
     2900                /* or pass a unicode string */
     2901                buffer_obj = get_encoded_string(
     2902                        buffer_obj, PQclientEncoding(self->pgcnx->cnx));
     2903                if (!buffer_obj) return NULL; /* pass the UnicodeEncodeError */
     2904                PyBytes_AsStringAndSize(buffer_obj, &buffer, &nbytes);
     2905        }
     2906        else if (PyErr_GivenExceptionMatches(buffer_obj, PyExc_BaseException))
     2907        {
     2908                /* or pass a Python exception for sending an error message */
     2909                buffer_obj = PyObject_Str(buffer_obj);
     2910                if (PyUnicode_Check(buffer_obj))
     2911                {
     2912                        PyObject *obj = buffer_obj;
     2913                        buffer_obj = get_encoded_string(
     2914                                obj, PQclientEncoding(self->pgcnx->cnx));
     2915                        Py_DECREF(obj);
     2916                        if (!buffer_obj) return NULL; /* pass the UnicodeEncodeError */
     2917                }
     2918                errormsg = PyBytes_AsString(buffer_obj);
     2919                buffer = NULL;
     2920        }
     2921        else
     2922        {
     2923                PyErr_SetString(PyExc_TypeError,
     2924                        "putdata() expects a buffer, None or an exception.");
     2925                return NULL;
     2926        }
     2927
     2928        /* checks validity */
     2929        if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
     2930                        !self->pgcnx->cnx ||
     2931                        PQresultStatus(self->result) != PGRES_COPY_IN)
     2932        {
     2933                PyErr_SetString(PyExc_IOError,
     2934                        "connection is invalid or not in copy_in state.");
     2935                Py_XDECREF(buffer_obj);
     2936                return NULL;
     2937        }
     2938
     2939        if (buffer)
     2940        {
     2941                res = nbytes ? PQputCopyData(self->pgcnx->cnx, buffer, nbytes) : 1;
     2942        }
     2943        else
     2944        {
     2945                res = PQputCopyEnd(self->pgcnx->cnx, errormsg);
     2946        }
     2947
     2948        Py_XDECREF(buffer_obj);
     2949
     2950        if (res != 1)
     2951        {
     2952                PyErr_SetString(PyExc_IOError, PQerrorMessage(self->pgcnx->cnx));
     2953                return NULL;
     2954        }
     2955
     2956        if (buffer) /* buffer has been sent */
     2957        {
     2958                ret = Py_None;
     2959                Py_INCREF(ret);
     2960        }
     2961        else /* copy is done */
     2962        {
     2963                PGresult   *result; /* final result of the operation */
     2964
     2965                Py_BEGIN_ALLOW_THREADS;
     2966                result = PQgetResult(self->pgcnx->cnx);
     2967                Py_END_ALLOW_THREADS;
     2968
     2969                if (PQresultStatus(result) == PGRES_COMMAND_OK)
     2970                {
     2971                        char   *temp;
     2972                        long    num_rows;
     2973
     2974                        temp = PQcmdTuples(result);
     2975                        num_rows = temp[0] ? atol(temp) : -1;
     2976                        ret = PyInt_FromLong(num_rows);
     2977                }
     2978                else
     2979                {
     2980                        if (!errormsg) errormsg = PQerrorMessage(self->pgcnx->cnx);
     2981                        PyErr_SetString(PyExc_IOError, errormsg);
     2982                        ret = NULL;
     2983                }
     2984
     2985                PQclear(self->result);
     2986                self->result = NULL;
     2987                self->result_type = RESULT_EMPTY;
     2988        }
     2989
     2990        return ret; /* None or number of rows */
     2991}
     2992
     2993/* get copy data */
     2994static char sourceGetData__doc__[] =
     2995"getdata(decode) -- receive data to server during copy to stdout.";
     2996
     2997static PyObject *
     2998sourceGetData(sourceObject *self, PyObject *args)
     2999{
     3000        int                *decode = 0; /* decode flag */
     3001        char       *buffer; /* the copied buffer as encoded byte string */
     3002        Py_ssize_t      nbytes; /* length of the byte string */
     3003        PyObject   *ret; /* return value */
     3004
     3005        /* checks validity */
     3006        if (!check_source_obj(self, CHECK_CNX))
     3007                return NULL;
     3008
     3009        /* make sure that the connection object is valid */
     3010        if (!self->pgcnx->cnx)
     3011                return NULL;
     3012
     3013        if (!PyArg_ParseTuple(args, "|i", &decode))
     3014                return NULL;
     3015
     3016        /* checks validity */
     3017        if (!check_source_obj(self, CHECK_CNX | CHECK_RESULT) ||
     3018                        !self->pgcnx->cnx ||
     3019                        PQresultStatus(self->result) != PGRES_COPY_OUT)
     3020        {
     3021                PyErr_SetString(PyExc_IOError,
     3022                        "connection is invalid or not in copy_out state.");
     3023                return NULL;
     3024        }
     3025
     3026        nbytes = PQgetCopyData(self->pgcnx->cnx, &buffer, 0);
     3027
     3028        if (!nbytes || nbytes < -1) /* an error occurred */
     3029        {
     3030                PyErr_SetString(PyExc_IOError, PQerrorMessage(self->pgcnx->cnx));
     3031                return NULL;
     3032        }
     3033
     3034        if (nbytes == -1) /* copy is done */
     3035        {
     3036                PGresult   *result; /* final result of the operation */
     3037
     3038                Py_BEGIN_ALLOW_THREADS;
     3039                result = PQgetResult(self->pgcnx->cnx);
     3040                Py_END_ALLOW_THREADS;
     3041
     3042                if (PQresultStatus(result) == PGRES_COMMAND_OK)
     3043                {
     3044                        char   *temp;
     3045                        long    num_rows;
     3046
     3047                        temp = PQcmdTuples(result);
     3048                        num_rows = temp[0] ? atol(temp) : -1;
     3049                        ret = PyInt_FromLong(num_rows);
     3050                }
     3051                else
     3052                {
     3053                        PyErr_SetString(PyExc_IOError, PQerrorMessage(self->pgcnx->cnx));
     3054                        ret = NULL;
     3055                }
     3056
     3057                PQclear(self->result);
     3058                self->result = NULL;
     3059                self->result_type = RESULT_EMPTY;
     3060        }
     3061        else /* a row has been returned */
     3062        {
     3063                ret = decode ? get_decoded_string(
     3064                                buffer, nbytes, PQclientEncoding(self->pgcnx->cnx)) :
     3065                        PyBytes_FromStringAndSize(buffer, nbytes);
     3066                PQfreemem(buffer);
     3067        }
     3068
     3069        return ret; /* buffer or number of rows */
     3070}
     3071
    28623072/* finds field number from string/integer (internal use only) */
    28633073static int
     
    30423252        {"moveprev", (PyCFunction) sourceMovePrev, METH_VARARGS,
    30433253                        sourceMovePrev__doc__},
     3254        {"putdata", (PyCFunction) sourcePutData, METH_VARARGS,
     3255                        sourcePutData__doc__},
     3256        {"getdata", (PyCFunction) sourceGetData, METH_VARARGS,
     3257                        sourceGetData__doc__},
    30443258        {"field", (PyCFunction) sourceField, METH_VARARGS,
    30453259                        sourceField__doc__},
Note: See TracChangeset for help on using the changeset viewer.