import abc
import asyncio
import contextlib
import datetime
import enum
import errno
import platform
import select
import sys
import traceback
import uuid
import warnings
import weakref
from collections.abc import Mapping
from types import TracebackType
from typing import (
Any,
Callable,
Generator,
List,
Optional,
Sequence,
Tuple,
Type,
cast,
)
import psycopg2
import psycopg2.extensions
import psycopg2.extras
from .log import logger
from .utils import (
ClosableQueue,
_ContextManager,
create_completed_future,
get_running_loop,
)
TIMEOUT = 60.0
# Windows specific error code, not in errno for some reason, and doesnt map
# to OSError.errno EBADF
WSAENOTSOCK = 10038
[docs]def connect(
dsn: Optional[str] = None,
*,
timeout: float = TIMEOUT,
enable_json: bool = True,
enable_hstore: bool = True,
enable_uuid: bool = True,
echo: bool = False,
**kwargs: Any,
) -> _ContextManager["Connection"]:
"""A factory for connecting to PostgreSQL.
The coroutine accepts all parameters that psycopg2.connect() does
plus optional keyword-only `timeout` parameters.
Returns instantiated Connection object.
"""
connection = Connection(
dsn,
timeout,
bool(echo),
enable_hstore=enable_hstore,
enable_uuid=enable_uuid,
enable_json=enable_json,
**kwargs,
)
return _ContextManager[Connection](connection, disconnect) # type: ignore
async def disconnect(c: "Connection") -> None:
await c.close()
def _is_bad_descriptor_error(os_error: OSError) -> bool:
if platform.system() == "Windows": # pragma: no cover
winerror = int(getattr(os_error, "winerror", 0))
return winerror == WSAENOTSOCK
return os_error.errno == errno.EBADF
class IsolationCompiler(abc.ABC):
__slots__ = ("_isolation_level", "_readonly", "_deferrable")
def __init__(
self, isolation_level: Optional[str], readonly: bool, deferrable: bool
):
self._isolation_level = isolation_level
self._readonly = readonly
self._deferrable = deferrable
@property
def name(self) -> str:
return self._isolation_level or "Unknown"
def savepoint(self, unique_id: str) -> str:
return f"SAVEPOINT {unique_id}"
def release_savepoint(self, unique_id: str) -> str:
return f"RELEASE SAVEPOINT {unique_id}"
def rollback_savepoint(self, unique_id: str) -> str:
return f"ROLLBACK TO SAVEPOINT {unique_id}"
def commit(self) -> str:
return "COMMIT"
def rollback(self) -> str:
return "ROLLBACK"
def begin(self) -> str:
query = "BEGIN"
if self._isolation_level is not None:
query += f" ISOLATION LEVEL {self._isolation_level.upper()}"
if self._readonly:
query += " READ ONLY"
if self._deferrable:
query += " DEFERRABLE"
return query
def __repr__(self) -> str:
return self.name
class ReadCommittedCompiler(IsolationCompiler):
__slots__ = ()
def __init__(self, readonly: bool, deferrable: bool):
super().__init__("Read committed", readonly, deferrable)
class RepeatableReadCompiler(IsolationCompiler):
__slots__ = ()
def __init__(self, readonly: bool, deferrable: bool):
super().__init__("Repeatable read", readonly, deferrable)
class SerializableCompiler(IsolationCompiler):
__slots__ = ()
def __init__(self, readonly: bool, deferrable: bool):
super().__init__("Serializable", readonly, deferrable)
class DefaultCompiler(IsolationCompiler):
__slots__ = ()
def __init__(self, readonly: bool, deferrable: bool):
super().__init__(None, readonly, deferrable)
@property
def name(self) -> str:
return "Default"
class IsolationLevel(enum.Enum):
serializable = SerializableCompiler
repeatable_read = RepeatableReadCompiler
read_committed = ReadCommittedCompiler
default = DefaultCompiler
def __call__(self, readonly: bool, deferrable: bool) -> IsolationCompiler:
return self.value(readonly, deferrable) # type: ignore
async def _release_savepoint(t: "Transaction") -> None:
await t.release_savepoint()
async def _rollback_savepoint(t: "Transaction") -> None:
await t.rollback_savepoint()
class Transaction:
__slots__ = ("_cursor", "_is_begin", "_isolation", "_unique_id")
def __init__(
self,
cursor: "Cursor",
isolation_level: Callable[[bool, bool], IsolationCompiler],
readonly: bool = False,
deferrable: bool = False,
):
self._cursor = cursor
self._is_begin = False
self._unique_id: Optional[str] = None
self._isolation = isolation_level(readonly, deferrable)
@property
def is_begin(self) -> bool:
return self._is_begin
async def begin(self) -> "Transaction":
if self._is_begin:
raise psycopg2.ProgrammingError(
"You are trying to open a new transaction, use the save point"
)
self._is_begin = True
await self._cursor.execute(self._isolation.begin())
return self
async def commit(self) -> None:
self._check_commit_rollback()
await self._cursor.execute(self._isolation.commit())
self._is_begin = False
async def rollback(self) -> None:
self._check_commit_rollback()
if not self._cursor.closed:
await self._cursor.execute(self._isolation.rollback())
self._is_begin = False
async def rollback_savepoint(self) -> None:
self._check_release_rollback()
if not self._cursor.closed:
await self._cursor.execute(
self._isolation.rollback_savepoint(
self._unique_id # type: ignore
)
)
self._unique_id = None
async def release_savepoint(self) -> None:
self._check_release_rollback()
await self._cursor.execute(
self._isolation.release_savepoint(self._unique_id) # type: ignore
)
self._unique_id = None
async def savepoint(self) -> "Transaction":
self._check_commit_rollback()
if self._unique_id is not None:
raise psycopg2.ProgrammingError("You do not shut down savepoint")
self._unique_id = f"s{uuid.uuid1().hex}"
await self._cursor.execute(self._isolation.savepoint(self._unique_id))
return self
def point(self) -> _ContextManager["Transaction"]:
return _ContextManager[Transaction](
self.savepoint(),
_release_savepoint,
_rollback_savepoint,
)
def _check_commit_rollback(self) -> None:
if not self._is_begin:
raise psycopg2.ProgrammingError(
"You are trying to commit " "the transaction does not open"
)
def _check_release_rollback(self) -> None:
self._check_commit_rollback()
if self._unique_id is None:
raise psycopg2.ProgrammingError("You do not start savepoint")
def __repr__(self) -> str:
return (
f"<{self.__class__.__name__} "
f"transaction={self._isolation} id={id(self):#x}>"
)
def __del__(self) -> None:
if self._is_begin:
warnings.warn(
f"You have not closed transaction {self!r}", ResourceWarning
)
if self._unique_id is not None:
warnings.warn(
f"You have not closed savepoint {self!r}", ResourceWarning
)
async def __aenter__(self) -> "Transaction":
return await self.begin()
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
if exc_type is not None:
await self.rollback()
else:
await self.commit()
async def _commit_transaction(t: Transaction) -> None:
await t.commit()
async def _rollback_transaction(t: Transaction) -> None:
await t.rollback()
[docs]class Cursor:
def __init__(
self,
conn: "Connection",
impl: Any,
timeout: float,
echo: bool,
isolation_level: Optional[IsolationLevel] = None,
):
self._conn = conn
self._impl = impl
self._timeout = timeout
self._echo = echo
self._transaction = Transaction(
self, isolation_level or IsolationLevel.default
)
@property
def echo(self) -> bool:
"""Return echo mode status."""
return self._echo
@property
def description(self) -> Optional[Sequence[Any]]:
"""This read-only attribute is a sequence of 7-item sequences.
Each of these sequences is a collections.namedtuple containing
information describing one result column:
0. name: the name of the column returned.
1. type_code: the PostgreSQL OID of the column.
2. display_size: the actual length of the column in bytes.
3. internal_size: the size in bytes of the column associated to
this column on the server.
4. precision: total number of significant digits in columns of
type NUMERIC. None for other types.
5. scale: count of decimal digits in the fractional part in
columns of type NUMERIC. None for other types.
6. null_ok: always None as not easy to retrieve from the libpq.
This attribute will be None for operations that do not
return rows or if the cursor has not had an operation invoked
via the execute() method yet.
"""
return self._impl.description # type: ignore
[docs] def close(self) -> None:
"""Close the cursor now."""
if not self.closed:
self._impl.close()
@property
def closed(self) -> bool:
"""Read-only boolean attribute: specifies if the cursor is closed."""
return self._impl.closed # type: ignore
@property
def connection(self) -> "Connection":
"""Read-only attribute returning a reference to the `Connection`."""
return self._conn
@property
def raw(self) -> Any:
"""Underlying psycopg cursor object, readonly"""
return self._impl
@property
def name(self) -> str:
# Not supported
return self._impl.name # type: ignore
@property
def scrollable(self) -> Optional[bool]:
# Not supported
return self._impl.scrollable # type: ignore
@scrollable.setter
def scrollable(self, val: bool) -> None:
# Not supported
self._impl.scrollable = val
@property
def withhold(self) -> bool:
# Not supported
return self._impl.withhold # type: ignore
@withhold.setter
def withhold(self, val: bool) -> None:
# Not supported
self._impl.withhold = val
[docs] async def execute(
self,
operation: str,
parameters: Any = None,
*,
timeout: Optional[float] = None,
) -> None:
"""Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be
bound to variables in the operation. Variables are specified
either with positional %s or named %({name})s placeholders.
"""
if timeout is None:
timeout = self._timeout
waiter = self._conn._create_waiter("cursor.execute")
if self._echo:
logger.info(operation)
logger.info("%r", parameters)
try:
self._impl.execute(operation, parameters)
except BaseException:
self._conn._waiter = None
raise
try:
await self._conn._poll(waiter, timeout)
except asyncio.TimeoutError:
self._impl.close()
raise
async def executemany(self, *args: Any, **kwargs: Any) -> None:
# Not supported
raise psycopg2.ProgrammingError(
"executemany cannot be used in asynchronous mode"
)
[docs] async def callproc(
self,
procname: str,
parameters: Any = None,
*,
timeout: Optional[float] = None,
) -> None:
"""Call a stored database procedure with the given name.
The sequence of parameters must contain one entry for each
argument that the procedure expects. The result of the call is
returned as modified copy of the input sequence. Input
parameters are left untouched, output and input/output
parameters replaced with possibly new values.
"""
if timeout is None:
timeout = self._timeout
waiter = self._conn._create_waiter("cursor.callproc")
if self._echo:
logger.info("CALL %s", procname)
logger.info("%r", parameters)
try:
self._impl.callproc(procname, parameters)
except BaseException:
self._conn._waiter = None
raise
else:
await self._conn._poll(waiter, timeout)
[docs] def begin(self) -> _ContextManager[Transaction]:
return _ContextManager[Transaction](
self._transaction.begin(),
_commit_transaction,
_rollback_transaction,
)
[docs] def begin_nested(self) -> _ContextManager[Transaction]:
if self._transaction.is_begin:
return self._transaction.point()
return _ContextManager[Transaction](
self._transaction.begin(),
_commit_transaction,
_rollback_transaction,
)
[docs] def mogrify(self, operation: str, parameters: Any = None) -> bytes:
"""Return a query string after arguments binding.
The byte string returned is exactly the one that would be sent to
the database running the .execute() method or similar.
"""
ret = self._impl.mogrify(operation, parameters)
assert (
not self._conn.isexecuting()
), "Don't support server side mogrify"
return ret # type: ignore
[docs] async def fetchone(self) -> Any:
"""Fetch the next row of a query result set.
Returns a single tuple, or None when no more data is
available.
"""
ret = self._impl.fetchone()
assert (
not self._conn.isexecuting()
), "Don't support server side cursors yet"
return ret
[docs] async def fetchmany(self, size: Optional[int] = None) -> List[Any]:
"""Fetch the next set of rows of a query result.
Returns a list of tuples. An empty list is returned when no
more rows are available.
The number of rows to fetch per call is specified by the
parameter. If it is not given, the cursor's .arraysize
determines the number of rows to be fetched. The method should
try to fetch as many rows as indicated by the size
parameter. If this is not possible due to the specified number
of rows not being available, fewer rows may be returned.
"""
if size is None:
size = self._impl.arraysize
ret = self._impl.fetchmany(size)
assert (
not self._conn.isexecuting()
), "Don't support server side cursors yet"
return ret # type: ignore
[docs] async def fetchall(self) -> List[Any]:
"""Fetch all (remaining) rows of a query result.
Returns them as a list of tuples. An empty list is returned
if there is no more record to fetch.
"""
ret = self._impl.fetchall()
assert (
not self._conn.isexecuting()
), "Don't support server side cursors yet"
return ret # type: ignore
@property
def arraysize(self) -> int:
"""How many rows will be returned by fetchmany() call.
This read/write attribute specifies the number of rows to
fetch at a time with fetchmany(). It defaults to
1 meaning to fetch a single row at a time.
"""
return self._impl.arraysize # type: ignore
@arraysize.setter
def arraysize(self, val: int) -> None:
"""How many rows will be returned by fetchmany() call.
This read/write attribute specifies the number of rows to
fetch at a time with fetchmany(). It defaults to
1 meaning to fetch a single row at a time.
"""
self._impl.arraysize = val
@property
def itersize(self) -> int:
# Not supported
return self._impl.itersize # type: ignore
@itersize.setter
def itersize(self, val: int) -> None:
# Not supported
self._impl.itersize = val
@property
def rowcount(self) -> int:
"""Returns the number of rows that has been produced of affected.
This read-only attribute specifies the number of rows that the
last :meth:`execute` produced (for Data Query Language
statements like SELECT) or affected (for Data Manipulation
Language statements like UPDATE or INSERT).
The attribute is -1 in case no .execute() has been performed
on the cursor or the row count of the last operation if it
can't be determined by the interface.
"""
return self._impl.rowcount # type: ignore
@property
def rownumber(self) -> int:
"""Row index.
This read-only attribute provides the current 0-based index of the
cursor in the result set or ``None`` if the index cannot be
determined."""
return self._impl.rownumber # type: ignore
@property
def lastrowid(self) -> int:
"""OID of the last inserted row.
This read-only attribute provides the OID of the last row
inserted by the cursor. If the table wasn't created with OID
support or the last operation is not a single record insert,
the attribute is set to None.
"""
return self._impl.lastrowid # type: ignore
@property
def query(self) -> Optional[str]:
"""The last executed query string.
Read-only attribute containing the body of the last query sent
to the backend (including bound arguments) as bytes
string. None if no query has been executed yet.
"""
return self._impl.query # type: ignore
@property
def statusmessage(self) -> str:
"""the message returned by the last command."""
return self._impl.statusmessage # type: ignore
@property
def tzinfo_factory(self) -> datetime.tzinfo:
"""The time zone factory used to handle data types such as
`TIMESTAMP WITH TIME ZONE`.
"""
return self._impl.tzinfo_factory # type: ignore
@tzinfo_factory.setter
def tzinfo_factory(self, val: datetime.tzinfo) -> None:
"""The time zone factory used to handle data types such as
`TIMESTAMP WITH TIME ZONE`.
"""
self._impl.tzinfo_factory = val
async def nextset(self) -> None:
# Not supported
self._impl.nextset() # raises psycopg2.NotSupportedError
[docs] async def setoutputsize(
self, size: int, column: Optional[int] = None
) -> None:
# Does nothing
self._impl.setoutputsize(size, column)
async def copy_from(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"copy_from cannot be used in asynchronous mode"
)
async def copy_to(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"copy_to cannot be used in asynchronous mode"
)
async def copy_expert(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"copy_expert cannot be used in asynchronous mode"
)
@property
def timeout(self) -> float:
"""Return default timeout for cursor operations."""
return self._timeout
def __aiter__(self) -> "Cursor":
return self
async def __anext__(self) -> Any:
ret = await self.fetchone()
if ret is not None:
return ret
raise StopAsyncIteration
async def __aenter__(self) -> "Cursor":
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
self.close()
def __repr__(self) -> str:
return (
f"<"
f"{type(self).__module__}::{type(self).__name__} "
f"name={self.name}, "
f"closed={self.closed}"
f">"
)
async def _close_cursor(c: Cursor) -> None:
c.close()
[docs]class Connection:
"""Low-level asynchronous interface for wrapped psycopg2 connection.
The Connection instance encapsulates a database session.
Provides support for creating asynchronous cursors.
"""
_source_traceback = None
def __init__(
self,
dsn: Optional[str],
timeout: float,
echo: bool = False,
enable_json: bool = True,
enable_hstore: bool = True,
enable_uuid: bool = True,
**kwargs: Any,
):
self._enable_json = enable_json
self._enable_hstore = enable_hstore
self._enable_uuid = enable_uuid
self._loop = get_running_loop()
self._waiter: Optional[
"asyncio.Future[None]"
] = self._loop.create_future()
kwargs["async_"] = kwargs.pop("async", True)
kwargs.pop("loop", None) # backward compatibility
self._conn = psycopg2.connect(dsn, **kwargs)
self._dsn = self._conn.dsn
assert self._conn.isexecuting(), "Is conn an async at all???"
self._fileno: Optional[int] = self._conn.fileno()
self._timeout = timeout
self._last_usage = self._loop.time()
self._writing = False
self._echo = echo
self._notifies = asyncio.Queue() # type: ignore
self._notifies_proxy = ClosableQueue(self._notifies, self._loop)
self._weakref = weakref.ref(self)
self._loop.add_reader(
self._fileno, self._ready, self._weakref # type: ignore
)
if self._loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
@staticmethod
def _ready(weak_self: "weakref.ref[Any]") -> None:
self = cast(Connection, weak_self())
if self is None:
return
waiter = self._waiter
try:
state = self._conn.poll()
while self._conn.notifies:
notify = self._conn.notifies.pop(0)
self._notifies.put_nowait(notify)
except (psycopg2.Warning, psycopg2.Error) as exc:
if self._fileno is not None:
try:
select.select([self._fileno], [], [], 0)
except OSError as os_exc:
if _is_bad_descriptor_error(os_exc):
with contextlib.suppress(OSError):
self._loop.remove_reader(self._fileno)
# forget a bad file descriptor, don't try to
# touch it
self._fileno = None
try:
if self._writing:
self._writing = False
if self._fileno is not None:
self._loop.remove_writer(self._fileno)
except OSError as exc2:
if exc2.errno != errno.EBADF:
# EBADF is ok for closed file descriptor
# chain exception otherwise
exc2.__cause__ = exc
exc = exc2
self._notifies_proxy.close(exc)
if waiter is not None and not waiter.done():
waiter.set_exception(exc)
else:
if self._fileno is None:
# connection closed
if waiter is not None and not waiter.done():
waiter.set_exception(
psycopg2.OperationalError("Connection closed")
)
if state == psycopg2.extensions.POLL_OK:
if self._writing:
self._loop.remove_writer(self._fileno) # type: ignore
self._writing = False
if waiter is not None and not waiter.done():
waiter.set_result(None)
elif state == psycopg2.extensions.POLL_READ:
if self._writing:
self._loop.remove_writer(self._fileno) # type: ignore
self._writing = False
elif state == psycopg2.extensions.POLL_WRITE:
if not self._writing:
self._loop.add_writer(
self._fileno, self._ready, weak_self # type: ignore
)
self._writing = True
elif state == psycopg2.extensions.POLL_ERROR:
self._fatal_error(
"Fatal error on aiopg connection: "
"POLL_ERROR from underlying .poll() call"
)
else:
self._fatal_error(
f"Fatal error on aiopg connection: "
f"unknown answer {state} from underlying "
f".poll() call"
)
def _fatal_error(self, message: str) -> None:
# Should be called from exception handler only.
self._loop.call_exception_handler(
{
"message": message,
"connection": self,
}
)
self.close()
if self._waiter and not self._waiter.done():
self._waiter.set_exception(psycopg2.OperationalError(message))
def _create_waiter(self, func_name: str) -> "asyncio.Future[None]":
if self._waiter is not None:
raise RuntimeError(
f"{func_name}() called while another coroutine "
f"is already waiting for incoming data"
)
self._waiter = self._loop.create_future()
return self._waiter
async def _poll(
self, waiter: "asyncio.Future[None]", timeout: float
) -> None:
assert waiter is self._waiter, (waiter, self._waiter)
self._ready(self._weakref)
try:
await asyncio.wait_for(self._waiter, timeout)
except (asyncio.CancelledError, asyncio.TimeoutError) as exc:
await asyncio.shield(self.close())
raise exc
except psycopg2.extensions.QueryCanceledError as exc:
self._loop.call_exception_handler(
{
"message": exc.pgerror,
"exception": exc,
"future": self._waiter,
}
)
raise asyncio.CancelledError
finally:
self._waiter = None
def isexecuting(self) -> bool:
return self._conn.isexecuting() # type: ignore
[docs] def cursor(
self,
name: Optional[str] = None,
cursor_factory: Any = None,
scrollable: Optional[bool] = None,
withhold: bool = False,
timeout: Optional[float] = None,
isolation_level: Optional[IsolationLevel] = None,
) -> _ContextManager[Cursor]:
"""A coroutine that returns a new cursor object using the connection.
*cursor_factory* argument can be used to create non-standard
cursors. The argument must be subclass of
`psycopg2.extensions.cursor`.
*name*, *scrollable* and *withhold* parameters are not supported by
psycopg in asynchronous mode.
"""
self._last_usage = self._loop.time()
coro = self._cursor(
name=name,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold,
timeout=timeout,
isolation_level=isolation_level,
)
return _ContextManager[Cursor](coro, _close_cursor)
async def _cursor(
self,
name: Optional[str] = None,
cursor_factory: Any = None,
scrollable: Optional[bool] = None,
withhold: bool = False,
timeout: Optional[float] = None,
isolation_level: Optional[IsolationLevel] = None,
) -> Cursor:
if timeout is None:
timeout = self._timeout
impl = await self._cursor_impl(
name=name,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold,
)
cursor = Cursor(self, impl, timeout, self._echo, isolation_level)
return cursor
async def _cursor_impl(
self,
name: Optional[str] = None,
cursor_factory: Any = None,
scrollable: Optional[bool] = None,
withhold: bool = False,
) -> Any:
if cursor_factory is None:
impl = self._conn.cursor(
name=name, scrollable=scrollable, withhold=withhold
)
else:
impl = self._conn.cursor(
name=name,
cursor_factory=cursor_factory,
scrollable=scrollable,
withhold=withhold,
)
return impl
def _close(self) -> None:
"""Remove the connection from the event_loop and close it."""
# N.B. If connection contains uncommitted transaction the
# transaction will be discarded
if self._fileno is not None:
self._loop.remove_reader(self._fileno)
if self._writing:
self._writing = False
self._loop.remove_writer(self._fileno)
self._conn.close()
if not self._loop.is_closed():
if self._waiter is not None and not self._waiter.done():
self._waiter.set_exception(
psycopg2.OperationalError("Connection closed")
)
self._notifies_proxy.close(
psycopg2.OperationalError("Connection closed")
)
[docs] def close(self) -> "asyncio.Future[None]":
self._close()
return create_completed_future(self._loop)
@property
def closed(self) -> bool:
"""Connection status.
Read-only attribute reporting whether the database connection is
open (False) or closed (True).
"""
return self._conn.closed # type: ignore
@property
def raw(self) -> Any:
"""Underlying psycopg connection object, readonly"""
return self._conn
async def commit(self) -> None:
raise psycopg2.ProgrammingError(
"commit cannot be used in asynchronous mode"
)
async def rollback(self) -> None:
raise psycopg2.ProgrammingError(
"rollback cannot be used in asynchronous mode"
)
# TPC
async def xid(
self, format_id: int, gtrid: str, bqual: str
) -> Tuple[int, str, str]:
return self._conn.xid(format_id, gtrid, bqual) # type: ignore
async def tpc_begin(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"tpc_begin cannot be used in asynchronous mode"
)
async def tpc_prepare(self) -> None:
raise psycopg2.ProgrammingError(
"tpc_prepare cannot be used in asynchronous mode"
)
async def tpc_commit(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"tpc_commit cannot be used in asynchronous mode"
)
async def tpc_rollback(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"tpc_rollback cannot be used in asynchronous mode"
)
async def tpc_recover(self) -> None:
raise psycopg2.ProgrammingError(
"tpc_recover cannot be used in asynchronous mode"
)
[docs] async def cancel(self) -> None:
raise psycopg2.ProgrammingError(
"cancel cannot be used in asynchronous mode"
)
async def reset(self) -> None:
raise psycopg2.ProgrammingError(
"reset cannot be used in asynchronous mode"
)
@property
def dsn(self) -> Optional[str]:
"""DSN connection string.
Read-only attribute representing dsn connection string used
for connectint to PostgreSQL server.
"""
return self._dsn # type: ignore
async def set_session(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"set_session cannot be used in asynchronous mode"
)
@property
def autocommit(self) -> bool:
"""Autocommit status"""
return self._conn.autocommit # type: ignore
@autocommit.setter
def autocommit(self, val: bool) -> None:
"""Autocommit status"""
self._conn.autocommit = val
@property
def isolation_level(self) -> int:
"""Transaction isolation level.
The only allowed value is ISOLATION_LEVEL_READ_COMMITTED.
"""
return self._conn.isolation_level # type: ignore
async def set_isolation_level(self, val: int) -> None:
"""Transaction isolation level.
The only allowed value is ISOLATION_LEVEL_READ_COMMITTED.
"""
self._conn.set_isolation_level(val)
@property
def encoding(self) -> str:
"""Client encoding for SQL operations."""
return self._conn.encoding # type: ignore
async def set_client_encoding(self, val: str) -> None:
self._conn.set_client_encoding(val)
@property
def notices(self) -> List[str]:
"""A list of all db messages sent to the client during the session."""
return self._conn.notices # type: ignore
@property
def cursor_factory(self) -> Any:
"""The default cursor factory used by .cursor()."""
return self._conn.cursor_factory
[docs] async def get_backend_pid(self) -> int:
"""Returns the PID of the backend server process."""
return self._conn.get_backend_pid() # type: ignore
[docs] async def get_parameter_status(self, parameter: str) -> Optional[str]:
"""Look up a current parameter setting of the server."""
return self._conn.get_parameter_status(parameter) # type: ignore
[docs] async def get_transaction_status(self) -> int:
"""Return the current session transaction status as an integer."""
return self._conn.get_transaction_status() # type: ignore
@property
def protocol_version(self) -> int:
"""A read-only integer representing protocol being used."""
return self._conn.protocol_version # type: ignore
@property
def server_version(self) -> int:
"""A read-only integer representing the backend version."""
return self._conn.server_version # type: ignore
@property
def status(self) -> int:
"""A read-only integer representing the status of the connection."""
return self._conn.status # type: ignore
async def lobject(self, *args: Any, **kwargs: Any) -> None:
raise psycopg2.ProgrammingError(
"lobject cannot be used in asynchronous mode"
)
@property
def timeout(self) -> float:
"""Return default timeout for connection operations."""
return self._timeout
@property
def last_usage(self) -> float:
"""Return time() when connection was used."""
return self._last_usage
@property
def echo(self) -> bool:
"""Return echo mode status."""
return self._echo
def __repr__(self) -> str:
return (
f"<"
f"{type(self).__module__}::{type(self).__name__} "
f"isexecuting={self.isexecuting()}, "
f"closed={self.closed}, "
f"echo={self.echo}, "
f">"
)
def __del__(self) -> None:
try:
_conn = self._conn
except AttributeError:
return
if _conn is not None and not _conn.closed:
self.close()
warnings.warn(f"Unclosed connection {self!r}", ResourceWarning)
context = {"connection": self, "message": "Unclosed connection"}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
self._loop.call_exception_handler(context)
@property
def notifies(self) -> ClosableQueue:
"""Return notification queue (an asyncio.Queue -like object)."""
return self._notifies_proxy
async def _get_oids(self) -> Tuple[Any, Any]:
cursor = await self.cursor()
rv0, rv1 = [], []
try:
await cursor.execute(
"SELECT t.oid, typarray "
"FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid "
"WHERE typname = 'hstore';"
)
async for oids in cursor:
if isinstance(oids, Mapping):
rv0.append(oids["oid"])
rv1.append(oids["typarray"])
else:
rv0.append(oids[0])
rv1.append(oids[1])
finally:
cursor.close()
return tuple(rv0), tuple(rv1)
async def _connect(self) -> "Connection":
try:
await self._poll(self._waiter, self._timeout) # type: ignore
except BaseException:
await asyncio.shield(self.close())
raise
if self._enable_json:
psycopg2.extras.register_default_json(self._conn)
if self._enable_uuid:
psycopg2.extras.register_uuid(conn_or_curs=self._conn)
if self._enable_hstore:
oid, array_oid = await self._get_oids()
psycopg2.extras.register_hstore(
self._conn, oid=oid, array_oid=array_oid
)
return self
def __await__(self) -> Generator[Any, None, "Connection"]:
return self._connect().__await__()
async def __aenter__(self) -> "Connection":
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
await self.close()