aiopg.sa — support for SQLAlchemy functional SQL layer

Intro

While core API provides a core support for access to PostgreSQL database, I found manipulations with raw SQL strings too annoying.

Fortunately we can use excellent SQLAlchemy Core as SQL query builder.

Example

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa


metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


async def create_table(engine):
    async with engine.acquire() as conn:
        await conn.execute('DROP TABLE IF EXISTS tbl')
        await conn.execute('''CREATE TABLE tbl (
                                  id serial PRIMARY KEY,
                                  val varchar(255))''')


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:

        await create_table(engine)
        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val='abc'))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

So you can execute SQL query built by tbl.insert().values(val='abc') or tbl.select() expressions.

sqlalchemy has rich and very powerful set of SQL construction functions, please read tutorial for full list of available operations.

Also we provide SQL transactions support. Please take a look on SAConnection.begin() method and family.

Engine

coroutine async-with aiopg.sa.create_engine(dsn=None, *, minsize=1, maxsize=10, dialect=dialect, timeout=60, **kwargs)[source]

Crate an Engine instance with embedded connection pool.

The pool has minsize opened connections to PostgreSQL server.

aiopg.sa.dialect

An instance of SQLAlchemy dialect set up for psycopg2-binary usage.

An sqlalchemy.engine.interfaces.Dialect instance.

See also

sqlalchemy.dialects.postgresql.psycopg2 psycopg2 dialect.

class aiopg.sa.Engine[source]

Connects a aiopg.Pool and sqlalchemy.engine.interfaces.Dialect together to provide a source of database connectivity and behavior.

An Engine object is instantiated publicly using the create_engine() coroutine.

dialect

A sqlalchemy.engine.interfaces.Dialect for the engine, readonly property.

name

A name of the dialect, readonly property.

driver

A driver of the dialect, readonly property.

dsn

DSN connection info, readonly property.

See also

psycopg2-binary connection.dsn attribute.

minsize

A minimal size of the pool (read-only), 1 by default.

maxsize

A maximal size of the pool (read-only), 10 by default.

size

A current size of the pool (readonly). Includes used and free connections.

freesize

A count of free connections in the pool (readonly).

timeout

A read-only float representing default timeout for operations for connections from pool.

close()[source]

Close engine.

Mark all engine connections to be closed on getting back to engine. Closed engine doesn’t allow to acquire new connections.

If you want to wait for actual closing of acquired connection please call wait_closed() after close().

Warning

The method is not a coroutine.

terminate()[source]

Terminate engine.

Close engine’s pool with instantly closing all acquired connections also.

wait_closed() should be called after terminate() for waiting for actual finishing.

Warning

The method is not a coroutine.

coroutine wait_closed()[source]

A coroutine that waits for releasing and closing all acquired connections.

Should be called after close() for waiting for actual engine closing.

coroutine async-with acquire()[source]

Get a connection from pool.

This method is a coroutine.

Returns a SAConnection instance. Result of this method could be used as async contex manager:

async with engine.acquire() as conn:
    await conn.execute(tbl.insert().values(val='abc'))

Warning

nested acquire() might lead to deadlocks.

release()[source]

Revert back connection conn to pool.

Warning

The method is not a coroutine.

Connection

class aiopg.sa.SAConnection[source]

A wrapper for aiopg.Connection instance.

The class provides methods for executing SQL queries and working with SQL transactions.

coroutine async-for execute(query, *multiparams, **params)[source]

Executes a SQL query with optional parameters.

Parameters
  • query – a SQL query string or any sqlalchemy expression (see SQLAlchemy Core)

  • *multiparams/**params

    represent bound parameter values to be used in the execution. Typically, the format is either a dictionary passed to *multiparams:

    await conn.execute(
        table.insert(),
        {"id":1, "value":"v1"}
    )
    

    …or individual key/values interpreted by **params:

    await conn.execute(
        table.insert(), id=1, value="v1"
    )
    

    In the case that a plain SQL string is passed, a tuple or individual values in *multiparams may be passed:

    await conn.execute(
        "INSERT INTO table (id, value) VALUES (%d, %s)",
        (1, "v1")
    )
    
    await conn.execute(
        "INSERT INTO table (id, value) VALUES (%s, %s)",
        1, "v1"
    )
    

Result value for SELECT statements may be iterated immediately:

async for row in conn.execute(tbl.select()):
    print(row.id, row.name, row.surname)
Returns

ResultProxy instance with results of SQL query execution.

See also

coroutine scalar(query, *multiparams, **params)[source]

Executes a SQL query and returns a scalar value.

closed

The readonly property that returns True if connections is closed.

coroutine async-with begin()[source]

Begin a transaction and return a transaction handle.

This method is a coroutine.

The returned object is an instance of Transaction. This object represents the “scope” of the transaction, which completes when either the Transaction.rollback() or Transaction.commit() method is called.

Nested calls to begin() on the same SAConnection will return new Transaction objects that represent an emulated transaction within the scope of the enclosing transaction, that is:

trans = await conn.begin()   # outermost transaction
trans2 = await conn.begin()  # "inner"
await trans2.commit()        # does nothing
await trans.commit()         # actually commits

Calls to Transaction.commit() only have an effect when invoked via the outermost Transaction object, though the Transaction.rollback() method of any of the Transaction objects will roll back the transaction.

See also

coroutine async-with begin_nested()[source]

Begin a nested transaction and return a transaction handle.

The returned object is an instance of NestedTransaction.

Any transaction in the hierarchy may commit and rollback, however the outermost transaction still controls the overall commit or rollback of the transaction of a whole. It utilizes SAVEPOINT facility of PostgreSQL server.

coroutine async-with begin_twophase(xid=None)[source]

Begin a two-phase or XA transaction and return a transaction handle.

The returned object is an instance of TwoPhaseTransaction, which in addition to the methods provided by Transaction, also provides a prepare() method.

Parameters

xid – the two phase transaction id. If not supplied, a random id will be generated.

coroutine recover_twophase()[source]

Return a list of prepared twophase transaction ids.

coroutine rollback_prepared(xid)[source]

Rollback prepared twophase transaction xid.

coroutine commit_prepared(xid)[source]

Commit prepared twophase transaction xid.

in_transaction

The readonly property that returns True if a transaction is in progress.

coroutine close()[source]

Close this SAConnection.

This results in a release of the underlying database resources, that is, the aiopg.Connection referenced internally. The aiopg.Connection is typically restored back to the connection-holding aiopg.Pool referenced by the Engine that produced this SAConnection. Any transactional state present on the aiopg.Connection is also unconditionally released via calling Transaction.rollback() method.

After close() is called, the SAConnection is permanently in a closed state, and will allow no further operations.

ResultProxy

class aiopg.sa.ResultProxy

Wraps a DB-API like Cursor object to provide easier access to row columns.

Individual columns may be accessed by their integer position, case-sensitive column name, or by sqlalchemy.schema.Column` object. e.g.:

async for row in conn.execute(...):
    col1 = row[0]    # access via integer position
    col2 = row['col2']   # access via name
    col3 = row[mytable.c.mycol] # access via Column object.

ResultProxy also handles post-processing of result column data using sqlalchemy.types.TypeEngine objects, which are referenced from the originating SQL statement that produced this result set.

dialect

The readonly property that returns sqlalchemy.engine.interfaces.Dialect dialect for the ResultProxy instance.

See also

dialect global data.

keys()

Return the current set of string keys for rows.

rowcount

The readonly property that returns the ‘rowcount’ for this result.

The ‘rowcount’ reports the number of rows matched by the WHERE criterion of an UPDATE or DELETE statement.

Note

Notes regarding ResultProxy.rowcount:

  • This attribute returns the number of rows matched, which is not necessarily the same as the number of rows that were actually modified - an UPDATE statement, for example, may have no net change on a given row if the SET values given are the same as those present in the row already. Such a row would be matched but not modified.

  • ResultProxy.rowcount is only useful in conjunction with an UPDATE or DELETE statement. Contrary to what the Python DBAPI says, it does not return the number of rows available from the results of a SELECT statement as DBAPIs cannot support this functionality when rows are unbuffered.

  • Statements that use RETURNING does not return a correct rowcount.

returns_rows

A readonly property that returns True if this ResultProxy returns rows.

I.e. if it is legal to call the methods ResultProxy.fetchone(), ResultProxy.fetchmany(), ResultProxy.fetchall().

closed

Return True if this ResultProxy is closed (no pending rows in underlying cursor).

close()

Close this ResultProxy.

Closes the underlying aiopg.Cursor corresponding to the execution.

Note that any data cached within this ResultProxy is still available. For some types of results, this may include buffered rows.

This method is called automatically when:

  • all result rows are exhausted using the fetchXXX() methods.

  • cursor.description is None.

coroutine fetchall()

Fetch all rows, just like aiopg.Cursor.fetchall().

The connection is closed after the call.

Returns a list of RowProxy.

coroutine fetchone()

Fetch one row, just like aiopg.Cursor.fetchone().

If a row is present, the cursor remains open after this is called.

Else the cursor is automatically closed and None is returned.

Returns an RowProxy instance or None.

coroutine fetchmany(size=None)

Fetch many rows, just like aiopg.Cursor.fetchmany().

If rows are present, the cursor remains open after this is called.

Else the cursor is automatically closed and an empty list is returned.

Returns a list of RowProxy.

coroutine first()

Fetch the first row and then close the result set unconditionally.

Returns None if no row is present or an RowProxy instance.

coroutine scalar()

Fetch the first column of the first row, and close the result set.

Returns None if no row is present or an RowProxy instance.

class aiopg.sa.RowProxy

A collections.abc.Mapping for representing a row in query result.

Keys are column names, values are result values.

Individual columns may be accessed by their integer position, case-sensitive column name, or by sqlalchemy.schema.Column` object.

Has overloaded operators __eq__ and __ne__ for comparing two rows.

The RowProxy is not hashable.

..method:: as_tuple()

Return a tuple with values from RowProxy.values().

Transaction objects

class aiopg.sa.Transaction

Represent a database transaction in progress.

The Transaction object is procured by calling the SAConnection.begin() method of SAConnection:

async with engine.acquire() as conn:
    async with conn.begin() as tr:
        await conn.execute("insert into x (a, b) values (1, 2)")

The object provides rollback() and commit() methods in order to control transaction boundaries. Contex manager will invoke rollback() in case of exception in contex managers code block and commit() - in case of success.

is_active

A readonly property that returns True if a transaction is active.

connection

A readonly property that returns SAConnection for transaction.

coroutine close()

Close this Transaction.

If this transaction is the base transaction in a begin/commit nesting, the transaction will Transaction.rollback(). Otherwise, the method returns.

This is used to cancel a Transaction without affecting the scope of an enclosing transaction.

coroutine rollback()

Roll back this Transaction.

coroutine commit()

Commit this Transaction.

class aiopg.sa.NestedTransaction

Represent a ‘nested’, or SAVEPOINT transaction.

A new NestedTransaction object may be procured using the SAConnection.begin_nested() method.

The interface is the same as that of Transaction.

See also

PostgreSQL commands for nested transactions:

class aiopg.sa.TwoPhaseTransaction

Represent a two-phase transaction.

A new TwoPhaseTransaction object may be procured using the SAConnection.begin_twophase() method.

The interface is the same as that of Transaction with the addition of the TwoPhaseTransaction.prepare() method.

xid

A readonly property that returns twophase transaction id.

coroutine prepare()

Prepare this TwoPhaseTransaction.

After a PREPARE, the transaction can be committed.

See also

PostgreSQL commands for two phase transactions: