Examples of aiopg usage¶
Below is a list of examples from aiopg/examples
Every example is a correct tiny python program.
Low-level API¶
import asyncio
import aiopg
dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"
async def test_select():
async with aiopg.create_pool(dsn) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
ret = []
async for row in cur:
ret.append(row)
assert ret == [(1,)]
print("ALL DONE")
asyncio.run(test_select())
Usage of LISTEN/NOTIFY commands¶
import asyncio
import psycopg2
import aiopg
dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"
async def notify(conn):
async with conn.cursor() as cur:
for i in range(5):
msg = f"message {i}"
print("Send ->", msg)
await cur.execute("NOTIFY channel, %s", (msg,))
await cur.execute("NOTIFY channel, 'finish'")
async def listen(conn):
async with conn.cursor() as cur:
await cur.execute("LISTEN channel")
while True:
try:
msg = await conn.notifies.get()
except psycopg2.Error as ex:
print("ERROR: ", ex)
return
if msg.payload == "finish":
return
else:
print("Receive <-", msg.payload)
async def main():
async with aiopg.connect(dsn) as listenConn:
async with aiopg.create_pool(dsn) as notifyPool:
async with notifyPool.acquire() as notifyConn:
listener = listen(listenConn)
notifier = notify(notifyConn)
await asyncio.gather(listener, notifier)
print("ALL DONE")
asyncio.run(main())
Simple sqlalchemy usage¶
import asyncio
import sqlalchemy as sa
from aiopg.sa import create_engine
metadata = sa.MetaData()
tbl = sa.Table(
"tbl",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("val", sa.String(255)),
)
async def create_table(conn):
await conn.execute("DROP TABLE IF EXISTS tbl")
await conn.execute(
"""CREATE TABLE tbl (
id serial PRIMARY KEY,
val varchar(255))"""
)
async def go():
async with create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
) as engine:
async with engine.acquire() as conn:
await create_table(conn)
async with engine.acquire() as conn:
await conn.execute(tbl.insert().values(val="abc"))
async for row in conn.execute(tbl.select()):
print(row.id, row.val)
asyncio.run(go())
Default value field sqlalchemy usage¶
import asyncio
import datetime
import uuid
import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable
from aiopg.sa import create_engine
metadata = sa.MetaData()
now = datetime.datetime.now
tbl = sa.Table(
"tbl",
metadata,
sa.Column("id", sa.Integer, autoincrement=True, primary_key=True),
sa.Column("uuid", sa.String, default=lambda: str(uuid.uuid4())),
sa.Column("name", sa.String(255), default="default name"),
sa.Column("date", sa.DateTime, default=datetime.datetime.now),
sa.Column("flag", sa.Integer, default=0),
sa.Column("count_str", sa.Integer, default=sa.func.length("default")),
sa.Column("is_active", sa.Boolean, default=True),
)
async def insert_tbl(conn, pk, **kwargs):
await conn.execute(tbl.insert().values(**kwargs))
row = await (await conn.execute(tbl.select())).first()
assert row.id == pk
for name, val in kwargs.items():
assert row[name] == val
await conn.execute(sa.delete(tbl))
async def create_table(conn):
await conn.execute("DROP TABLE IF EXISTS tbl")
await conn.execute(CreateTable(tbl))
async def go():
async with create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
) as engine:
async with engine.acquire() as conn:
await create_table(conn)
async with engine.acquire() as conn:
await insert_tbl(conn, 1)
await insert_tbl(conn, 2, name="test", is_active=False, date=now())
asyncio.run(go())
Types field sqlalchemy usage¶
import asyncio
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ARRAY, ENUM, JSON
from sqlalchemy.sql.ddl import CreateTable
from aiopg.sa import create_engine
metadata = sa.MetaData()
class CustomStrList(sa.types.TypeDecorator):
impl = sa.types.String
def __init__(self, sep=",", *args, **kwargs):
self._sep = sep
self._args = args
self._kwargs = kwargs
super().__init__(*args, **kwargs)
def process_bind_param(self, value, dialect):
return f"{self._sep}".join(map(str, value))
def process_result_value(self, value, dialect):
if value is None:
return value
return value.split(self._sep)
def copy(self):
return CustomStrList(self._sep, *self._args, **self._kwargs)
tbl = sa.Table(
"tbl",
metadata,
sa.Column("id", sa.Integer, autoincrement=True, primary_key=True),
sa.Column("json", JSON, default=None),
sa.Column("array_int", ARRAY(sa.Integer), default=list),
sa.Column("enum", ENUM("f", "s", name="s_enum"), default="s"),
sa.Column("custom_list", CustomStrList(), default=list),
)
async def insert_tbl(conn, pk, **kwargs):
await conn.execute(tbl.insert().values(**kwargs))
row = await (await conn.execute(tbl.select())).first()
assert row.id == pk
for name, val in kwargs.items():
assert row[name] == val
await conn.execute(sa.delete(tbl))
async def create_table(conn):
await conn.execute("DROP TABLE IF EXISTS tbl")
await conn.execute("DROP TYPE IF EXISTS s_enum CASCADE")
await conn.execute("CREATE TYPE s_enum AS ENUM ('f', 's')")
await conn.execute(CreateTable(tbl))
async def go():
async with create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
) as engine:
async with engine.acquire() as conn:
await create_table(conn)
async with engine.acquire() as conn:
await insert_tbl(conn, 1)
await insert_tbl(conn, 2, json={"data": 123})
await insert_tbl(conn, 3, array_int=[1, 3, 4])
await insert_tbl(conn, 4, enum="f")
await insert_tbl(conn, 5, custom_list=["1", "test", "4"])
asyncio.run(go())
Named field sqlalchemy usage¶
import asyncio
import datetime
import sqlalchemy as sa
from aiopg.sa import create_engine
metadata = sa.MetaData()
now = datetime.datetime.now
tbl = sa.Table(
"tbl",
metadata,
sa.Column("MyIDField", sa.Integer, key="id", primary_key=True),
sa.Column("NaMe", sa.String(255), key="name", default="default name"),
)
async def insert_tbl(conn, **kwargs):
await conn.execute(tbl.insert().values(**kwargs))
row = await (await conn.execute(tbl.select())).first()
for name, val in kwargs.items():
assert row[name] == val
await conn.execute(sa.delete(tbl))
async def create_table(conn):
await conn.execute("DROP TABLE IF EXISTS tbl")
await conn.execute(
"CREATE TABLE tbl ("
'"MyIDField" INTEGER NOT NULL, '
'"NaMe" VARCHAR(255), '
'PRIMARY KEY ("MyIDField"))'
)
async def go():
async with create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
) as engine:
async with engine.acquire() as conn:
await create_table(conn)
await insert_tbl(conn, id=1)
await insert_tbl(conn, id=2, name="test")
asyncio.run(go())
Complex sqlalchemy queries¶
import asyncio
import datetime
import random
import sqlalchemy as sa
from aiopg.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(255)),
sa.Column("birthday", sa.DateTime),
)
emails = sa.Table(
"emails",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("user_id", None, sa.ForeignKey("users.id")),
sa.Column("email", sa.String(255), nullable=False),
sa.Column("private", sa.Boolean, nullable=False),
)
async def create_tables(conn):
await conn.execute("DROP TABLE IF EXISTS emails")
await conn.execute("DROP TABLE IF EXISTS users")
await conn.execute(
"""CREATE TABLE users (
id serial PRIMARY KEY,
name varchar(255),
birthday timestamp)"""
)
await conn.execute(
"""CREATE TABLE emails (
id serial,
user_id int references users(id),
email varchar(253),
private bool)"""
)
names = {
"Andrew",
"Bob",
"John",
"Vitaly",
"Alex",
"Lina",
"Olga",
"Doug",
"Julia",
"Matt",
"Jessica",
"Nick",
"Dave",
"Martin",
"Abbi",
"Eva",
"Lori",
"Rita",
"Rosa",
"Ivy",
"Clare",
"Maria",
"Jenni",
"Margo",
"Anna",
}
def gen_birthday():
now = datetime.datetime.now()
year = random.randint(now.year - 30, now.year - 20)
month = random.randint(1, 12)
day = random.randint(1, 28)
return datetime.datetime(year, month, day)
async def fill_data(conn):
async with conn.begin():
for name in random.sample(names, len(names)):
uid = await conn.scalar(
users.insert().values(name=name, birthday=gen_birthday())
)
emails_count = int(random.paretovariate(2))
for num in random.sample(range(10000), emails_count):
is_private = random.uniform(0, 1) < 0.8
await conn.execute(
emails.insert().values(
user_id=uid,
email=f"{name}+{num}@gmail.com",
private=is_private,
)
)
async def count(conn):
c1 = await conn.scalar(users.count())
c2 = await conn.scalar(emails.count())
print("Population consists of", c1, "people with", c2, "emails in total")
join = sa.join(emails, users, users.c.id == emails.c.user_id)
query = (
sa.select([users.c.name])
.select_from(join)
.where(emails.c.private == False) # noqa
.group_by(users.c.name)
.having(sa.func.count(emails.c.private) > 0)
)
print("Users with public emails:")
async for row in conn.execute(query):
print(row.name)
print()
async def show_julia(conn):
print("Lookup for Julia:")
join = sa.join(emails, users, users.c.id == emails.c.user_id)
query = (
sa.select([users, emails], use_labels=True)
.select_from(join)
.where(users.c.name == "Julia")
)
async for row in conn.execute(query):
print(
row.users_name,
row.users_birthday,
row.emails_email,
row.emails_private,
)
print()
async def ave_age(conn):
query = sa.select(
[sa.func.avg(sa.func.age(users.c.birthday))]
).select_from(users)
ave = await conn.scalar(query)
print(
"Average age of population is",
ave,
"or ~",
int(ave.days / 365),
"years",
)
print()
async def go():
engine = await create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
)
async with engine:
async with engine.acquire() as conn:
await create_tables(conn)
await fill_data(conn)
await count(conn)
await show_julia(conn)
await ave_age(conn)
asyncio.run(go())
Simple transaction in sqlalchemy¶
import asyncio
import sqlalchemy as sa
from sqlalchemy.schema import CreateTable
from aiopg.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users_sa_transaction",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(255)),
)
async def create_sa_transaction_tables(conn):
await conn.execute(CreateTable(users))
async def check_count_users(conn, *, count):
s_query = sa.select(users).select_from(users)
assert count == len(list(await (await conn.execute(s_query)).fetchall()))
async def success_transaction(conn):
await check_count_users(conn, count=0)
async with conn.begin():
await conn.execute(sa.insert(users).values(id=1, name="test1"))
await conn.execute(sa.insert(users).values(id=2, name="test2"))
await check_count_users(conn, count=2)
async with conn.begin():
await conn.execute(sa.delete(users).where(users.c.id == 1))
await conn.execute(sa.delete(users).where(users.c.id == 2))
await check_count_users(conn, count=0)
async def fail_transaction(conn):
await check_count_users(conn, count=0)
trans = await conn.begin()
try:
await conn.execute(sa.insert(users).values(id=1, name="test1"))
raise RuntimeError()
except RuntimeError:
await trans.rollback()
else:
await trans.commit()
await check_count_users(conn, count=0)
async def success_nested_transaction(conn):
await check_count_users(conn, count=0)
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=1, name="test1"))
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=2, name="test2"))
await check_count_users(conn, count=2)
async with conn.begin():
await conn.execute(sa.delete(users).where(users.c.id == 1))
await conn.execute(sa.delete(users).where(users.c.id == 2))
await check_count_users(conn, count=0)
async def fail_nested_transaction(conn):
await check_count_users(conn, count=0)
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=1, name="test1"))
tr_f = await conn.begin_nested()
try:
await conn.execute(sa.insert(users).values(id=2, name="test2"))
raise RuntimeError()
except RuntimeError:
await tr_f.rollback()
else:
await tr_f.commit()
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=2, name="test2"))
await check_count_users(conn, count=2)
async with conn.begin():
await conn.execute(sa.delete(users).where(users.c.id == 1))
await conn.execute(sa.delete(users).where(users.c.id == 2))
await check_count_users(conn, count=0)
async def fail_first_nested_transaction(conn):
trans = await conn.begin_nested()
try:
await conn.execute(sa.insert(users).values(id=1, name="test1"))
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=2, name="test2"))
async with conn.begin_nested():
await conn.execute(sa.insert(users).values(id=3, name="test3"))
raise RuntimeError()
except RuntimeError:
await trans.rollback()
else:
await trans.commit()
await check_count_users(conn, count=0)
async def go():
engine = await create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
)
async with engine:
async with engine.acquire() as conn:
await create_sa_transaction_tables(conn)
await success_transaction(conn)
await fail_transaction(conn)
await success_nested_transaction(conn)
await fail_nested_transaction(conn)
await fail_first_nested_transaction(conn)
asyncio.run(go())
Isolation transaction in sqlalchemy¶
import asyncio
import sqlalchemy as sa
from psycopg2 import InternalError
from psycopg2.extensions import TransactionRollbackError
from sqlalchemy.sql.ddl import CreateTable
from aiopg.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users_sa_isolation_transaction",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(255)),
)
async def create_sa_transaction_tables(conn):
await conn.execute(CreateTable(users))
async def repea_sa_transaction(conn, conn2):
isolation_level = "REPEATABLE READ"
await conn.execute(sa.insert(users).values(id=1, name="test1"))
t1 = await conn.begin(isolation_level=isolation_level)
where = users.c.id == 1
q_user = users.select().where(where)
user = await (await conn.execute(q_user)).fetchone()
assert await (await conn2.execute(q_user)).fetchone() == user
await conn.execute(sa.update(users).values({"name": "name2"}).where(where))
t2 = await conn2.begin(isolation_level=isolation_level)
assert await (await conn2.execute(q_user)).fetchone() == user
await t1.commit()
await conn2.execute(users.insert().values({"id": 2, "name": "test"}))
try:
await conn2.execute(
sa.update(users).values({"name": "t"}).where(where)
)
except TransactionRollbackError as e:
assert e.pgcode == "40001"
await t2.commit()
assert len(await (await conn2.execute(q_user)).fetchall()) == 1
await conn.execute(sa.delete(users))
assert len(await (await conn.execute(users.select())).fetchall()) == 0
async def serializable_sa_transaction(conn, conn2):
isolation_level = "SERIALIZABLE"
await conn.execute(sa.insert(users).values(id=1, name="test1"))
t1 = await conn.begin(isolation_level=isolation_level)
where = users.c.id == 1
q_user = users.select().where(where)
user = await (await conn.execute(q_user)).fetchone()
assert await (await conn2.execute(q_user)).fetchone() == user
await conn.execute(sa.update(users).values({"name": "name2"}).where(where))
t2 = await conn2.begin(isolation_level=isolation_level)
assert await (await conn2.execute(q_user)).fetchone() == user
await t1.commit()
try:
await conn2.execute(users.insert().values({"id": 2, "name": "test"}))
except TransactionRollbackError as e:
assert e.pgcode == "40001"
try:
await conn2.execute(users.update().values({"name": "t"}).where(where))
except InternalError as e:
assert e.pgcode == "25P02"
await t2.commit()
user = dict(await (await conn2.execute(q_user)).fetchone())
assert user == {"name": "name2", "id": 1}
await conn.execute(sa.delete(users))
assert len(await (await conn.execute(users.select())).fetchall()) == 0
async def read_only_read_sa_transaction(conn, deferrable):
await conn.execute(sa.insert(users).values(id=1, name="test1"))
t1 = await conn.begin(
isolation_level="SERIALIZABLE", readonly=True, deferrable=deferrable
)
where = users.c.id == 1
try:
await conn.execute(sa.update(users).values({"name": "t"}).where(where))
except InternalError as e:
assert e.pgcode == "25006"
await t1.commit()
await conn.execute(sa.delete(users))
assert len(await (await conn.execute(users.select())).fetchall()) == 0
async def isolation_read_sa_transaction(conn, conn2):
await conn.execute(sa.insert(users).values(id=1, name="test1"))
t1 = await conn.begin()
where = users.c.id == 1
q_user = users.select().where(where)
user = await (await conn.execute(q_user)).fetchone()
assert await (await conn2.execute(q_user)).fetchone() == user
await conn.execute(sa.update(users).values({"name": "name2"}).where(where))
t2 = await conn2.begin()
assert await (await conn2.execute(q_user)).fetchone() == user
await t1.commit()
await conn2.execute(sa.update(users).values(user).where(where))
await t2.commit()
assert await (await conn2.execute(q_user)).fetchone() == user
await conn.execute(sa.delete(users))
assert len(await (await conn.execute(users.select())).fetchall()) == 0
async def go():
engine = await create_engine(
user="aiopg", database="aiopg", host="127.0.0.1", password="passwd"
)
async with engine:
async with engine.acquire() as conn:
await create_sa_transaction_tables(conn)
async with engine.acquire() as conn:
await read_only_read_sa_transaction(conn, True)
await read_only_read_sa_transaction(conn, False)
async with engine.acquire() as conn2:
await repea_sa_transaction(conn, conn2)
await serializable_sa_transaction(conn, conn2)
await isolation_read_sa_transaction(conn, conn2)
asyncio.run(go())