Examples of aiopg usage

Below is a list of examples from aiopg/examples

Every example is a correct tiny python program.

Low-level API

import asyncio

import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


async def test_select():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                ret = []
                async for row in cur:
                    ret.append(row)
                assert ret == [(1,)]
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(test_select())

Usage of LISTEN/NOTIFY commands

import asyncio

import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


async def notify(conn):
    async with conn.cursor() as cur:
        for i in range(5):
            msg = f"message {i}"
            print('Send ->', msg)
            await cur.execute("NOTIFY channel, %s", (msg,))

        await cur.execute("NOTIFY channel, 'finish'")


async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute("LISTEN channel")
        while True:
            msg = await conn.notifies.get()
            if msg.payload == 'finish':
                return
            else:
                print('Receive <-', msg.payload)


async def main():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn1:
            listener = listen(conn1)
            async with pool.acquire() as conn2:
                notifier = notify(conn2)
                await asyncio.gather(listener, notifier)
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple sqlalchemy usage

import asyncio

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


async def create_table(conn):
    await conn.execute('DROP TABLE IF EXISTS tbl')
    await conn.execute('''CREATE TABLE tbl (
    id serial PRIMARY KEY,
    val varchar(255))''')


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val='abc'))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Default value field sqlalchemy usage

import asyncio
import datetime
import uuid

import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

now = datetime.datetime.now

tbl = sa.Table(
    'tbl', metadata,
    sa.Column('id', sa.Integer, autoincrement=True, primary_key=True),
    sa.Column('uuid', sa.String, default=lambda: str(uuid.uuid4())),
    sa.Column('name', sa.String(255), default='default name'),
    sa.Column('date', sa.DateTime, default=datetime.datetime.now),
    sa.Column('flag', sa.Integer, default=0),
    sa.Column('count_str', sa.Integer, default=sa.func.length('default')),
    sa.Column('is_active', sa.Boolean, default=True),
)


async def insert_tbl(conn, pk, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    assert row.id == pk

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute('DROP TABLE IF EXISTS tbl')
    await conn.execute(CreateTable(tbl))


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await insert_tbl(conn, 1)
            await insert_tbl(conn, 2, name='test', is_active=False, date=now())


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Types field sqlalchemy usage

import asyncio

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ARRAY, ENUM, JSON
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()


class CustomStrList(sa.types.TypeDecorator):
    impl = sa.types.String

    def __init__(self, sep=',', *args, **kwargs):
        self._sep = sep
        self._args = args
        self._kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        return f'{self._sep}'.join(map(str, value))

    def process_result_value(self, value, dialect):
        if value is None:
            return value

        return value.split(self._sep)

    def copy(self):
        return CustomStrList(self._sep, *self._args, **self._kwargs)


tbl = sa.Table(
    'tbl', metadata,
    sa.Column('id', sa.Integer, autoincrement=True, primary_key=True),
    sa.Column('json', JSON, default=None),
    sa.Column('array_int', ARRAY(sa.Integer), default=list),
    sa.Column('enum', ENUM('f', 's', name='s_enum'), default='s'),
    sa.Column('custom_list', CustomStrList(), default=list),
)


async def insert_tbl(conn, pk, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    assert row.id == pk

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute('DROP TABLE IF EXISTS tbl')
    await conn.execute('DROP TYPE IF EXISTS s_enum CASCADE')
    await conn.execute("CREATE TYPE s_enum AS ENUM ('f', 's')")
    await conn.execute(CreateTable(tbl))


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await insert_tbl(conn, 1)
            await insert_tbl(conn, 2, json={'data': 123})
            await insert_tbl(conn, 3, array_int=[1, 3, 4])
            await insert_tbl(conn, 4, enum='f')
            await insert_tbl(conn, 5, custom_list=['1', 'test', '4'])


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Named field sqlalchemy usage

import asyncio
import datetime

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

now = datetime.datetime.now

tbl = sa.Table(
    'tbl', metadata,
    sa.Column('MyIDField', sa.Integer, key='id', primary_key=True),
    sa.Column('NaMe', sa.String(255), key='name', default='default name'),
)


async def insert_tbl(conn, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute('DROP TABLE IF EXISTS tbl')
    await conn.execute(
        'CREATE TABLE tbl ('
        '"MyIDField" INTEGER NOT NULL, '
        '"NaMe" VARCHAR(255), '
        'PRIMARY KEY ("MyIDField"))'
    )


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
            await insert_tbl(conn, id=1)
            await insert_tbl(conn, id=2, name='test')


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Complex sqlalchemy queries

import asyncio
import datetime
import random

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table('users', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('name', sa.String(255)),
                 sa.Column('birthday', sa.DateTime))

emails = sa.Table('emails', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('user_id', None, sa.ForeignKey('users.id')),
                  sa.Column('email', sa.String(255), nullable=False),
                  sa.Column('private', sa.Boolean, nullable=False))


async def create_tables(conn):
    await conn.execute('DROP TABLE IF EXISTS emails')
    await conn.execute('DROP TABLE IF EXISTS users')
    await conn.execute('''CREATE TABLE users (
                                        id serial PRIMARY KEY,
                                        name varchar(255),
                                        birthday timestamp)''')
    await conn.execute('''CREATE TABLE emails (
                                id serial,
                                user_id int references users(id),
                                email varchar(253),
                                private bool)''')


names = {'Andrew', 'Bob', 'John', 'Vitaly', 'Alex', 'Lina', 'Olga',
         'Doug', 'Julia', 'Matt', 'Jessica', 'Nick', 'Dave', 'Martin',
         'Abbi', 'Eva', 'Lori', 'Rita', 'Rosa', 'Ivy', 'Clare', 'Maria',
         'Jenni', 'Margo', 'Anna'}


def gen_birthday():
    now = datetime.datetime.now()
    year = random.randint(now.year - 30, now.year - 20)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    return datetime.datetime(year, month, day)


async def fill_data(conn):
    async with conn.begin():
        for name in random.sample(names, len(names)):
            uid = await conn.scalar(
                users.insert().values(name=name, birthday=gen_birthday()))
            emails_count = int(random.paretovariate(2))
            for num in random.sample(range(10000), emails_count):
                is_private = random.uniform(0, 1) < 0.8
                await conn.execute(emails.insert().values(
                    user_id=uid,
                    email=f'{name}+{num}@gmail.com',
                    private=is_private))


async def count(conn):
    c1 = (await conn.scalar(users.count()))
    c2 = (await conn.scalar(emails.count()))
    print("Population consists of", c1, "people with",
          c2, "emails in total")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (sa.select([users.c.name])
             .select_from(join)
             .where(emails.c.private == False)  # noqa
             .group_by(users.c.name)
             .having(sa.func.count(emails.c.private) > 0))

    print("Users with public emails:")
    async for row in conn.execute(query):
        print(row.name)

    print()


async def show_julia(conn):
    print("Lookup for Julia:")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (sa.select([users, emails], use_labels=True)
             .select_from(join).where(users.c.name == 'Julia'))
    async for row in conn.execute(query):
        print(row.users_name, row.users_birthday,
              row.emails_email, row.emails_private)
    print()


async def ave_age(conn):
    query = (sa.select([sa.func.avg(sa.func.age(users.c.birthday))])
             .select_from(users))
    ave = (await conn.scalar(query))
    print("Average age of population is", ave,
          "or ~", int(ave.days / 365), "years")
    print()


async def go():
    engine = await create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd')
    async with engine:
        async with engine.acquire() as conn:
            await create_tables(conn)
            await fill_data(conn)
            await count(conn)
            await show_julia(conn)
            await ave_age(conn)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Simple transaction in sqlalchemy

import asyncio

import sqlalchemy as sa
from sqlalchemy.schema import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table(
    'users_sa_transaction', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(255))
)


async def create_sa_transaction_tables(conn):
    await conn.execute(CreateTable(users))


async def check_count_users(conn, *, count):
    s_query = sa.select(users).select_from(users)
    assert count == len(list(await (await conn.execute(s_query)).fetchall()))


async def success_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin():
        await conn.execute(sa.insert(users).values(id=1, name='test1'))
        await conn.execute(sa.insert(users).values(id=2, name='test2'))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_transaction(conn):
    await check_count_users(conn, count=0)

    trans = await conn.begin()

    try:
        await conn.execute(sa.insert(users).values(id=1, name='test1'))
        raise RuntimeError()

    except RuntimeError:
        await trans.rollback()
    else:
        await trans.commit()

    await check_count_users(conn, count=0)


async def success_nested_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin_nested():
        await conn.execute(sa.insert(users).values(id=1, name='test1'))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name='test2'))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_nested_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin_nested():
        await conn.execute(sa.insert(users).values(id=1, name='test1'))

        tr_f = await conn.begin_nested()
        try:
            await conn.execute(sa.insert(users).values(id=2, name='test2'))
            raise RuntimeError()

        except RuntimeError:
            await tr_f.rollback()
        else:
            await tr_f.commit()

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name='test2'))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_first_nested_transaction(conn):
    trans = await conn.begin_nested()

    try:
        await conn.execute(sa.insert(users).values(id=1, name='test1'))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name='test2'))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=3, name='test3'))

        raise RuntimeError()

    except RuntimeError:
        await trans.rollback()
    else:
        await trans.commit()

    await check_count_users(conn, count=0)


async def go():
    engine = await create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd')
    async with engine:
        async with engine.acquire() as conn:
            await create_sa_transaction_tables(conn)

            await success_transaction(conn)
            await fail_transaction(conn)

            await success_nested_transaction(conn)
            await fail_nested_transaction(conn)
            await fail_first_nested_transaction(conn)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Isolation transaction in sqlalchemy

import asyncio

import sqlalchemy as sa
from psycopg2 import InternalError
from psycopg2.extensions import TransactionRollbackError
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table(
    'users_sa_isolation_transaction', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(255))
)


async def create_sa_transaction_tables(conn):
    await conn.execute(CreateTable(users))


async def repea_sa_transaction(conn, conn2):
    isolation_level = 'REPEATABLE READ'
    await conn.execute(sa.insert(users).values(id=1, name='test1'))
    t1 = await conn.begin(isolation_level=isolation_level)

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({'name': 'name2'}).where(where))

    t2 = await conn2.begin(isolation_level=isolation_level)
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    await conn2.execute(users.insert().values({'id': 2, 'name': 'test'}))

    try:
        await conn2.execute(
            sa.update(users).values({'name': 't'}).where(where))
    except TransactionRollbackError as e:
        assert e.pgcode == '40001'

    await t2.commit()

    assert len(await (await conn2.execute(q_user)).fetchall()) == 1
    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def serializable_sa_transaction(conn, conn2):
    isolation_level = 'SERIALIZABLE'
    await conn.execute(sa.insert(users).values(id=1, name='test1'))
    t1 = await conn.begin(isolation_level=isolation_level)

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({'name': 'name2'}).where(where))

    t2 = await conn2.begin(isolation_level=isolation_level)
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    try:
        await conn2.execute(users.insert().values({'id': 2, 'name': 'test'}))
    except TransactionRollbackError as e:
        assert e.pgcode == '40001'

    try:
        await conn2.execute(users.update().values({'name': 't'}).where(where))
    except InternalError as e:
        assert e.pgcode == '25P02'

    await t2.commit()

    user = dict(await (await conn2.execute(q_user)).fetchone())
    assert user == {'name': 'name2', 'id': 1}

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def read_only_read_sa_transaction(conn, deferrable):
    await conn.execute(sa.insert(users).values(id=1, name='test1'))
    t1 = await conn.begin(
        isolation_level='SERIALIZABLE',
        readonly=True,
        deferrable=deferrable
    )

    where = users.c.id == 1

    try:
        await conn.execute(sa.update(users).values({'name': 't'}).where(where))
    except InternalError as e:
        assert e.pgcode == '25006'

    await t1.commit()

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def isolation_read_sa_transaction(conn, conn2):
    await conn.execute(sa.insert(users).values(id=1, name='test1'))
    t1 = await conn.begin()

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({'name': 'name2'}).where(where))

    t2 = await conn2.begin()
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    await conn2.execute(sa.update(users).values(user).where(where))
    await t2.commit()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def go():
    engine = await create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd')
    async with engine:
        async with engine.acquire() as conn:
            await create_sa_transaction_tables(conn)

        async with engine.acquire() as conn:
            await read_only_read_sa_transaction(conn, True)
            await read_only_read_sa_transaction(conn, False)

            async with engine.acquire() as conn2:
                await repea_sa_transaction(conn, conn2)
                await serializable_sa_transaction(conn, conn2)
                await isolation_read_sa_transaction(conn, conn2)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())