Examples of aiopg usage

Below is a list of examples from aiopg/examples

Every example is a correct tiny python program.

async/await style

Low-level API

import asyncio
import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


async def test_select():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                ret = []
                async for row in cur:
                    ret.append(row)
                assert ret == [(1,)]
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(test_select())

Usage of LISTEN/NOTIFY commands

import asyncio
import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


async def notify(conn):
    async with conn.cursor() as cur:
        for i in range(5):
            msg = "message {}".format(i)
            print('Send ->', msg)
            await cur.execute("NOTIFY channel, '{}'".format(msg))

        await cur.execute("NOTIFY channel, 'finish'")


async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute("LISTEN channel")
        while True:
            msg = await conn.notifies.get()
            if msg.payload == 'finish':
                return
            else:
                print('Receive <-', msg.payload)


async def main():
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn1:
            listener = listen(conn1)
            async with pool.acquire() as conn2:
                notifier = notify(conn2)
                await asyncio.gather(listener, notifier)
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple sqlalchemy usage

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa


metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


async def create_table(conn):
    await conn.execute('DROP TABLE IF EXISTS tbl')
    await conn.execute('''CREATE TABLE tbl (
    id serial PRIMARY KEY,
    val varchar(255))''')


async def go():
    async with create_engine(user='aiopg',
                             database='aiopg',
                             host='127.0.0.1',
                             password='passwd') as engine:
        async with engine.acquire() as conn:
            await create_table(conn)
        async with engine.acquire() as conn:
            await conn.execute(tbl.insert().values(val='abc'))

            async for row in conn.execute(tbl.select()):
                print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Complex sqlalchemy queries

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
import random
import datetime


metadata = sa.MetaData()

users = sa.Table('users', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('name', sa.String(255)),
                 sa.Column('birthday', sa.DateTime))

emails = sa.Table('emails', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('user_id', None, sa.ForeignKey('users.id')),
                  sa.Column('email', sa.String(255), nullable=False),
                  sa.Column('private', sa.Boolean, nullable=False))


async def create_tables(conn):
    await conn.execute('DROP TABLE IF EXISTS emails')
    await conn.execute('DROP TABLE IF EXISTS users')
    await conn.execute('''CREATE TABLE users (
                                        id serial PRIMARY KEY,
                                        name varchar(255),
                                        birthday timestamp)''')
    await conn.execute('''CREATE TABLE emails (
                                id serial,
                                user_id int references users(id),
                                email varchar(253),
                                private bool)''')


names = {'Andrew', 'Bob', 'John', 'Vitaly', 'Alex', 'Lina', 'Olga',
         'Doug', 'Julia', 'Matt', 'Jessica', 'Nick', 'Dave', 'Martin',
         'Abbi', 'Eva', 'Lori', 'Rita', 'Rosa', 'Ivy', 'Clare', 'Maria',
         'Jenni', 'Margo', 'Anna'}


def gen_birthday():
    now = datetime.datetime.now()
    year = random.randint(now.year - 30, now.year - 20)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    return datetime.datetime(year, month, day)


async def fill_data(conn):
    async with conn.begin():
        for name in random.sample(names, len(names)):
            uid = await conn.scalar(
                users.insert().values(name=name, birthday=gen_birthday()))
            emails_count = int(random.paretovariate(2))
            for num in random.sample(range(10000), emails_count):
                is_private = random.uniform(0, 1) < 0.8
                await conn.execute(emails.insert().values(
                    user_id=uid,
                    email='{}+{}@gmail.com'.format(name, num),
                    private=is_private))


async def count(conn):
    c1 = (await conn.scalar(users.count()))
    c2 = (await conn.scalar(emails.count()))
    print("Population consists of", c1, "people with",
          c2, "emails in total")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (sa.select([users.c.name])
             .select_from(join)
             .where(emails.c.private == False)  # noqa
             .group_by(users.c.name)
             .having(sa.func.count(emails.c.private) > 0))

    print("Users with public emails:")
    async for row in conn.execute(query):
        print(row.name)

    print()


async def show_julia(conn):
    print("Lookup for Julia:")
    join = sa.join(emails, users, users.c.id == emails.c.user_id)
    query = (sa.select([users, emails], use_labels=True)
             .select_from(join).where(users.c.name == 'Julia'))
    async for row in conn.execute(query):
        print(row.users_name, row.users_birthday,
              row.emails_email, row.emails_private)
    print()


async def ave_age(conn):
    query = (sa.select([sa.func.avg(sa.func.age(users.c.birthday))])
             .select_from(users))
    ave = (await conn.scalar(query))
    print("Average age of population is", ave,
          "or ~", int(ave.days / 365), "years")
    print()


async def go():
    engine = await create_engine(user='aiopg',
                                 database='aiopg',
                                 host='127.0.0.1',
                                 password='passwd')
    async with engine:
        async with engine.acquire() as conn:
            await create_tables(conn)
            await fill_data(conn)
            await count(conn)
            await show_julia(conn)
            await ave_age(conn)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

yield from/@coroutine style

Old style Low-level API

import asyncio
import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


@asyncio.coroutine
def test_select():
    pool = yield from aiopg.create_pool(dsn)
    with (yield from pool.cursor()) as cur:
        yield from cur.execute("SELECT 1")
        ret = yield from cur.fetchone()
        assert ret == (1,)
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(test_select())

Usage of LISTEN/NOTIFY commands using old-style API

import asyncio
import aiopg

dsn = 'dbname=aiopg user=aiopg password=passwd host=127.0.0.1'


@asyncio.coroutine
def notify(conn):
    cur = yield from conn.cursor()
    try:
        for i in range(5):
            msg = "message {}".format(i)
            print('Send ->', msg)
            yield from cur.execute("NOTIFY channel, '{}'".format(msg))

        yield from cur.execute("NOTIFY channel, 'finish'")
    finally:
        cur.close()


@asyncio.coroutine
def listen(conn):
    cur = yield from conn.cursor()
    try:
        yield from cur.execute("LISTEN channel")
        while True:
            msg = yield from conn.notifies.get()
            if msg.payload == 'finish':
                return
            else:
                print('Receive <-', msg.payload)
    finally:
        cur.close()


@asyncio.coroutine
def main():
    pool = yield from aiopg.create_pool(dsn)
    with (yield from pool) as conn1:
        listener = listen(conn1)
        with (yield from pool) as conn2:
            notifier = notify(conn2)
            yield from asyncio.gather(listener, notifier)
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple sqlalchemy usage commands using old-style API

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa


metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))


@asyncio.coroutine
def create_table(engine):
    with (yield from engine) as conn:
        yield from conn.execute('DROP TABLE IF EXISTS tbl')
        yield from conn.execute('''CREATE TABLE tbl (
                                            id serial PRIMARY KEY,
                                            val varchar(255))''')


@asyncio.coroutine
def go():
    engine = yield from create_engine(user='aiopg',
                                      database='aiopg',
                                      host='127.0.0.1',
                                      password='passwd')

    yield from create_table(engine)
    with (yield from engine) as conn:
        yield from conn.execute(tbl.insert().values(val='abc'))

        res = yield from conn.execute(tbl.select())
        for row in res:
            print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())

Complex sqlalchemy queries commands using old-style API

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa
import random
import datetime


metadata = sa.MetaData()

users = sa.Table('users', metadata,
                 sa.Column('id', sa.Integer, primary_key=True),
                 sa.Column('name', sa.String(255)),
                 sa.Column('birthday', sa.DateTime))

emails = sa.Table('emails', metadata,
                  sa.Column('id', sa.Integer, primary_key=True),
                  sa.Column('user_id', None, sa.ForeignKey('users.id')),
                  sa.Column('email', sa.String(255), nullable=False),
                  sa.Column('private', sa.Boolean, nullable=False))


@asyncio.coroutine
def create_tables(engine):
    with (yield from engine) as conn:
        yield from conn.execute('DROP TABLE IF EXISTS emails')
        yield from conn.execute('DROP TABLE IF EXISTS users')
        yield from conn.execute('''CREATE TABLE users (
                                            id serial PRIMARY KEY,
                                            name varchar(255),
                                            birthday timestamp)''')
        yield from conn.execute('''CREATE TABLE emails (
                                    id serial,
                                    user_id int references users(id),
                                    email varchar(253),
                                    private bool)''')


names = {'Andrew', 'Bob', 'John', 'Vitaly', 'Alex', 'Lina', 'Olga',
         'Doug', 'Julia', 'Matt', 'Jessica', 'Nick', 'Dave', 'Martin',
         'Abbi', 'Eva', 'Lori', 'Rita', 'Rosa', 'Ivy', 'Clare', 'Maria',
         'Jenni', 'Margo', 'Anna'}


def gen_birthday():
    now = datetime.datetime.now()
    year = random.randint(now.year - 30, now.year - 20)
    month = random.randint(1, 12)
    day = random.randint(1, 28)
    return datetime.datetime(year, month, day)


@asyncio.coroutine
def fill_data(engine):
    with (yield from engine) as conn:
        tr = yield from conn.begin()

        for name in random.sample(names, len(names)):
            uid = yield from conn.scalar(
                users.insert().values(name=name, birthday=gen_birthday()))
            emails_count = int(random.paretovariate(2))
            for num in random.sample(range(10000), emails_count):
                is_private = random.uniform(0, 1) < 0.8
                yield from conn.execute(emails.insert().values(
                    user_id=uid,
                    email='{}+{}@gmail.com'.format(name, num),
                    private=is_private))
        yield from tr.commit()


@asyncio.coroutine
def count(engine):
    with (yield from engine) as conn:
        c1 = (yield from conn.scalar(users.count()))
        c2 = (yield from conn.scalar(emails.count()))
        print("Population consists of", c1, "people with",
              c2, "emails in total")
        join = sa.join(emails, users, users.c.id == emails.c.user_id)
        query = (sa.select([users.c.name])
                 .select_from(join)
                 .where(emails.c.private == False)  # noqa
                 .group_by(users.c.name)
                 .having(sa.func.count(emails.c.private) > 0))

        print("Users with public emails:")
        ret = yield from conn.execute(query)
        for row in ret:
            print(row.name)

        print()


@asyncio.coroutine
def show_julia(engine):
    with (yield from engine) as conn:
        print("Lookup for Julia:")
        join = sa.join(emails, users, users.c.id == emails.c.user_id)
        query = (sa.select([users, emails], use_labels=True)
                 .select_from(join).where(users.c.name == 'Julia'))
        res = yield from conn.execute(query)
        for row in res:
            print(row.users_name, row.users_birthday,
                  row.emails_email, row.emails_private)
        print()


@asyncio.coroutine
def ave_age(engine):
    with (yield from engine) as conn:
        query = (sa.select([sa.func.avg(sa.func.age(users.c.birthday))])
                 .select_from(users))
        ave = (yield from conn.scalar(query))
        print("Average age of population is", ave,
              "or ~", int(ave.days / 365), "years")
        print()


@asyncio.coroutine
def go():
    engine = yield from create_engine(user='aiopg',
                                      database='aiopg',
                                      host='127.0.0.1',
                                      password='passwd')

    yield from create_tables(engine)
    yield from fill_data(engine)
    yield from count(engine)
    yield from show_julia(engine)
    yield from ave_age(engine)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())