Examples of aiopg usage

Below is a list of examples from aiopg/examples

Every example is a correct tiny python program.

Low-level API

import asyncio

import aiopg

dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"


async def test_select():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                ret = []
                async for row in cur:
                    ret.append(row)
                assert ret == [(1,)]
    print("ALL DONE")


asyncio.run(test_select())

Usage of LISTEN/NOTIFY commands

import asyncio

import psycopg2

import aiopg

dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"


async def notify(conn):
    async with conn.cursor() as cur:
        for i in range(5):
            msg = f"message {i}"
            print("Send ->", msg)
            await cur.execute("NOTIFY channel, %s", (msg,))

        await cur.execute("NOTIFY channel, 'finish'")


async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute("LISTEN channel")
        while True:
            try:
                msg = await conn.notifies.get()
            except psycopg2.Error as ex:
                print("ERROR: ", ex)
                return
            if msg.payload == "finish":
                return
            else:
                print("Receive <-", msg.payload)


async def main():
    async with aiopg.connect(dsn) as listenConn:
        async with aiopg.create_pool(dsn) as notifyPool:
            async with notifyPool.acquire() as notifyConn:
                listener = listen(listenConn)
                notifier = notify(notifyConn)
                await asyncio.gather(listener, notifier)
    print("ALL DONE")


asyncio.run(main())

Simple sqlalchemy usage

import asyncio

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

tbl = sa.Table(
    "tbl",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("val", sa.String(255)),
)


async def create_table(conn):
    await conn.execute("DROP TABLE IF EXISTS tbl")
    await conn.execute(
        """CREATE TABLE tbl (
    id serial PRIMARY KEY,
    val varchar(255))"""
    )


async def go():
    async with create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    ) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val="abc"))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)


asyncio.run(go())

Default value field sqlalchemy usage

import asyncio
import datetime
import uuid

import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

now = datetime.datetime.now

tbl = sa.Table(
    "tbl",
    metadata,
    sa.Column("id", sa.Integer, autoincrement=True, primary_key=True),
    sa.Column("uuid", sa.String, default=lambda: str(uuid.uuid4())),
    sa.Column("name", sa.String(255), default="default name"),
    sa.Column("date", sa.DateTime, default=datetime.datetime.now),
    sa.Column("flag", sa.Integer, default=0),
    sa.Column("count_str", sa.Integer, default=sa.func.length("default")),
    sa.Column("is_active", sa.Boolean, default=True),
)


async def insert_tbl(conn, pk, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    assert row.id == pk

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute("DROP TABLE IF EXISTS tbl")
    await conn.execute(CreateTable(tbl))


async def go():
    async with create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    ) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await insert_tbl(conn, 1)
            await insert_tbl(conn, 2, name="test", is_active=False, date=now())


asyncio.run(go())

Types field sqlalchemy usage

import asyncio

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ARRAY, ENUM, JSON
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()


class CustomStrList(sa.types.TypeDecorator):
    impl = sa.types.String

    def __init__(self, sep=",", *args, **kwargs):
        self._sep = sep
        self._args = args
        self._kwargs = kwargs
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        return f"{self._sep}".join(map(str, value))

    def process_result_value(self, value, dialect):
        if value is None:
            return value

        return value.split(self._sep)

    def copy(self):
        return CustomStrList(self._sep, *self._args, **self._kwargs)


tbl = sa.Table(
    "tbl",
    metadata,
    sa.Column("id", sa.Integer, autoincrement=True, primary_key=True),
    sa.Column("json", JSON, default=None),
    sa.Column("array_int", ARRAY(sa.Integer), default=list),
    sa.Column("enum", ENUM("f", "s", name="s_enum"), default="s"),
    sa.Column("custom_list", CustomStrList(), default=list),
)


async def insert_tbl(conn, pk, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    assert row.id == pk

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute("DROP TABLE IF EXISTS tbl")
    await conn.execute("DROP TYPE IF EXISTS s_enum CASCADE")
    await conn.execute("CREATE TYPE s_enum AS ENUM ('f', 's')")
    await conn.execute(CreateTable(tbl))


async def go():
    async with create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    ) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await insert_tbl(conn, 1)
            await insert_tbl(conn, 2, json={"data": 123})
            await insert_tbl(conn, 3, array_int=[1, 3, 4])
            await insert_tbl(conn, 4, enum="f")
            await insert_tbl(conn, 5, custom_list=["1", "test", "4"])


asyncio.run(go())

Named field sqlalchemy usage

import asyncio
import datetime

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

now = datetime.datetime.now

tbl = sa.Table(
    "tbl",
    metadata,
    sa.Column("MyIDField", sa.Integer, key="id", primary_key=True),
    sa.Column("NaMe", sa.String(255), key="name", default="default name"),
)


async def insert_tbl(conn, **kwargs):
    await conn.execute(tbl.insert().values(**kwargs))
    row = await (await conn.execute(tbl.select())).first()

    for name, val in kwargs.items():
        assert row[name] == val

    await conn.execute(sa.delete(tbl))


async def create_table(conn):
    await conn.execute("DROP TABLE IF EXISTS tbl")
    await conn.execute(
        "CREATE TABLE tbl ("
        '"MyIDField" INTEGER NOT NULL, '
        '"NaMe" VARCHAR(255), '
        'PRIMARY KEY ("MyIDField"))'
    )


async def go():
    async with create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    ) as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
            await insert_tbl(conn, id=1)
            await insert_tbl(conn, id=2, name="test")


asyncio.run(go())

Complex sqlalchemy queries

import asyncio
import datetime
import random

import sqlalchemy as sa

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table(
    "users",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(255)),
    sa.Column("birthday", sa.DateTime),
)

emails = sa.Table(
    "emails",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("user_id", None, sa.ForeignKey("users.id")),
    sa.Column("email", sa.String(255), nullable=False),
    sa.Column("private", sa.Boolean, nullable=False),
)


async def create_tables(conn):
    await conn.execute("DROP TABLE IF EXISTS emails")
    await conn.execute("DROP TABLE IF EXISTS users")
    await conn.execute(
        """CREATE TABLE users (
                                        id serial PRIMARY KEY,
                                        name varchar(255),
                                        birthday timestamp)"""
    )
    await conn.execute(
        """CREATE TABLE emails (
                                id serial,
                                user_id int references users(id),
                                email varchar(253),
                                private bool)"""
    )


names = {
    "Andrew",
    "Bob",
    "John",
    "Vitaly",
    "Alex",
    "Lina",
    "Olga",
    "Doug",
    "Julia",
    "Matt",
    "Jessica",
    "Nick",
    "Dave",
    "Martin",
    "Abbi",
    "Eva",
    "Lori",
    "Rita",
    "Rosa",
    "Ivy",
    "Clare",
    "Maria",
    "Jenni",
    "Margo",
    "Anna",
}


def gen_birthday():
    now = datetime.datetime.now()
    year = random.randint(now.year - 30, now.year - 20)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    return datetime.datetime(year, month, day)


async def fill_data(conn):
    async with conn.begin():
        for name in random.sample(names, len(names)):
            uid = await conn.scalar(
                users.insert().values(name=name, birthday=gen_birthday())
            )
            emails_count = int(random.paretovariate(2))
            for num in random.sample(range(10000), emails_count):
                is_private = random.uniform(0, 1) < 0.8
                await conn.execute(
                    emails.insert().values(
                        user_id=uid,
                        email=f"{name}+{num}@gmail.com",
                        private=is_private,
                    )
                )


async def count(conn):
    c1 = await conn.scalar(users.count())
    c2 = await conn.scalar(emails.count())
    print("Population consists of", c1, "people with", c2, "emails in total")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (
        sa.select([users.c.name])
        .select_from(join)
        .where(emails.c.private == False)  # noqa
        .group_by(users.c.name)
        .having(sa.func.count(emails.c.private) > 0)
    )

    print("Users with public emails:")
    async for row in conn.execute(query):
        print(row.name)

    print()


async def show_julia(conn):
    print("Lookup for Julia:")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (
        sa.select([users, emails], use_labels=True)
        .select_from(join)
        .where(users.c.name == "Julia")
    )
    async for row in conn.execute(query):
        print(
            row.users_name,
            row.users_birthday,
            row.emails_email,
            row.emails_private,
        )
    print()


async def ave_age(conn):
    query = sa.select(
        [sa.func.avg(sa.func.age(users.c.birthday))]
    ).select_from(users)
    ave = await conn.scalar(query)
    print(
        "Average age of population is",
        ave,
        "or ~",
        int(ave.days / 365),
        "years",
    )
    print()


async def go():
    engine = await create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    )
    async with engine:
        async with engine.acquire() as conn:
            await create_tables(conn)
            await fill_data(conn)
            await count(conn)
            await show_julia(conn)
            await ave_age(conn)


asyncio.run(go())

Simple transaction in sqlalchemy

import asyncio

import sqlalchemy as sa
from sqlalchemy.schema import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table(
    "users_sa_transaction",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(255)),
)


async def create_sa_transaction_tables(conn):
    await conn.execute(CreateTable(users))


async def check_count_users(conn, *, count):
    s_query = sa.select(users).select_from(users)
    assert count == len(list(await (await conn.execute(s_query)).fetchall()))


async def success_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin():
        await conn.execute(sa.insert(users).values(id=1, name="test1"))
        await conn.execute(sa.insert(users).values(id=2, name="test2"))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_transaction(conn):
    await check_count_users(conn, count=0)

    trans = await conn.begin()

    try:
        await conn.execute(sa.insert(users).values(id=1, name="test1"))
        raise RuntimeError()

    except RuntimeError:
        await trans.rollback()
    else:
        await trans.commit()

    await check_count_users(conn, count=0)


async def success_nested_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin_nested():
        await conn.execute(sa.insert(users).values(id=1, name="test1"))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name="test2"))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_nested_transaction(conn):
    await check_count_users(conn, count=0)

    async with conn.begin_nested():
        await conn.execute(sa.insert(users).values(id=1, name="test1"))

        tr_f = await conn.begin_nested()
        try:
            await conn.execute(sa.insert(users).values(id=2, name="test2"))
            raise RuntimeError()

        except RuntimeError:
            await tr_f.rollback()
        else:
            await tr_f.commit()

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name="test2"))

    await check_count_users(conn, count=2)

    async with conn.begin():
        await conn.execute(sa.delete(users).where(users.c.id == 1))
        await conn.execute(sa.delete(users).where(users.c.id == 2))

    await check_count_users(conn, count=0)


async def fail_first_nested_transaction(conn):
    trans = await conn.begin_nested()

    try:
        await conn.execute(sa.insert(users).values(id=1, name="test1"))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=2, name="test2"))

        async with conn.begin_nested():
            await conn.execute(sa.insert(users).values(id=3, name="test3"))

        raise RuntimeError()

    except RuntimeError:
        await trans.rollback()
    else:
        await trans.commit()

    await check_count_users(conn, count=0)


async def go():
    engine = await create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    )
    async with engine:
        async with engine.acquire() as conn:
            await create_sa_transaction_tables(conn)

            await success_transaction(conn)
            await fail_transaction(conn)

            await success_nested_transaction(conn)
            await fail_nested_transaction(conn)
            await fail_first_nested_transaction(conn)


asyncio.run(go())

Isolation transaction in sqlalchemy

import asyncio

import sqlalchemy as sa
from psycopg2 import InternalError
from psycopg2.extensions import TransactionRollbackError
from sqlalchemy.sql.ddl import CreateTable

from aiopg.sa import create_engine

metadata = sa.MetaData()

users = sa.Table(
    "users_sa_isolation_transaction",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String(255)),
)


async def create_sa_transaction_tables(conn):
    await conn.execute(CreateTable(users))


async def repea_sa_transaction(conn, conn2):
    isolation_level = "REPEATABLE READ"
    await conn.execute(sa.insert(users).values(id=1, name="test1"))
    t1 = await conn.begin(isolation_level=isolation_level)

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({"name": "name2"}).where(where))

    t2 = await conn2.begin(isolation_level=isolation_level)
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    await conn2.execute(users.insert().values({"id": 2, "name": "test"}))

    try:
        await conn2.execute(
            sa.update(users).values({"name": "t"}).where(where)
        )
    except TransactionRollbackError as e:
        assert e.pgcode == "40001"

    await t2.commit()

    assert len(await (await conn2.execute(q_user)).fetchall()) == 1
    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def serializable_sa_transaction(conn, conn2):
    isolation_level = "SERIALIZABLE"
    await conn.execute(sa.insert(users).values(id=1, name="test1"))
    t1 = await conn.begin(isolation_level=isolation_level)

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({"name": "name2"}).where(where))

    t2 = await conn2.begin(isolation_level=isolation_level)
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    try:
        await conn2.execute(users.insert().values({"id": 2, "name": "test"}))
    except TransactionRollbackError as e:
        assert e.pgcode == "40001"

    try:
        await conn2.execute(users.update().values({"name": "t"}).where(where))
    except InternalError as e:
        assert e.pgcode == "25P02"

    await t2.commit()

    user = dict(await (await conn2.execute(q_user)).fetchone())
    assert user == {"name": "name2", "id": 1}

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def read_only_read_sa_transaction(conn, deferrable):
    await conn.execute(sa.insert(users).values(id=1, name="test1"))
    t1 = await conn.begin(
        isolation_level="SERIALIZABLE", readonly=True, deferrable=deferrable
    )

    where = users.c.id == 1

    try:
        await conn.execute(sa.update(users).values({"name": "t"}).where(where))
    except InternalError as e:
        assert e.pgcode == "25006"

    await t1.commit()

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def isolation_read_sa_transaction(conn, conn2):
    await conn.execute(sa.insert(users).values(id=1, name="test1"))
    t1 = await conn.begin()

    where = users.c.id == 1
    q_user = users.select().where(where)
    user = await (await conn.execute(q_user)).fetchone()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.update(users).values({"name": "name2"}).where(where))

    t2 = await conn2.begin()
    assert await (await conn2.execute(q_user)).fetchone() == user

    await t1.commit()

    await conn2.execute(sa.update(users).values(user).where(where))
    await t2.commit()

    assert await (await conn2.execute(q_user)).fetchone() == user

    await conn.execute(sa.delete(users))
    assert len(await (await conn.execute(users.select())).fetchall()) == 0


async def go():
    engine = await create_engine(
        user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
    )
    async with engine:
        async with engine.acquire() as conn:
            await create_sa_transaction_tables(conn)

        async with engine.acquire() as conn:
            await read_only_read_sa_transaction(conn, True)
            await read_only_read_sa_transaction(conn, False)

            async with engine.acquire() as conn2:
                await repea_sa_transaction(conn, conn2)
                await serializable_sa_transaction(conn, conn2)
                await isolation_read_sa_transaction(conn, conn2)


asyncio.run(go())