aiopg.sa: support for SQLAlchemy functional SQL layer
Intro
While the core API provides basic access to a PostgreSQL database, I found manipulating raw SQL strings too annoying.
Fortunately, we can use the excellent SQLAlchemy Core as an SQL query builder.
Example:

    import asyncio
    from aiopg.sa import create_engine
    import sqlalchemy as sa

    metadata = sa.MetaData()

    tbl = sa.Table('tbl', metadata,
                   sa.Column('id', sa.Integer, primary_key=True),
                   sa.Column('val', sa.String(255)))


    async def create_table(engine):
        async with engine.acquire() as conn:
            await conn.execute('DROP TABLE IF EXISTS tbl')
            await conn.execute('''CREATE TABLE tbl (
                                      id serial PRIMARY KEY,
                                      val varchar(255))''')


    async def go():
        async with create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd') as engine:
            await create_table(engine)
            async with engine.acquire() as conn:
                await conn.execute(tbl.insert().values(val='abc'))

                async for row in conn.execute(tbl.select()):
                    print(row.id, row.val)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(go())
So you can execute SQL queries built from expressions such as tbl.insert().values(val='abc') or tbl.select().
SQLAlchemy has a rich and very powerful set of SQL construction functions; please read the SQLAlchemy tutorial for the full list of available operations.
SQL transactions are supported as well. Please take a look at the SAConnection.begin() method and its family.
Engine
- coroutine async-with aiopg.sa.create_engine(dsn=None, *, minsize=1, maxsize=10, dialect=dialect, timeout=60, **kwargs)
Create an Engine instance with an embedded connection pool.
The pool has minsize opened connections to the PostgreSQL server.
- aiopg.sa.dialect
An instance of the SQLAlchemy dialect set up for psycopg2-binary usage.
A sqlalchemy.engine.interfaces.Dialect instance.
See also: sqlalchemy.dialects.postgresql.psycopg2, the psycopg2 dialect.
- class aiopg.sa.Engine
Connects an aiopg.Pool and a sqlalchemy.engine.interfaces.Dialect together to provide a source of database connectivity and behavior.
An Engine object is instantiated publicly using the create_engine() coroutine.
- dialect
A sqlalchemy.engine.interfaces.Dialect for the engine, read-only property.
- name
The name of the dialect, read-only property.
- driver
The driver of the dialect, read-only property.
- dsn
DSN connection info, read-only property.
See also: the psycopg2-binary connection.dsn attribute.
- minsize
The minimal size of the pool (read-only), 1 by default.
- maxsize
The maximal size of the pool (read-only), 10 by default.
- size
The current size of the pool (read-only). Includes used and free connections.
- freesize
The number of free connections in the pool (read-only).
- timeout
A read-only float representing the default timeout for operations on connections from the pool.
- close()
Close engine.
Mark all engine connections to be closed when they are returned to the engine. A closed engine does not allow acquiring new connections.
If you want to wait until the acquired connections are actually closed, call wait_closed() after close().
Warning: the method is not a coroutine.
- terminate()
Terminate engine.
Close the engine's pool and instantly close all acquired connections as well.
wait_closed() should be called after terminate() to wait until termination has actually finished.
Warning: the method is not a coroutine.
- coroutine wait_closed()
A coroutine that waits until all acquired connections have been released and closed.
Should be called after close() to wait until the engine is actually closed.
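The close() / wait_closed() pairing described above can be sketched as a small helper. This is only an illustration under stated assumptions: shutdown_engine is a hypothetical name, not part of aiopg, and engine is assumed to be an Engine obtained from create_engine().

```python
async def shutdown_engine(engine):
    """Gracefully shut down an engine (hypothetical helper, not aiopg API)."""
    # close() is a plain method, not a coroutine, so it must not be awaited.
    engine.close()
    # wait_closed() is a coroutine: it returns once all acquired
    # connections have been released and closed.
    await engine.wait_closed()
```

Calling the two methods in this order matters: close() only marks the connections for closing, while wait_closed() is the step that actually waits for them.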
- coroutine async-with acquire()
Get a connection from the pool.
This method is a coroutine.
Returns an SAConnection instance. The result of this method can be used as an async context manager:

    async with engine.acquire() as conn:
        await conn.execute(tbl.insert().values(val='abc'))

Warning: nested acquire() calls might lead to deadlocks.
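One way to stay clear of nested acquire() calls is to acquire once and pass the connection down to helpers. The sketch below assumes the tbl table from the intro example; insert_value and insert_many are hypothetical names used only for illustration.

```python
async def insert_value(conn, val):
    """Insert one value using the caller's connection (hypothetical helper)."""
    # No engine.acquire() here: the helper reuses the connection it is given,
    # so it can never wait on a pool slot held by its own caller.
    await conn.execute("INSERT INTO tbl (val) VALUES (%s)", (val,))


async def insert_many(engine, values):
    """Acquire a single connection and reuse it for every insert."""
    async with engine.acquire() as conn:
        for val in values:
            await insert_value(conn, val)
```

With a pool capped at maxsize connections, a helper that acquired its own connection while the caller still held one could block forever once the pool is exhausted; sharing the connection sidesteps that.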
Connection
- class aiopg.sa.SAConnection
A wrapper for an aiopg.Connection instance.
The class provides methods for executing SQL queries and working with SQL transactions.
- coroutine async-for execute(query, *multiparams, **params)
Executes a SQL query with optional parameters.
- Parameters
query – a SQL query string or any sqlalchemy expression (see SQLAlchemy Core)
*multiparams/**params – represent bound parameter values to be used in the execution. Typically, the format is either a dictionary passed to *multiparams:

    await conn.execute(
        table.insert(),
        {"id": 1, "value": "v1"}
    )

…or individual key/values interpreted by **params:

    await conn.execute(
        table.insert(), id=1, value="v1"
    )

In the case that a plain SQL string is passed, a tuple or individual values in *multiparams may be passed. Note that psycopg2 placeholders are always %s, regardless of the value type:

    await conn.execute(
        "INSERT INTO table (id, value) VALUES (%s, %s)",
        (1, "v1")
    )
    await conn.execute(
        "INSERT INTO table (id, value) VALUES (%s, %s)",
        1, "v1"
    )

The result value for SELECT statements may be iterated immediately:

    async for row in conn.execute(tbl.select()):
        print(row.id, row.name, row.surname)

- Returns
ResultProxy instance with results of SQL query execution.
See also
Simple examples sqlalchemy style Simple sqlalchemy usage
Examples sqlalchemy default field Default value field sqlalchemy usage
Examples sqlalchemy type field Types field sqlalchemy usage
Examples sqlalchemy name field Named field sqlalchemy usage
- coroutine scalar(query, *multiparams, **params)
Executes a SQL query and returns a scalar value.
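As a small illustration of scalar(), the sketch below counts the rows of the tbl table from the intro example. count_rows is a hypothetical helper name, and conn is assumed to be an SAConnection obtained from engine.acquire().

```python
async def count_rows(conn):
    """Return the number of rows in tbl (hypothetical helper)."""
    # scalar() executes the query and hands back a single value
    # instead of a ResultProxy that would have to be fetched from.
    return await conn.scalar("SELECT count(*) FROM tbl")
```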
- closed
The read-only property that returns True if the connection is closed.
- coroutine async-with begin()
Begin a transaction and return a transaction handle.
This method is a coroutine.
The returned object is an instance of Transaction. This object represents the “scope” of the transaction, which completes when either the Transaction.rollback() or Transaction.commit() method is called.
Nested calls to begin() on the same SAConnection will return new Transaction objects that represent an emulated transaction within the scope of the enclosing transaction, that is:

    trans = await conn.begin()   # outermost transaction
    trans2 = await conn.begin()  # "inner"
    await trans2.commit()        # does nothing
    await trans.commit()         # actually commits

Calls to Transaction.commit() only have an effect when invoked via the outermost Transaction object, though the Transaction.rollback() method of any of the Transaction objects will roll back the transaction.
See also
Simple examples Simple transaction in sqlalchemy
Examples with isolation level Isolation transaction in sqlalchemy
SAConnection.begin_nested() - use a SAVEPOINT
SAConnection.begin_twophase() - use a two phase (XA) transaction
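When the transaction handle is driven by hand rather than through async with, the usual shape is commit on success and rollback on any error. The following is a minimal sketch of that pattern; run_in_transaction is a hypothetical helper name, and statements is assumed to be an iterable of SQL strings or SQLAlchemy expressions.

```python
async def run_in_transaction(conn, statements):
    """Execute all statements atomically (hypothetical helper)."""
    trans = await conn.begin()
    try:
        for stmt in statements:
            await conn.execute(stmt)
    except Exception:
        # Any failure undoes everything executed since begin().
        await trans.rollback()
        raise
    else:
        # Only reached when every statement succeeded.
        await trans.commit()
```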
- coroutine async-with begin_nested()
Begin a nested transaction and return a transaction handle.
The returned object is an instance of NestedTransaction.
Any transaction in the hierarchy may commit and rollback, however the outermost transaction still controls the overall commit or rollback of the transaction as a whole. It utilizes the SAVEPOINT facility of the PostgreSQL server.
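A typical use of a SAVEPOINT is to attempt an optional step without risking the work already done in the enclosing transaction. The sketch below illustrates that idea under stated assumptions: run_with_optional_step is a hypothetical helper, and both SQL arguments are placeholders supplied by the caller.

```python
async def run_with_optional_step(conn, required_sql, optional_sql):
    """Run required_sql, then try optional_sql inside a SAVEPOINT.

    Returns True if the optional statement was kept and False if it
    was rolled back. Hypothetical helper, for illustration only.
    """
    outer = await conn.begin()
    try:
        await conn.execute(required_sql)
        savepoint = await conn.begin_nested()
        try:
            await conn.execute(optional_sql)
        except Exception:
            # Undo only the work done since the SAVEPOINT;
            # required_sql stays part of the outer transaction.
            await savepoint.rollback()
            optional_kept = False
        else:
            await savepoint.commit()
            optional_kept = True
    except Exception:
        await outer.rollback()
        raise
    else:
        # The outermost transaction decides the final outcome.
        await outer.commit()
    return optional_kept
```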
- coroutine async-with begin_twophase(xid=None)
Begin a two-phase or XA transaction and return a transaction handle.
The returned object is an instance of TwoPhaseTransaction, which in addition to the methods provided by Transaction, also provides a prepare() method.
- Parameters
xid – the two phase transaction id. If not supplied, a random id will be generated.
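The prepare-then-commit sequence of a two-phase transaction might look like the sketch below. commit_two_phase is a hypothetical helper, and sql is a placeholder statement supplied by the caller. Keep in mind that PostgreSQL only accepts prepared transactions when the server setting max_prepared_transactions is greater than zero.

```python
async def commit_two_phase(conn, sql, xid=None):
    """Run sql in a two-phase transaction and return its xid.

    Hypothetical helper, for illustration only.
    """
    trans = await conn.begin_twophase(xid)
    try:
        await conn.execute(sql)
        # Phase one: PREPARE the transaction on the server.
        await trans.prepare()
    except Exception:
        await trans.rollback()
        raise
    # Phase two: commit the prepared transaction.
    await trans.commit()
    return trans.xid
```

In a real distributed setup, a coordinator would usually check the other participants between prepare() and commit(); the sketch commits right away to keep the flow visible.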
- in_transaction
The read-only property that returns True if a transaction is in progress.
- coroutine close()
Close this SAConnection.
This results in a release of the underlying database resources, that is, the aiopg.Connection referenced internally. The aiopg.Connection is typically restored back to the connection-holding aiopg.Pool referenced by the Engine that produced this SAConnection. Any transactional state present on the aiopg.Connection is also unconditionally released by calling the Transaction.rollback() method.
After close() is called, the SAConnection is permanently in a closed state, and will allow no further operations.
ResultProxy
- class aiopg.sa.ResultProxy
Wraps a DB-API like Cursor object to provide easier access to row columns.
Individual columns may be accessed by their integer position, case-sensitive column name, or by sqlalchemy.schema.Column object, e.g.:

    async for row in conn.execute(...):
        col1 = row[0]                # access via integer position
        col2 = row['col2']           # access via name
        col3 = row[mytable.c.mycol]  # access via Column object.

ResultProxy also handles post-processing of result column data using sqlalchemy.types.TypeEngine objects, which are referenced from the originating SQL statement that produced this result set.
- dialect
The read-only property that returns the sqlalchemy.engine.interfaces.Dialect dialect for the ResultProxy instance.
See also: the dialect global data.
- keys()
Return the current set of string keys for rows.
- rowcount
The read-only property that returns the ‘rowcount’ for this result.
The ‘rowcount’ reports the number of rows matched by the WHERE criterion of an UPDATE or DELETE statement.
Note
Notes regarding ResultProxy.rowcount:
This attribute returns the number of rows matched, which is not necessarily the same as the number of rows that were actually modified. An UPDATE statement, for example, may have no net change on a given row if the SET values given are the same as those present in the row already. Such a row would be matched but not modified.
ResultProxy.rowcount is only useful in conjunction with an UPDATE or DELETE statement. Contrary to what the Python DBAPI says, it does not return the number of rows available from the results of a SELECT statement, as DBAPIs cannot support this functionality when rows are unbuffered.
Statements that use RETURNING do not return a correct rowcount.
- returns_rows
A read-only property that returns True if this ResultProxy returns rows, i.e. if it is legal to call the methods ResultProxy.fetchone(), ResultProxy.fetchmany(), ResultProxy.fetchall().
- closed
Return True if this ResultProxy is closed (no pending rows in underlying cursor).
- close()
Close this ResultProxy.
Closes the underlying aiopg.Cursor corresponding to the execution.
Note that any data cached within this ResultProxy is still available. For some types of results, this may include buffered rows.
This method is called automatically when:
all result rows are exhausted using the fetchXXX() methods.
cursor.description is None.
- coroutine fetchall()
Fetch all rows, just like aiopg.Cursor.fetchall().
The cursor is closed after the call.
Returns a list of RowProxy.
- coroutine fetchone()
Fetch one row, just like aiopg.Cursor.fetchone().
If a row is present, the cursor remains open after this is called.
Otherwise the cursor is automatically closed and None is returned.
Returns a RowProxy instance or None.
- coroutine fetchmany(size=None)
Fetch many rows, just like aiopg.Cursor.fetchmany().
If rows are present, the cursor remains open after this is called.
Otherwise the cursor is automatically closed and an empty list is returned.
Returns a list of RowProxy.
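Because fetchmany() returns an empty list once the rows are exhausted, it lends itself to batch processing. The async generator below is a minimal sketch of that loop; iter_batches is a hypothetical helper, and result is assumed to be a ResultProxy returned by awaiting conn.execute(...).

```python
async def iter_batches(result, size=100):
    """Yield lists of up to `size` rows until the result is exhausted.

    Hypothetical helper, for illustration only.
    """
    while True:
        rows = await result.fetchmany(size)
        if not rows:
            # An empty list signals that no rows are left.
            break
        yield rows
```

It could be consumed with `async for batch in iter_batches(result, size=500): ...`, which keeps at most one batch of rows in memory at a time.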
- class aiopg.sa.RowProxy
A collections.abc.Mapping for representing a row in a query result.
Keys are column names, values are result values.
Individual columns may be accessed by their integer position, case-sensitive column name, or by sqlalchemy.schema.Column object.
Has overloaded operators __eq__ and __ne__ for comparing two rows.
The RowProxy is not hashable.
- as_tuple()
Return a tuple with values from RowProxy.values().
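Since RowProxy is a Mapping, a row can be copied into a plain dict, for instance before serializing it. The sketch below shows one way to do that; fetch_dicts is a hypothetical helper, and conn is assumed to be an SAConnection.

```python
async def fetch_dicts(conn, query, *params):
    """Execute query and return its rows as plain dicts.

    Hypothetical helper, for illustration only.
    """
    result = await conn.execute(query, *params)
    rows = await result.fetchall()
    # dict() works on any Mapping: it copies column name -> value pairs.
    return [dict(row) for row in rows]
```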
Transaction objects
- class aiopg.sa.Transaction
Represent a database transaction in progress.
The Transaction object is procured by calling the SAConnection.begin() method of SAConnection:

    async with engine.acquire() as conn:
        async with conn.begin() as tr:
            await conn.execute("insert into x (a, b) values (1, 2)")

The object provides rollback() and commit() methods in order to control transaction boundaries. The context manager invokes rollback() if an exception is raised inside its code block and commit() on success.
- is_active
A read-only property that returns True if a transaction is active.
- connection
A read-only property that returns the SAConnection for the transaction.
- coroutine close()
Close this Transaction.
If this transaction is the base transaction in a begin/commit nesting, the transaction will Transaction.rollback(). Otherwise, the method returns.
This is used to cancel a Transaction without affecting the scope of an enclosing transaction.
- coroutine rollback()
Roll back this Transaction.
- coroutine commit()
Commit this Transaction.
- class aiopg.sa.NestedTransaction
Represent a ‘nested’, or SAVEPOINT transaction.
A new NestedTransaction object may be procured using the SAConnection.begin_nested() method.
The interface is the same as that of Transaction.
See also
PostgreSQL commands for nested transactions: SAVEPOINT, RELEASE SAVEPOINT, ROLLBACK TO SAVEPOINT.
- class aiopg.sa.TwoPhaseTransaction
Represent a two-phase transaction.
A new TwoPhaseTransaction object may be procured using the SAConnection.begin_twophase() method.
The interface is the same as that of Transaction with the addition of the TwoPhaseTransaction.prepare() method.
- xid
A read-only property that returns the two-phase transaction id.
- coroutine prepare()
Prepare this TwoPhaseTransaction.
After a PREPARE, the transaction can be committed.
See also
PostgreSQL commands for two phase transactions: PREPARE TRANSACTION, COMMIT PREPARED, ROLLBACK PREPARED.