Core API Reference

Connection

The library provides a way to connect to a PostgreSQL database.

Example:

import aiopg

async def go():
    conn = await aiopg.connect(database='aiopg',
                               user='aiopg',
                               password='secret',
                               host='127.0.0.1')
    cur = await conn.cursor()
    await cur.execute("SELECT * FROM tbl")
    ret = await cur.fetchall()

coroutine async-with aiopg.connect(dsn=None, *, timeout=60.0, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, **kwargs)[source]

Make a connection to PostgreSQL server.

The function accepts all parameters that psycopg2.connect() does plus optional keyword-only timeout parameter.

Parameters
  • timeout (float) –

    default timeout (in seconds) for connection operations.

    60 secs by default.

  • enable_json (bool) –

    enable json column types for connection.

    True by default.

  • enable_hstore (bool) –

    try to enable hstore column types for connection.

    True by default.

    To use HSTORE columns, the extension should be installed in the database first:

    CREATE EXTENSION HSTORE
    

  • enable_uuid (bool) –

    enable uuid column types for connection.

    True by default.

  • echo (bool) – log executed SQL statement (False by default).

Returns

Connection instance.
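
Since connect() supports the async with form, the connection can also be managed as an asynchronous context manager. A minimal sketch (credentials are placeholders):

import aiopg

async def go():
    # The connection is closed automatically when the block exits.
    async with aiopg.connect(database='aiopg', user='aiopg',
                             password='secret', host='127.0.0.1') as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            print(await cur.fetchone())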

class aiopg.Connection[source]

A connection to a PostgreSQL database instance. It encapsulates a database session.

Its interface is very close to psycopg2.connection (http://initd.org/psycopg/docs/connection.html), except that all methods are coroutines.

Use connect() for creating a connection.

The most important method is

coroutine async-with cursor(name=None, cursor_factory=None, scrollable=None, withhold=False, *, timeout=None)[source]

Creates a new cursor object using the connection.

Only cursor_factory can be specified; all other parameters are not supported by psycopg2-binary in asynchronous mode yet.

The cursor_factory argument can be used to create non-standard cursors. The argument must be a subclass of psycopg2.extensions.cursor. See subclassing-cursor for details. A default factory for the connection can also be specified using the Connection.cursor_factory attribute.

timeout is a timeout for the returned cursor instance if the parameter is not None.

name, scrollable and withhold parameters are not supported by psycopg2-binary in asynchronous mode.

Returns

Cursor instance.
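
As an illustration of the cursor_factory argument, a dict-like cursor can be requested; a minimal sketch assuming psycopg2.extras.DictCursor (a psycopg2.extensions.cursor subclass) is acceptable here:

from psycopg2.extras import DictCursor

async def fetch_as_dict(conn):
    # Rows produced by DictCursor support access by column name.
    async with conn.cursor(cursor_factory=DictCursor) as cur:
        await cur.execute("SELECT 1 AS answer")
        row = await cur.fetchone()
        return row['answer']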

close()[source]

Immediately close the connection.

Close the connection now (rather than whenever del is executed). The connection will be unusable from this point forward; a psycopg2.InterfaceError will be raised if any operation is attempted with the connection. The same applies to all cursor objects trying to use the connection. Note that closing a connection without committing the changes first will cause any pending change to be discarded as if a ROLLBACK was performed.

Changed in version 0.5: close() is a regular function now. For the sake of backward compatibility the method returns an asyncio.Future instance with the result already set to None (you can still use the await conn.close() construction).

closed

The read-only property that returns True if the connection is closed.

free_cursor()

Call the Cursor.close() method for the cursor of the current Connection instance.

closed_cursor

Return the Cursor.closed attribute for the cursor of the current Connection instance.

echo

Return the echo mode status. If True, all executed queries are logged to the logger named aiopg.

raw

The read-only property returning the underlying psycopg2.connection instance.

coroutine cancel()[source]

Changed in version 1.2.0: Not supported in asynchronous mode (psycopg2.ProgrammingError is raised).

dsn

The read-only property that returns the DSN string used by the connection.

autocommit

Autocommit mode status for connection (always True).

Note

psycopg2-binary doesn’t allow changing the autocommit mode in asynchronous mode.

encoding

Client encoding for SQL operations.

Note

psycopg2-binary doesn’t allow changing the encoding in asynchronous mode.

isolation_level

Get the transaction isolation level for the current session.

Note

The only value allowed in asynchronous mode is psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED (READ COMMITTED).

notices

A list containing all the database messages sent to the client during the session:

>>> await cur.execute("CREATE TABLE foo (id serial PRIMARY KEY);")
>>> pprint(conn.notices)
['NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"\n',
 'NOTICE:  CREATE TABLE will create implicit sequence "foo_id_seq" for serial column "foo.id"\n']

To avoid a leak in case excessive notices are generated, only the last 50 messages are kept.

You can configure what messages to receive using PostgreSQL logging configuration parameters such as log_statement, client_min_messages, log_min_duration_statement etc.

cursor_factory

The default cursor factory used by Connection.cursor() if the parameter is not specified.

get_backend_pid()[source]

Returns the process ID (PID) of the backend server process handling this connection.

Note that the PID belongs to a process executing on the database server host, not the local host!

See also

libpq docs for PQbackendPID() for details.

get_parameter_status(parameter)[source]

Look up a current parameter setting of the server.

Potential values for parameter are: server_version, server_encoding, client_encoding, is_superuser, session_authorization, DateStyle, TimeZone, integer_datetimes, and standard_conforming_strings.

If the server did not report the requested parameter, None is returned.

See also

libpq docs for PQparameterStatus() for details.
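
A small sketch of looking up a couple of the parameters listed above (the method is not a coroutine, so no await is needed):

# Both calls return a string, or None if the server did not report
# the parameter.
encoding = conn.get_parameter_status('server_encoding')
tz = conn.get_parameter_status('TimeZone')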

get_transaction_status()[source]

Return the current session transaction status as an integer. Symbolic constants for the values are defined in the module psycopg2.extensions: see transaction-status-constants for the available values.

See also

libpq docs for PQtransactionStatus() for details.

protocol_version

A read-only integer representing the frontend/backend protocol being used. Currently Psycopg supports only protocol 3, which allows connections to PostgreSQL servers from version 7.4. Psycopg versions earlier than 2.3 support both protocols 2 and 3.

See also

libpq docs for PQprotocolVersion() for details.

server_version

A read-only integer representing the backend version.

The number is formed by converting the major, minor, and revision numbers into two-decimal-digit numbers and appending them together. For example, version 8.1.5 will be returned as 80105.

See also

libpq docs for PQserverVersion() for details.
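
For instance, the encoding described above can be unpacked like this (this applies to the pre-10 three-part versioning scheme the paragraph describes):

ver = conn.server_version          # e.g. 80105 for version 8.1.5
major = ver // 10000               # 8
minor = (ver // 100) % 100         # 1
revision = ver % 100               # 5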

status

A read-only integer representing the status of the connection. Symbolic constants for the values are defined in the module psycopg2.extensions: see connection-status-constants for the available values.

The status is undefined for closed connections.

timeout

A read-only float representing default timeout for connection’s operations.

notifies

An instance of an asyncio.Queue subclass for received notifications.

The Connection class also has several methods not described here. Those methods are not supported in asynchronous mode (psycopg2.ProgrammingError is raised).

Cursor

class aiopg.Cursor[source]

A cursor for a connection.

Allows Python code to execute PostgreSQL commands in a database session. Cursors are created by the Connection.cursor() coroutine: they are bound to the connection for their entire lifetime and all the commands are executed in the context of the database session wrapped by the connection.

Cursors that are created from the same connection are not isolated, i.e., any changes done to the database by a cursor are immediately visible to the other cursors. Cursors created from different connections may or may not be isolated, depending on the connections’ isolation level.

Its interface is very close to psycopg2.cursor (http://initd.org/psycopg/docs/cursor.html), except that all methods are coroutines.

Use Connection.cursor() to get a cursor for a connection.

echo

Return the echo mode status. If True, all executed queries are logged to the logger named aiopg.

description

This read-only attribute is a sequence of 7-item sequences.

Each of these sequences is a collections.namedtuple() containing information describing one result column:

  1. name: the name of the column returned.

  2. type_code: the PostgreSQL OID of the column. You can use the pg_type system table to get more information about the type. This is the value used by Psycopg to decide which Python type to use to represent the value. See also type-casting-from-sql-to-python.

  3. display_size: the actual length of the column in bytes. Obtaining this value is computationally intensive, so it is always None unless the PSYCOPG_DISPLAY_SIZE parameter is set at compile time. See also PQgetlength.

  4. internal_size: the size in bytes of the column associated to this column on the server. Set to a negative value for variable-size types. See also PQfsize.

  5. precision: total number of significant digits in columns of type NUMERIC. None for other types.

  6. scale: count of decimal digits in the fractional part in columns of type NUMERIC. None for other types.

  7. null_ok: always None, as it is not easy to retrieve from the libpq.

This attribute will be None for operations that do not return rows or if the cursor has not had an operation invoked via the execute() method yet.
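
A short sketch of a typical use of the attribute; the helper name is ours, not part of the API:

def column_index_map(cur):
    # description is None until a row-returning statement has been executed;
    # each entry is a namedtuple whose first field is the column name.
    if cur.description is None:
        return {}
    return {col.name: i for i, col in enumerate(cur.description)}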

close()[source]

Close the cursor now (rather than whenever del is executed). The cursor will be unusable from this point forward; a psycopg2.InterfaceError will be raised if any operation is attempted with the cursor.

Note

close() is not a coroutine; you don’t need to await it via await curs.close().

closed

Read-only boolean attribute: specifies if the cursor is closed (True) or not (False).

raw

The read-only property returning the underlying psycopg2.cursor instance.

connection

Read-only attribute returning a reference to the Connection object on which the cursor was created.

timeout

A read-only float representing default timeout for cursor’s operations.

coroutine execute(operation, parameters=None, *, timeout=None)[source]

Prepare and execute a database operation (query or command).

Parameters may be provided as a sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional (%s) or named (%(name)s) placeholders. See query-parameters.

Parameters

timeout (float) – overrides cursor’s timeout if not None.

Returns

None. If a query was executed, the returned values can be retrieved using fetch*() methods.
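
A short sketch showing both placeholder styles (the table and column names are illustrative):

async def query_examples(cur):
    # Positional placeholders bound from a sequence:
    await cur.execute(
        "INSERT INTO tbl (id, name) VALUES (%s, %s)", (1, 'alpha'))
    # Named placeholders bound from a mapping:
    await cur.execute(
        "SELECT * FROM tbl WHERE id = %(id)s", {'id': 1})
    return await cur.fetchall()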

coroutine callproc(procname, parameters=None, *, timeout=None)[source]

Call a stored database procedure with the given name. The sequence of parameters must contain one entry for each argument that the procedure expects. The result of the call is returned as a modified copy of the input sequence. Input parameters are left untouched, output and input/output parameters replaced with possibly new values.

The procedure may also provide a result set as output. This must then be made available through the standard fetch*() methods.

Parameters

timeout (float) – overrides cursor’s timeout if not None.
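
A minimal sketch; lower is a built-in SQL function used here only for illustration:

async def call_lower(cur):
    # The call is turned into a function invocation on the server; any
    # result set is then available through the fetch*() methods.
    await cur.callproc('lower', ('ABC',))
    return await cur.fetchone()     # ('abc',)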

mogrify(operation, parameters=None)[source]

Returns a query string after argument binding. The string returned is exactly the one that would be sent to the database when running the Cursor.execute() method or similar.

The returned string is always a bytes string:

>>> cur.mogrify("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
"INSERT INTO test (num, data) VALUES (42, E'bar')"
setinputsizes(sizes)[source]

This method is exposed in compliance with the DBAPI. It currently does nothing but it is safe to call it.

Results retrieval methods

The following methods are used to read data from the database after a Cursor.execute() call.

Cursor object supports asynchronous iteration starting from Python 3.5:

await cursor.execute('SELECT key, value FROM tbl;')
async for key, value in cursor:
    ...

Warning

Cursor objects do not support regular iteration (using a plain for statement) since version 0.7.

The regular iteration protocol in Cursor would hide await from the user, which should be explicit. Moreover, iteration support is optional according to PEP-249 (https://www.python.org/dev/peps/pep-0249/#iter).

coroutine fetchone()[source]

Fetch the next row of a query result set, returning a single tuple, or None when no more data is available:

>>> await cur.execute("SELECT * FROM test WHERE id = %s", (3,))
>>> await cur.fetchone()
(3, 42, 'bar')

A psycopg2.ProgrammingError is raised if the previous call to execute() did not produce any result set or no call was issued yet.

coroutine fetchmany(size=cursor.arraysize)[source]

Fetch the next set of rows of a query result, returning a list of tuples. An empty list is returned when no more rows are available.

The number of rows to fetch per call is specified by the parameter. If it is not given, the cursor’s Cursor.arraysize determines the number of rows to be fetched. The method should try to fetch as many rows as indicated by the size parameter. If this is not possible due to the specified number of rows not being available, fewer rows may be returned:

>>> await cur.execute("SELECT * FROM test;")
>>> await cur.fetchmany(2)
[(1, 100, "abc'def"), (2, None, 'dada')]
>>> await cur.fetchmany(2)
[(3, 42, 'bar')]
>>> await cur.fetchmany(2)
[]

A psycopg2.ProgrammingError is raised if the previous call to execute() did not produce any result set or no call was issued yet.

Note there are performance considerations involved with the size parameter. For optimal performance, it is usually best to use the Cursor.arraysize attribute. If the size parameter is used, then it is best for it to retain the same value from one fetchmany() call to the next.

coroutine fetchall()[source]

Fetch all (remaining) rows of a query result, returning them as a list of tuples. An empty list is returned if there are no more records to fetch:

>>> await cur.execute("SELECT * FROM test;")
>>> await cur.fetchall()
[(1, 100, "abc'def"), (2, None, 'dada'), (3, 42, 'bar')]

A psycopg2.ProgrammingError is raised if the previous call to execute() did not produce any result set or no call was issued yet.

coroutine scroll(value, mode='relative')[source]

Scroll the cursor in the result set to a new position according to mode.

If mode is relative (default), value is taken as an offset to the current position in the result set; if set to absolute, value states an absolute target position.

If the scroll operation would leave the result set, a psycopg2.ProgrammingError is raised and the cursor position is not changed.

Note

According to the DBAPI, the exception raised for a cursor out of bounds should have been IndexError. The best option is probably to catch both exceptions in your code:

try:
    await cur.scroll(1000 * 1000)
except (ProgrammingError, IndexError) as exc:
    deal_with_it(exc)

arraysize

This read/write attribute specifies the number of rows to fetch at a time with Cursor.fetchmany(). It defaults to 1 meaning to fetch a single row at a time.

rowcount

This read-only attribute specifies the number of rows that the last execute() produced (for DQL statements like SELECT) or affected (for DML statements like UPDATE or INSERT).

The attribute is -1 if no execute() has been performed on the cursor, or if the row count of the last operation cannot be determined by the interface.

Note

The DBAPI specification reserves the right to redefine the latter case to have the object return None instead of -1 in future versions.

rownumber

This read-only attribute provides the current 0-based index of the cursor in the result set or None if the index cannot be determined.

The index can be seen as the index of the cursor in a sequence (the result set). The next fetch operation will fetch the row indexed by rownumber in that sequence.

lastrowid

This read-only attribute provides the OID of the last row inserted by the cursor. If the table wasn’t created with OID support or the last operation is not a single record insert, the attribute is set to None.

Note

PostgreSQL currently advises not to create OIDs on tables, and the default for CREATE TABLE is to not support them. The INSERT ... RETURNING syntax available from PostgreSQL 8.3 allows more flexibility.
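
A sketch of the RETURNING approach mentioned in the note, reusing the test table from the earlier examples:

async def insert_and_get_id(cur):
    # Ask the server to return the generated key instead of relying on OIDs.
    await cur.execute(
        "INSERT INTO test (num, data) VALUES (%s, %s) RETURNING id",
        (42, 'bar'))
    (new_id,) = await cur.fetchone()
    return new_id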

query

Read-only attribute containing the body of the last query sent to the backend (including bound arguments) as a bytes string. None if no query has been executed yet:

>>> await cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
>>> cur.query
"INSERT INTO test (num, data) VALUES (42, E'bar')"
statusmessage

Read-only attribute containing the message returned by the last command:

>>> await cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
>>> cur.statusmessage
'INSERT 0 1'

tzinfo_factory

The time zone factory used to handle data types such as TIMESTAMP WITH TIME ZONE. It should be a datetime.tzinfo object. A few implementations are available in the psycopg2.tz module.

setoutputsize(size, column=None)[source]

This method is exposed in compliance with the DBAPI. It currently does nothing but it is safe to call it.

coroutine begin()[source]

Begin a transaction and return a transaction handle. The returned object is an instance of _TransactionBeginContextManager:

async def begin(engine):
    async with engine.cursor() as cur:
        async with cur.begin():
            await cur.execute("insert into tbl values(1, 'data')")

        async with cur.begin():
            await cur.execute('select * from tbl')
            row = await cur.fetchall()
            assert row == [(22, 'read only'), (1, 'data'), ]

coroutine begin_nested()[source]

Begin a nested transaction and return a transaction handle.

The returned object is an instance of _TransactionBeginContextManager.

Any transaction in the hierarchy may commit and roll back; however, the outermost transaction still controls the overall commit or rollback of the transaction as a whole. It utilizes the SAVEPOINT facility of the PostgreSQL server:

async def begin_nested(engine):
    async with engine.cursor() as cur:
        async with cur.begin_nested():
            await cur.execute("insert into tbl values(1, 'data')")

            try:
                async with cur.begin_nested():
                    await cur.execute("insert into tbl values(1/0, 'no data')")
            except psycopg2.DataError:
                pass

        async with cur.begin_nested():
            await cur.execute("insert into tbl values(2, 'data')")

        async with cur.begin_nested():
            await cur.execute('select * from tbl')
            row = await cur.fetchall()
            assert row == [(22, 'read only'), (1, 'data'),  (2, 'data'), ]

Pool

The library provides a connection pool as well as plain Connection objects.

The basic usage is:

import asyncio
import aiopg

dsn = 'dbname=jetty user=nick password=1234 host=localhost port=5432'


async def test_select():
    pool = await aiopg.create_pool(dsn)

    with (await pool.cursor()) as cur:
        await cur.execute('SELECT 1')
        ret = await cur.fetchone()
        assert ret == (1,), ret

coroutine async-with aiopg.create_pool(dsn=None, *, minsize=1, maxsize=10, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, timeout=60.0, **kwargs)[source]

Create a pool of connections to PostgreSQL database.

The function accepts all parameters that psycopg2.connect() does plus the optional keyword-only parameters minsize and maxsize.

Parameters
  • minsize (int) – minimum size of the pool, 1 by default.

  • maxsize (int) – maximum size of the pool, 10 by default. 0 means unlimited pool size.

  • timeout (float) – a default timeout (in seconds) for connection operations. 60 secs by default.

  • enable_json (bool) – enable json column types for connections created by the pool, enabled by default.

  • enable_hstore (bool) –

    enable hstore column types for connections created by the pool, enabled by default.

    To use HSTORE columns, the extension should be installed in the database first:

    CREATE EXTENSION HSTORE
    

  • enable_uuid (bool) – enable UUID column types for connections created by the pool, enabled by default.

  • echo (bool) – log executed SQL queries (disabled by default).

  • on_connect – a callback coroutine executed once for every created connection. May be used for setting up connection-level state like the client encoding etc.; a sketch is shown after this parameter list.

  • pool_recycle (float) – number of seconds after which a connection is recycled; helps to deal with stale connections in the pool. The default value is -1, which means the recycling logic is disabled.

Returns

Pool instance.
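
A sketch of the on_connect hook referenced above; it assumes the callback receives each newly created Connection (the callback name and the SET statement are illustrative):

import aiopg

async def setup_connection(conn):
    # Runs once for every connection the pool creates.
    async with conn.cursor() as cur:
        await cur.execute("SET TIME ZONE 'UTC'")

async def make_pool(dsn):
    return await aiopg.create_pool(dsn, on_connect=setup_connection)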

class aiopg.Pool[source]

A connection pool.

After creation the pool has minsize free connections and can grow up to maxsize connections.

If minsize is 0 the pool doesn’t create any connections on startup.

If maxsize is 0 then the size of the pool is unlimited (but it still recycles used connections, of course).

The most important way to use it is getting a connection in an async with statement:

async with pool as conn:
    cur = await conn.cursor()

and shortcut for getting cursor directly:

async with pool.cursor() as cur:
    await cur.execute('SELECT 1')

See also Pool.acquire() and Pool.release() for acquiring connection without with statement.

echo

Return the echo mode status. If True, all executed queries are logged to the logger named aiopg.

minsize

A minimal size of the pool (read-only), 1 by default.

maxsize

A maximal size of the pool (read-only), 10 by default.

size

The current size of the pool (read-only). Includes used and free connections.

freesize

The count of free connections in the pool (read-only).

timeout

A read-only float representing the default timeout for operations on connections from the pool.

classmethod coroutine from_pool_fill(*args, **kwargs)[source]

An alternative coroutine constructor that fills the free pool with connections; the number of connections is controlled by the minsize parameter.

clear()[source]

A coroutine that closes all free connections in the pool. On the next connection acquisition at least minsize of them will be recreated.

close()[source]

Close pool.

Mark all pool connections to be closed when they get back to the pool. A closed pool doesn’t allow acquiring new connections.

If you want to wait for the actual closing of acquired connections, call wait_closed() after close().

Warning

The method is not a coroutine.

terminate()[source]

Terminate pool.

Close the pool, instantly closing all acquired connections as well.

wait_closed() should be called after terminate() to wait for actual finishing.

Warning

The method is not a coroutine.

coroutine wait_closed()[source]

Wait for releasing and closing all acquired connections.

Should be called after close() to wait for the actual pool closing.

coroutine async-with acquire()[source]

Acquire a connection from the free pool. Creates a new connection if needed and the size of the pool is less than maxsize.

Returns a Connection instance.

Warning

Nested acquire() calls might lead to deadlocks.

release(conn)[source]

A coroutine that returns connection conn to the free pool for future recycling.

Changed in version 0.10: The method is converted into a coroutine to get exception context in case of errors.

The change is backward compatible, though, since technically it’s a regular method returning a future instance.
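
Both acquisition styles side by side, as a minimal sketch:

async def use_pool(pool):
    # Preferred: acquire() as an async context manager releases for you.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute('SELECT 1')

    # Explicit acquire()/release(); try/finally keeps the pool healthy
    # even if the query raises.
    conn = await pool.acquire()
    try:
        cur = await conn.cursor()
        await cur.execute('SELECT 1')
    finally:
        await pool.release(conn)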

cursor(name=None, cursor_factory=None, scrollable=None, withhold=False, *, timeout=None)[source]

A coroutine that acquires a connection and returns a context manager.

Only cursor_factory can be specified; all other parameters are not supported by psycopg2-binary in asynchronous mode yet.

The cursor_factory argument can be used to create non-standard cursors. The argument must be a subclass of psycopg2.extensions.cursor. See subclassing-cursor for details. A default factory for the connection can also be specified using the Connection.cursor_factory attribute.

timeout is a timeout for the returned cursor instance if the parameter is not None.

name, scrollable and withhold parameters are not supported by psycopg2-binary in asynchronous mode.

The usage is:

async with pool.cursor() as cur:
    await cur.execute('SELECT 1')

After exiting the with block, the cursor cur will be closed.

Exceptions

Any call to a library function, method, or property can raise an exception.

aiopg doesn’t define any exception classes itself; it reuses the DBAPI exceptions from psycopg2-binary.

Transactions

While psycopg2-binary asynchronous connections have to be in autocommit mode, it is still possible to use SQL transactions by executing BEGIN and COMMIT statements manually, as described in the Psycopg Asynchronous Support docs.

The Connection.commit() and Connection.rollback() methods are disabled and always raise a psycopg2.ProgrammingError exception.
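
A minimal sketch of manual transaction control under autocommit (the table and column names are illustrative):

async def transfer(conn):
    async with conn.cursor() as cur:
        # The connection stays in autocommit mode, so transaction
        # boundaries are plain SQL statements.
        await cur.execute('BEGIN')
        try:
            await cur.execute(
                "UPDATE accounts SET balance = balance - 10 WHERE id = 1")
            await cur.execute(
                "UPDATE accounts SET balance = balance + 10 WHERE id = 2")
        except Exception:
            await cur.execute('ROLLBACK')
            raise
        else:
            await cur.execute('COMMIT')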

Extension type translations

JSON

aiopg has support for the JSON data type enabled by default.

To push data to the server, wrap the JSON dict in psycopg2.extras.Json:

from psycopg2.extras import Json

data = {'a': 1, 'b': 'str'}
await cur.execute("INSERT INTO tbl (val) VALUES (%s)", [Json(data)])

On receiving data from a json column, psycopg2-binary automatically converts the result into a Python dict object:

await cur.execute("SELECT val FROM tbl")
row = await cur.fetchone()   # fetchone() returns a one-element tuple
assert row[0] == {'b': 'str', 'a': 1}

Server-side notifications

Psycopg allows asynchronous interaction with other database sessions using the facilities offered by the PostgreSQL commands LISTEN and NOTIFY. Please refer to the PostgreSQL documentation for examples of how to use this form of communication.

Notifications are instances of the Notify object, made available upon reception in the Connection.notifies queue. Notifications can be sent from Python code by simply executing a NOTIFY command in a Cursor.execute() call.

The receiving side should start listening on a notification channel with a LISTEN call and wait for notification events from the Connection.notifies queue.

Note

Calling await connection.notifies.get() may raise a psycopg2 exception if the underlying connection gets disconnected while you’re waiting for notifications.

Here is a usage example:

import asyncio

import psycopg2

import aiopg

dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"


async def notify(conn):
    async with conn.cursor() as cur:
        for i in range(5):
            msg = f"message {i}"
            print("Send ->", msg)
            await cur.execute("NOTIFY channel, %s", (msg,))

        await cur.execute("NOTIFY channel, 'finish'")


async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute("LISTEN channel")
        while True:
            try:
                msg = await conn.notifies.get()
            except psycopg2.Error as ex:
                print("ERROR: ", ex)
                return
            if msg.payload == "finish":
                return
            else:
                print("Receive <-", msg.payload)


async def main():
    async with aiopg.connect(dsn) as listenConn:
        async with aiopg.create_pool(dsn) as notifyPool:
            async with notifyPool.acquire() as notifyConn:
                listener = listen(listenConn)
                notifier = notify(notifyConn)
                await asyncio.gather(listener, notifier)
    print("ALL DONE")


asyncio.run(main())