import asyncio
import collections
import warnings
from types import TracebackType
from typing import (
Any,
Awaitable,
Callable,
Deque,
Generator,
Optional,
Set,
Type,
)
import async_timeout
import psycopg2.extensions
from .connection import TIMEOUT, Connection, Cursor, connect
from .utils import _ContextManager, create_completed_future, get_running_loop
[docs]def create_pool(
dsn: Optional[str] = None,
*,
minsize: int = 1,
maxsize: int = 10,
timeout: float = TIMEOUT,
pool_recycle: float = -1.0,
enable_json: bool = True,
enable_hstore: bool = True,
enable_uuid: bool = True,
echo: bool = False,
on_connect: Optional[Callable[[Connection], Awaitable[None]]] = None,
**kwargs: Any,
) -> _ContextManager["Pool"]:
coro = Pool.from_pool_fill(
dsn,
minsize,
maxsize,
timeout,
enable_json=enable_json,
enable_hstore=enable_hstore,
enable_uuid=enable_uuid,
echo=echo,
on_connect=on_connect,
pool_recycle=pool_recycle,
**kwargs,
)
return _ContextManager[Pool](coro, _destroy_pool)
async def _destroy_pool(pool: "Pool") -> None:
pool.close()
await pool.wait_closed()
class _PoolConnectionContextManager:
"""Context manager.
This enables the following idiom for acquiring and releasing a
connection around a block:
async with pool as conn:
cur = await conn.cursor()
while failing loudly when accidentally using:
with pool:
<block>
"""
__slots__ = ("_pool", "_conn")
def __init__(self, pool: "Pool", conn: Connection):
self._pool: Optional[Pool] = pool
self._conn: Optional[Connection] = conn
def __enter__(self) -> Connection:
assert self._conn
return self._conn
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
if self._pool is None or self._conn is None:
return
try:
self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None
async def __aenter__(self) -> Connection:
assert self._conn
return self._conn
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
if self._pool is None or self._conn is None:
return
try:
await self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None
class _PoolCursorContextManager:
"""Context manager.
This enables the following idiom for acquiring and releasing a
cursor around a block:
async with pool.cursor() as cur:
await cur.execute("SELECT 1")
while failing loudly when accidentally using:
with pool:
<block>
"""
__slots__ = ("_pool", "_conn", "_cursor")
def __init__(self, pool: "Pool", conn: Connection, cursor: Cursor):
self._pool = pool
self._conn = conn
self._cursor = cursor
def __enter__(self) -> Cursor:
return self._cursor
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
try:
self._cursor.close()
except psycopg2.ProgrammingError:
# seen instances where the cursor fails to close:
# https://github.com/aio-libs/aiopg/issues/364
# We close it here so we don't return a bad connection to the pool
self._conn.close()
raise
finally:
try:
self._pool.release(self._conn)
finally:
self._pool = None # type: ignore
self._conn = None # type: ignore
self._cursor = None # type: ignore
[docs]class Pool:
"""Connection pool"""
def __init__(
self,
dsn: str,
minsize: int,
maxsize: int,
timeout: float,
*,
enable_json: bool,
enable_hstore: bool,
enable_uuid: bool,
echo: bool,
on_connect: Optional[Callable[[Connection], Awaitable[None]]],
pool_recycle: float,
**kwargs: Any,
):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize and maxsize != 0:
raise ValueError("maxsize should be not less than minsize")
self._dsn = dsn
self._minsize = minsize
self._loop = get_running_loop()
self._timeout = timeout
self._recycle = pool_recycle
self._enable_json = enable_json
self._enable_hstore = enable_hstore
self._enable_uuid = enable_uuid
self._echo = echo
self._on_connect = on_connect
self._conn_kwargs = kwargs
self._acquiring = 0
self._free: Deque[Connection] = collections.deque(
maxlen=maxsize or None
)
self._cond = asyncio.Condition()
self._used: Set[Connection] = set()
self._terminated: Set[Connection] = set()
self._closing = False
self._closed = False
@property
def echo(self) -> bool:
return self._echo
@property
def minsize(self) -> int:
return self._minsize
@property
def maxsize(self) -> Optional[int]:
return self._free.maxlen
@property
def size(self) -> int:
return self.freesize + len(self._used) + self._acquiring
@property
def freesize(self) -> int:
return len(self._free)
@property
def timeout(self) -> float:
return self._timeout
[docs] async def clear(self) -> None:
"""Close all free connections in pool."""
async with self._cond:
while self._free:
conn = self._free.popleft()
await conn.close()
self._cond.notify()
@property
def closed(self) -> bool:
return self._closed
[docs] def close(self) -> None:
"""Close pool.
Mark all pool connections to be closed on getting back to pool.
Closed pool doesn't allow to acquire new connections.
"""
if self._closed:
return
self._closing = True
[docs] def terminate(self) -> None:
"""Terminate pool.
Close pool with instantly closing all acquired connections also.
"""
self.close()
for conn in list(self._used):
conn.close()
self._terminated.add(conn)
self._used.clear()
[docs] async def wait_closed(self) -> None:
"""Wait for closing all pool's connections."""
if self._closed:
return
if not self._closing:
raise RuntimeError(
".wait_closed() should be called " "after .close()"
)
while self._free:
conn = self._free.popleft()
await conn.close()
async with self._cond:
while self.size > self.freesize:
await self._cond.wait()
self._closed = True
[docs] def acquire(self) -> _ContextManager[Connection]:
"""Acquire free connection from the pool."""
coro = self._acquire()
return _ContextManager[Connection](coro, self.release)
[docs] @classmethod
async def from_pool_fill(cls, *args: Any, **kwargs: Any) -> "Pool":
"""constructor for filling the free pool with connections,
the number is controlled by the minsize parameter
"""
self = cls(*args, **kwargs)
if self._minsize > 0:
async with self._cond:
await self._fill_free_pool(False)
return self
async def _acquire(self) -> Connection:
if self._closing:
raise RuntimeError("Cannot acquire connection after closing pool")
async with async_timeout.timeout(self._timeout), self._cond:
while True:
await self._fill_free_pool(True)
if self._free:
conn = self._free.popleft()
assert not conn.closed, conn
assert conn not in self._used, (conn, self._used)
self._used.add(conn)
return conn
else:
await self._cond.wait()
async def _fill_free_pool(self, override_min: bool) -> None:
# iterate over free connections and remove timeouted ones
n, free = 0, len(self._free)
while n < free:
conn = self._free[-1]
if conn.closed:
self._free.pop()
elif -1 < self._recycle < self._loop.time() - conn.last_usage:
await conn.close()
self._free.pop()
else:
self._free.rotate()
n += 1
while self.size < self.minsize:
self._acquiring += 1
try:
conn = await connect(
self._dsn,
timeout=self._timeout,
enable_json=self._enable_json,
enable_hstore=self._enable_hstore,
enable_uuid=self._enable_uuid,
echo=self._echo,
**self._conn_kwargs,
)
if self._on_connect is not None:
await self._on_connect(conn)
# raise exception if pool is closing
self._free.append(conn)
self._cond.notify()
finally:
self._acquiring -= 1
if self._free:
return
if override_min and (not self.maxsize or self.size < self.maxsize):
self._acquiring += 1
try:
conn = await connect(
self._dsn,
timeout=self._timeout,
enable_json=self._enable_json,
enable_hstore=self._enable_hstore,
enable_uuid=self._enable_uuid,
echo=self._echo,
**self._conn_kwargs,
)
if self._on_connect is not None:
await self._on_connect(conn)
# raise exception if pool is closing
self._free.append(conn)
self._cond.notify()
finally:
self._acquiring -= 1
async def _wakeup(self) -> None:
async with self._cond:
self._cond.notify()
[docs] def release(self, conn: Connection) -> "asyncio.Future[None]":
"""Release free connection back to the connection pool."""
future = create_completed_future(self._loop)
if conn in self._terminated:
assert conn.closed, conn
self._terminated.remove(conn)
return future
assert conn in self._used, (conn, self._used)
self._used.remove(conn)
if conn.closed:
return future
transaction_status = conn.raw.get_transaction_status()
if transaction_status != psycopg2.extensions.TRANSACTION_STATUS_IDLE:
warnings.warn(
f"Invalid transaction status on "
f"released connection: {transaction_status}",
ResourceWarning,
)
conn.close()
return future
if self._closing:
conn.close()
else:
self._free.append(conn)
return asyncio.ensure_future(self._wakeup(), loop=self._loop)
[docs] async def cursor(
self,
name: Optional[str] = None,
cursor_factory: Any = None,
scrollable: Optional[bool] = None,
withhold: bool = False,
*,
timeout: Optional[float] = None,
) -> _PoolCursorContextManager:
conn = await self.acquire()
cursor = await conn.cursor(
name=name,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold,
timeout=timeout,
)
return _PoolCursorContextManager(self, conn, cursor)
def __await__(self) -> Generator[Any, Any, _PoolConnectionContextManager]:
# This is not a coroutine. It is meant to enable the idiom:
#
# with (await pool) as conn:
# <block>
#
# as an alternative to:
#
# conn = await pool.acquire()
# try:
# <block>
# finally:
# conn.release()
conn = yield from self._acquire().__await__()
return _PoolConnectionContextManager(self, conn)
def __enter__(self) -> "Pool":
raise RuntimeError(
'"await" should be used as context manager expression'
)
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
# This must exist because __enter__ exists, even though that
# always raises; that's how the with-statement works.
pass # pragma: nocover
async def __aenter__(self) -> "Pool":
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
self.close()
await self.wait_closed()
def __del__(self) -> None:
try:
self._free
except AttributeError:
return # frame has been cleared, __dict__ is empty
if self._free:
left = 0
while self._free:
conn = self._free.popleft()
conn.close()
left += 1
warnings.warn(
f"Unclosed {left} connections in {self!r}", ResourceWarning
)