Исходный код autodealer.queryset

"""Django-подобный QuerySet и Manager для моделей SQLAlchemy 2.0.

Каждая модель наследует от :class:`~autodealer.connection.Base`, к которому
автоматически привязан :class:`Manager`. Доступ через атрибут ``objects``::

    from autodealer.domain.bank import Bank

    Bank.objects.filter(hidden=0).order_by('name').all()
"""

from __future__ import annotations

from typing import Any, Generic, TypeVar, overload

from sqlalchemy import (
    delete,
    func,
    select,
    type_coerce,
    update,
)
from sqlalchemy.types import NullType
from sqlalchemy.orm import sessionmaker


def _session_scope():
    from autodealer.connection import session_scope

    return session_scope()


def _get_engine():
    from autodealer.connection import get_engine

    return get_engine()


T = TypeVar("T")


[документация] class DoesNotExist(Exception): """Объект не найден. Выбрасывается из :meth:`QuerySet.get`."""
[документация] class MultipleObjectsReturned(Exception): """Найдено более одного объекта. Выбрасывается из :meth:`QuerySet.get`."""
[документация] class QuerySet(Generic[T]): """Цепочечный построитель запросов, аналогичный Django QuerySet. Не выполняет SQL до вызова терминального метода (:meth:`all`, :meth:`first`, :meth:`last`, :meth:`count`, :meth:`get`, :meth:`exists`, :meth:`values`). Не создавать напрямую — использовать ``Model.objects``. Example:: # Все активные банки, отсортированные по имени banks = Bank.objects.filter(hidden=0).order_by('name').all() # Один объект или исключение bank = Bank.objects.get(bank_id=1) # Проверка существования if Bank.objects.filter(name__icontains='сбер').exists(): ... """ def __init__(self, model: type[T]) -> None: self._model = model self._wheres: list = [] self._order_cols: list = [] self._limit_val: int | None = None self._offset_val: int | None = None # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _clone(self) -> "QuerySet[T]": qs: QuerySet[T] = QuerySet(self._model) qs._wheres = self._wheres.copy() qs._order_cols = self._order_cols.copy() qs._limit_val = self._limit_val qs._offset_val = self._offset_val return qs def _resolve_field(self, key: str): """Support double-underscore lookups: name__contains, age__gte, etc.""" # literal() is required for LIKE patterns to avoid a sqlalchemy-firebird # bug where the Firebird type compiler receives the column length as `name` # instead of the type name, causing TypeError during SQL compilation. lookups = { "exact": lambda col, v: col == v, # type_coerce(..., NullType()) prevents sqlalchemy-firebird from # calling visit_VARCHAR on the bind parameter, which has a bug # where _render_string_type receives swapped positional arguments. "iexact": lambda col, v: col.ilike(type_coerce(v, NullType())), "contains": lambda col, v: col.like(type_coerce(f"%{v}%", NullType())), "icontains": lambda col, v: col.ilike(type_coerce(f"%{v}%", NullType())), "startswith": lambda col, v: col.like(type_coerce(f"{v}%", NullType())), "endswith": lambda col, v: col.like(type_coerce(f"%{v}", NullType())), "gt": lambda col, v: col > v, "gte": lambda col, v: col >= v, "lt": lambda col, v: col < v, "lte": lambda col, v: col <= v, "in": lambda col, v: col.in_(v), "isnull": lambda col, v: col.is_(None) if v else col.isnot(None), } parts = key.rsplit("__", 1) if len(parts) == 2 and parts[1] in lookups: field_name, lookup = parts col = getattr(self._model, field_name) return lookups[lookup], col col = getattr(self._model, key) return lookups["exact"], col @staticmethod def _coerce_strings(kwargs: dict) -> dict: """Wrap str values with type_coerce(..., NullType()) to avoid the sqlalchemy-firebird bug where _render_string_type receives swapped arguments during INSERT/UPDATE compilation.""" return { k: type_coerce(v, NullType()) if isinstance(v, str) else v for k, v in kwargs.items() } def _build_stmt(self): stmt = select(self._model) for w in self._wheres: stmt = stmt.where(w) if self._order_cols: stmt = stmt.order_by(*self._order_cols) if self._limit_val is not None: stmt = stmt.limit(self._limit_val) if self._offset_val is not None: stmt = stmt.offset(self._offset_val) return stmt # ------------------------------------------------------------------ # Filtering / ordering # ------------------------------------------------------------------
[документация] def filter(self, **kwargs: Any) -> "QuerySet[T]": """Добавить условия фильтрации (WHERE). Поддерживаются лукапы через ``__``: ``exact``, ``iexact``, ``contains``, ``icontains``, ``startswith``, ``endswith``, ``gt``, ``gte``, ``lt``, ``lte``, ``in``, ``isnull``. Args: **kwargs: Условия вида ``field=value`` или ``field__lookup=value``. Returns: Новый :class:`QuerySet` с добавленными условиями. Example:: Bank.objects.filter(hidden=0, name__icontains='сбер') """ qs = self._clone() for key, value in kwargs.items(): op, col = self._resolve_field(key) qs._wheres.append(op(col, value)) return qs
[документация] def exclude(self, **kwargs: Any) -> "QuerySet[T]": """Исключить записи, соответствующие условиям (WHERE NOT). Args: **kwargs: Условия вида ``field=value`` или ``field__lookup=value``. Returns: Новый :class:`QuerySet`. """ qs = self._clone() for key, value in kwargs.items(): op, col = self._resolve_field(key) qs._wheres.append(~op(col, value)) return qs
[документация] def order_by(self, *fields: str) -> "QuerySet[T]": """Задать сортировку. Префикс ``-`` означает сортировку по убыванию. Args: *fields: Имена полей. ``'-name'`` — DESC, ``'name'`` — ASC. Returns: Новый :class:`QuerySet`. Example:: Bank.objects.order_by('-bank_id', 'name') """ qs = self._clone() for field in fields: if field.startswith("-"): col = getattr(self._model, field[1:]) qs._order_cols.append(col.desc()) else: col = getattr(self._model, field) qs._order_cols.append(col.asc()) return qs
[документация] def limit(self, n: int) -> "QuerySet[T]": """Ограничить количество результатов (LIMIT). Args: n: Максимальное число записей. Returns: Новый :class:`QuerySet`. """ qs = self._clone() qs._limit_val = n return qs
[документация] def offset(self, n: int) -> "QuerySet[T]": """Пропустить первые *n* записей (OFFSET). Args: n: Число пропускаемых записей. Returns: Новый :class:`QuerySet`. """ qs = self._clone() qs._offset_val = n return qs
# ------------------------------------------------------------------ # Evaluation # ------------------------------------------------------------------
[документация] def all(self) -> list[T]: """Выполнить запрос и вернуть все результаты. Returns: Список экземпляров модели (detached от сессии). """ with _session_scope() as session: result = session.execute(self._build_stmt()).scalars().all() session.expunge_all() return result
[документация] def first(self) -> T | None: """Вернуть первую запись или ``None``. Returns: Экземпляр модели или ``None``. """ with _session_scope() as session: stmt = self._build_stmt().limit(1) obj = session.execute(stmt).scalars().first() if obj is not None: session.expunge(obj) return obj
[документация] def last(self) -> T | None: """Вернуть последнюю запись или ``None``. Returns: Экземпляр модели или ``None``. """ results = self.all() return results[-1] if results else None
[документация] def get(self, **kwargs: Any) -> T: """Вернуть ровно одну запись. Args: **kwargs: Дополнительные условия (как в :meth:`filter`). Returns: Экземпляр модели. Raises: DoesNotExist: Запись не найдена. MultipleObjectsReturned: Найдено более одной записи. Example:: bank = Bank.objects.get(bank_id=1) """ qs = self.filter(**kwargs) if kwargs else self with _session_scope() as session: results = session.execute(qs._build_stmt()).scalars().all() if len(results) == 0: raise DoesNotExist( f"{self._model.__name__} matching query does not exist." ) if len(results) > 1: raise MultipleObjectsReturned( f"get() returned more than one {self._model.__name__}." ) session.expunge(results[0]) return results[0]
[документация] def count(self) -> int: """Вернуть количество записей (SELECT COUNT(*)). Returns: Целое число. """ stmt = select(func.count()).select_from(self._model) for w in self._wheres: stmt = stmt.where(w) with _session_scope() as session: return session.execute(stmt).scalar() or 0
[документация] def exists(self) -> bool: """Проверить, есть ли хотя бы одна запись. Returns: ``True`` если записи существуют. """ return self.count() > 0
[документация] def values(self, *fields: str) -> list[dict]: """Вернуть список словарей вместо экземпляров модели. Args: *fields: Имена полей. Если не переданы — все колонки. Returns: Список ``dict``. Example:: Bank.objects.filter(hidden=0).values('bank_id', 'name') # [{'bank_id': 1, 'name': 'Сбербанк'}, ...] """ cols = fields or [c.key for c in self._model.__table__.columns] with _session_scope() as session: objs = session.execute(self._build_stmt()).scalars().all() return [{f: getattr(obj, f) for f in cols} for obj in objs]
# ------------------------------------------------------------------ # Write operations # ------------------------------------------------------------------
[документация] def create(self, **kwargs: Any) -> T: """Создать и сохранить новую запись. Args: **kwargs: Значения полей новой записи. Returns: Сохранённый экземпляр модели (атрибуты доступны после коммита). Example:: bank = Bank.objects.create(name='Тинькофф', bik='044525974') """ from sqlalchemy import insert as _insert Session = sessionmaker(bind=_get_engine(), expire_on_commit=False) session = Session() try: stmt = _insert(self._model).values(**self._coerce_strings(kwargs)) result = session.execute(stmt) session.commit() # Build detached instance; populate trigger-generated PK from RETURNING obj = self._model(**kwargs) pk_cols = list(self._model.__mapper__.primary_key) for col, val in zip(pk_cols, result.inserted_primary_key or []): if getattr(obj, col.key, None) is None and val is not None: setattr(obj, col.key, val) return obj except Exception: session.rollback() raise finally: session.close()
[документация] def update(self, **kwargs: Any) -> int: """Массово обновить записи, соответствующие фильтрам. Args: **kwargs: Поля и новые значения. Returns: Количество изменённых строк. Example:: Bank.objects.filter(hidden=1).update(hidden=0) """ stmt = update(self._model) for w in self._wheres: stmt = stmt.where(w) stmt = stmt.values(**self._coerce_strings(kwargs)) with _session_scope() as session: result = session.execute(stmt) return result.rowcount
[документация] def delete(self) -> int: """Массово удалить записи, соответствующие фильтрам. Returns: Количество удалённых строк. Example:: Bank.objects.filter(hidden=1).delete() """ stmt = delete(self._model) for w in self._wheres: stmt = stmt.where(w) with _session_scope() as session: result = session.execute(stmt) return result.rowcount
# ------------------------------------------------------------------ # Python protocol # ------------------------------------------------------------------ def __iter__(self): return iter(self.all()) def __len__(self) -> int: return self.count() def __repr__(self) -> str: return f"<QuerySet model={self._model.__name__}>"
[документация] class Manager: """Descriptor that provides Django-like `Model.objects` access. Attach to a class with:: class MyModel(Base): objects = Manager() """ def __set_name__(self, owner: type, name: str) -> None: self._name = name @overload def __get__(self, obj: None, objtype: type[T]) -> QuerySet[T]: ... @overload def __get__(self, obj: object, objtype: type[T] | None) -> QuerySet[T]: ... def __get__(self, obj: object, objtype: type | None = None) -> QuerySet: if obj is not None: raise AttributeError("Manager is not accessible via model instances.") return QuerySet(objtype)