Skip to content

QueryBuilder

QueryBuilder

QueryBuilder(model: type[T])

Bases: _Filterable, Generic[T]

Chainable query builder. Subclass to add project-specific methods.

Default chain shape::

Pet.query.eq("species", "cat").gte("age", 3).limit(10).all()

Subclass example — add a paginate(page, size) shortcut and bind it via query_class= so every Model.query returns the custom builder::

class PaginatedQB(QueryBuilder):
    # Page-based shortcut layered on top of range(); ``page`` counts from 0.
    def paginate(self, page: int, size: int = 50) -> "PaginatedQB":
        first = page * size
        last = first + size - 1
        return self.range(first, last)

# Binding query_class= here is what makes Pet.query return a PaginatedQB.
class Pet(SupabaseModel, table="pets", query_class=PaginatedQB):
    id: UUID
    name: str

# paginate() counts pages from 0: with the default size of 50,
# paginate(2) resolves to .range(100, 149).
page_two = await Pet.query.eq("adopted", False).paginate(2).all()
Source code in src/supabase_orm/_async/_query.py
def __init__(self, model: type[T]) -> None:
    """Start an empty chain bound to ``model``.

    Args:
        model: Model class this builder queries; its ``__select__`` seeds
            the projection.
    """
    self._model = model
    self._select: str = model.__select__
    # Log of chained operations. Chain methods only append to it; terminals
    # look up ``get_client()`` at execution time and replay the log, so a
    # chain built ahead of a ``use_client()`` block still runs against the
    # binding that is current when it executes.
    self._ops: list[tuple] = []
    # Populated by ``as_(plain BaseModel)`` to override row validation; the
    # source model keeps ownership of table/predicates/select/iter PK.
    self._validator: TypeAdapter | None = None

match

match(query: dict[str, Any]) -> Self

Filter rows where every column == value pair in query holds.

PostgREST's match is multi-column by design — it has no single column argument. Use it for compound equality filters::

await Pet.query.match({"species": "cat", "adopted": False}).all()

Equivalent to chaining .eq() per pair. Not available inside or_() / not_() (no predicate-string form).

Source code in src/supabase_orm/_async/_query.py
def match(self, query: dict[str, Any]) -> Self:
    """Add an equality filter for every ``column: value`` entry in ``query``.

    PostgREST's ``match`` takes a whole mapping rather than a single
    ``column`` argument, which makes it the natural fit for compound
    equality filters::

        await Pet.query.match({"species": "cat", "adopted": False}).all()

    The result is the same as chaining one ``.eq()`` per entry. There is no
    predicate-string form, so it cannot be used inside ``or_()`` /
    ``not_()``.

    Args:
        query: Mapping of column name to the value it must equal.

    Returns:
        This builder, for further chaining.
    """
    model = self._model
    # Check every column name before anything is recorded on the chain.
    for column in query:
        model._validate_column(column)
    wire_values = {}
    for column, value in query.items():
        wire_values[column] = serialize(value)
    self._ops.append((_Op.MATCH, wire_values))
    return self

order_by

order_by(*columns: 'str | Column | Order') -> Self

Order results by one or more columns.

Three accepted forms — mix freely::

Pet.query.order_by("-created_at", "name")           # string shorthand
Pet.query.order_by(Pet.f.created_at.desc())         # typed
Pet.query.order_by(Pet.f.last_login.desc(nulls="last"))

Strings use the Django "-col" prefix for descending. The typed form unlocks nulls="first" / "last" ordering, which strings don't expose.

Source code in src/supabase_orm/_async/_query.py
def order_by(self, *columns: "str | Column | Order") -> Self:
    """Order results by one or more columns.

    Three accepted forms — mix freely::

        Pet.query.order_by("-created_at", "name")           # string shorthand
        Pet.query.order_by(Pet.f.created_at.desc())         # typed
        Pet.query.order_by(Pet.f.last_login.desc(nulls="last"))

    Strings use the Django ``"-col"`` prefix for descending. The typed
    form unlocks ``nulls="first"`` / ``"last"`` ordering, which strings
    don't expose.

    The call is all-or-nothing: every argument is coerced and its column
    validated before any ordering is recorded, so an invalid argument
    leaves the chain exactly as it was.

    Args:
        *columns: Columns to order by, applied in the order given.

    Returns:
        This builder, for further chaining.
    """
    # Stage the ops locally and commit them in one step. Appending to
    # ``self._ops`` inside the loop would leave earlier orderings on the
    # chain when a later argument fails validation.
    staged = []
    for c in columns:
        order = _coerce_order(c)
        self._model._validate_column(order.column)
        staged.append((_Op.ORDER, order))
    self._ops.extend(staged)
    return self

as_

as_(target: type[U]) -> 'QueryBuilder[U]'

Rebind the response shape.

Two modes:

  • Same-table SupabaseModel — narrows the wire select to the target's __select__ and validates against it.
  • Plain BaseModel — validation only, wire select unchanged. Use when filtering on source columns the lean target doesn't expose.

Cross-table SupabaseModel targets raise.

Parameters:

Name Type Description Default
target type[U]

A Pydantic BaseModel subclass: either a same-table SupabaseModel to narrow the projection, or any plain BaseModel for validation-only rebinding.

required
Source code in src/supabase_orm/_async/_query.py
def as_(self, target: type[U]) -> "QueryBuilder[U]":
    """Change the shape rows are returned in.

    ``target`` selects one of two modes:

    - **Same-table SupabaseModel** — the wire ``select`` is narrowed to the
      target's ``__select__`` and rows are validated against the target.
    - **Plain BaseModel** — only validation changes; the wire ``select``
      stays as it was. Handy when the filters use source columns that the
      lean target does not declare.

    The builder is rebound in place and returned.

    Args:
        target: A Pydantic ``BaseModel`` subclass; a same-table
            :class:`SupabaseModel` to narrow projection, or any plain
            ``BaseModel`` for validation-only rebinding.

    Returns:
        This same builder, typed as ``QueryBuilder[U]``.

    Raises:
        SupabaseORMUsageError: If ``target`` is not a ``BaseModel``
            subclass, or is a ``SupabaseModel`` bound to a different table.
    """
    # Imported lazily: _base itself imports from _query, so a module-level
    # import is not possible.
    from ._base import SupabaseModel

    is_pydantic_class = isinstance(target, type) and issubclass(target, BaseModel)
    if not is_pydantic_class:
        raise SupabaseORMUsageError(
            f"as_({target!r}) requires a Pydantic BaseModel subclass."
        )

    rebound = cast("QueryBuilder[U]", self)

    # Plain BaseModel: swap the validator only, keep model and select.
    if not issubclass(target, SupabaseModel):
        self._validator = _adapter_for(target)
        return rebound

    if target.__table__ != self._model.__table__:
        raise SupabaseORMUsageError(
            f"as_({target.__name__}): different table "
            f"({target.__table__!r} != {self._model.__table__!r}). "
            "Either point both SupabaseModels at the same __table__, "
            "or pass a plain BaseModel for validation-only rebinding."
        )

    # Same-table SupabaseModel: the target takes over as the model and any
    # earlier plain-BaseModel validator is dropped.
    self._model = cast("type[T]", target)
    self._select = target.__select__
    self._validator = None
    return rebound

values async

values(*columns: str) -> list[dict[str, Any]]

Run the query with an ad-hoc column projection. Returns raw dicts.

No Pydantic validation, no autocomplete — caller deals with row["col"]. Use for exports or ad-hoc admin queries.

Parameters:

Name Type Description Default
*columns str

One or more column names. May include PostgREST embed syntax (e.g. "pets(id,name)").

()
Source code in src/supabase_orm/_async/_query.py
async def values(self, *columns: str) -> list[dict[str, Any]]:
    """Execute the query with a caller-chosen projection and return raw dicts.

    Rows skip Pydantic validation and come back untyped, so the caller
    reads them as ``row["col"]``. Intended for exports or ad-hoc admin
    queries.

    Args:
        *columns: One or more column names. May include PostgREST embed
            syntax (e.g. ``"pets(id,name)"``).

    Returns:
        The response rows, or an empty list when the response has no data.

    Raises:
        SupabaseORMUsageError: If no column is given.
    """
    if not columns:
        raise SupabaseORMUsageError(".values() requires at least one column.")
    projection = ",".join(columns)
    response = await execute_logged(self._make_select(select=projection))
    rows = response.data
    return rows if rows else []

explain

explain(*, redact: bool = True) -> ExplainResult

Return the resolved HTTP request without executing it.

Parameters:

Name Type Description Default
redact bool

When True (default), auth headers (apikey, Authorization, Cookie) are replaced with ***REDACTED***.

True
Source code in src/supabase_orm/_async/_query.py
def explain(self, *, redact: bool = True) -> ExplainResult:
    """Describe the resolved HTTP request without executing it.

    Args:
        redact: When True (default), auth headers (``apikey``,
            ``Authorization``, ``Cookie``) are replaced with
            ``***REDACTED***``.

    Returns:
        The description produced from the select builder.
    """
    builder = self._make_select()
    return _explain_from_builder(builder, redact=redact)

raw

raw() -> Any

Return the underlying postgrest async builder for ops we don't model.

Resolves the client at call time, so the builder is bound to the current ContextVar — pair with use_client() in a request scope and the escape hatch sees the same client as the rest of your handler. Pair with supabase_orm.serialize() if you need wire-value coercion for non-JSON-native Python types.

Source code in src/supabase_orm/_async/_query.py
def raw(self) -> Any:
    """Hand back the underlying postgrest async builder — the escape hatch
    for operations this class does not model.

    The client is resolved when this is called, so the builder is bound to
    the current ContextVar: inside a ``use_client()`` request scope it sees
    the same client as the rest of the handler. Combine with
    :func:`supabase_orm.serialize` when non-JSON-native Python values need
    wire-value coercion.
    """
    builder = self._make_select()
    return builder

all_with_count async

all_with_count() -> tuple[list[T], int]

Run the query and ask PostgREST for an exact total in one round-trip.

Useful for paginated endpoints — saves a separate .count() call. count="exact" is computed on the FILTERED row set, ignoring limit/offset, which is the standard pagination semantics.

Source code in src/supabase_orm/_async/_query.py
async def all_with_count(self) -> tuple[list[T], int]:
    """Fetch the rows together with an exact total in a single round-trip.

    Saves a separate ``.count()`` call on paginated endpoints.
    ``count="exact"`` is computed on the FILTERED row set, ignoring
    ``limit``/``offset``, which is the standard pagination semantics.

    Returns:
        ``(rows, total)`` — the validated rows and the reported count,
        with ``0`` standing in when the response carries no count.
    """
    builder = self._make_select(count="exact")
    response = await execute_logged(builder)
    records = self._validate_rows(response.data or [])
    total = getattr(response, "count", None)
    return records, (total if total else 0)

count async

count() -> int

Count matching rows via a head-only request (no body, no validation).

Source code in src/supabase_orm/_async/_query.py
async def count(self) -> int:
    """Return the number of matching rows using a head-only request.

    No body is fetched and nothing is validated. A response without a
    count is reported as ``0``.
    """
    head_request = self._make_select(select="*", count="exact", head=True)
    response = await execute_logged(head_request)
    total = getattr(response, "count", None)
    if not total:
        return 0
    return total

exists async

exists() -> bool

Return True iff at least one row matches the current filters.

Wire shape: select=<pk>&limit=1 — projects the PK only, fetches at most one row, skips Pydantic validation. Cheap on tables of any size, and unlike count() > 0 does not ask Postgres for an exact total.

Source code in src/supabase_orm/_async/_query.py
async def exists(self) -> bool:
    """Tell whether any row matches the current filters.

    Wire shape: ``select=<pk>&limit=1`` — only the PK is projected, at most
    one row is fetched, and Pydantic validation is skipped. Cheap on tables
    of any size, and unlike ``count() > 0`` it does not ask Postgres for an
    exact total.

    Returns:
        ``True`` when the response contains at least one row.
    """
    pk_column = self._model.__pk__
    probe = self._make_select(select=pk_column).limit(1)
    response = await execute_logged(probe)
    return bool(response.data)

iter

iter(*, batch_size: int = 1000) -> AsyncIterator[T]

Yield every matching row using PK keyset pagination.

Constant-time per batch (uses the PK index). Owns ordering and pagination — chaining .order_by() / .limit() / .offset() / .range() before .iter() raises.

Snapshot semantics are loose: rows with pk > cursor inserted mid-iteration are picked up; rows with pk < cursor are missed. With monotonic PKs (UUIDv7, serial) the latter can't happen. Race-safe for concurrent deletes.

Parameters:

Name Type Description Default
batch_size int

Rows per round-trip.

1000
Source code in src/supabase_orm/_async/_query.py
def iter(self, *, batch_size: int = 1000) -> AsyncIterator[T]:
    """Yield every matching row using PK keyset pagination.

    Constant-time per batch (uses the PK index). Owns ordering and
    pagination — chaining ``.order_by()`` / ``.limit()`` / ``.offset()``
    / ``.range()`` before ``.iter()`` raises.

    Snapshot semantics are loose: rows with ``pk > cursor`` inserted
    mid-iteration are picked up; rows with ``pk < cursor`` are missed.
    With monotonic PKs (UUIDv7, serial) the latter can't happen.
    Race-safe for concurrent deletes.

    This is a plain (non-async) method, so the checks below raise at call
    time rather than on the first ``async for`` step.

    Args:
        batch_size: Rows per round-trip. Must be at least 1.

    Returns:
        The async iterator produced by ``_iter_impl``.

    Raises:
        SupabaseORMUsageError: If ``__pk__`` is not a model field, if an
            ordering/pagination call was chained, or if ``batch_size`` is
            smaller than 1.
    """
    model = self._model
    pk = model.__pk__
    if pk not in model.model_fields:
        raise SupabaseORMUsageError(
            f"{model.__name__}.iter() needs __pk__ {pk!r} to be a model field."
        )
    for op in self._ops:
        if op[0] in _ITER_FORBIDDEN_OPS:
            raise SupabaseORMUsageError(
                f"iter() owns ordering and pagination — drop the chained "
                f".{op[0].value}() call."
            )
    # A page size below 1 cannot make progress through the table, so reject
    # it here with the other usage checks.
    # NOTE(review): _iter_impl is not visible from this block — confirm it
    # does not already enforce this.
    if batch_size < 1:
        raise SupabaseORMUsageError(
            f"iter(batch_size={batch_size!r}) must be at least 1."
        )
    return self._iter_impl(pk, batch_size)

delete async

delete(
    *,
    allow_unfiltered: bool = ...,
    returning: Literal["representation"] = ...,
) -> list[T]
delete(
    *,
    allow_unfiltered: bool = ...,
    returning: Literal["minimal"],
) -> None
delete(
    *,
    allow_unfiltered: bool = False,
    returning: ReturnMode = "representation",
) -> list[T] | None

Bulk-delete every matching row.

Parameters:

Name Type Description Default
allow_unfiltered bool

Required to delete every row when no filter is chained; PostgREST also rejects unfiltered DELETE server-side.

False
returning ReturnMode

"minimal" skips the body and returns None.

'representation'

Returns:

Type Description
list[T] | None

The deleted rows, or None when returning="minimal".

Source code in src/supabase_orm/_async/_query.py
async def delete(
    self,
    *,
    allow_unfiltered: bool = False,
    returning: ReturnMode = "representation",
) -> list[T] | None:
    """Delete every row the chain matches, in one request.

    Args:
        allow_unfiltered: Must be True to run without any chained filter,
            i.e. to delete every row; PostgREST also rejects unfiltered
            DELETE server-side.
        returning: ``"minimal"`` skips the body and returns ``None``.

    Returns:
        The deleted rows, or ``None`` when ``returning="minimal"``.

    Raises:
        SupabaseORMUsageError: If no filter is chained and
            ``allow_unfiltered`` is not set.
    """
    validate_returning(returning)
    if not (allow_unfiltered or self._has_filter()):
        raise SupabaseORMUsageError(
            "Refusing unfiltered .delete() — chain at least one filter or "
            "pass allow_unfiltered=True to wipe the table."
        )
    # The client is looked up now, at execution time; the recorded chain is
    # then replayed onto the fresh DELETE builder.
    table = get_client().table(self._model.__table__)
    builder = self._replay(table.delete(returning=returning))
    builder = self._apply_relation_filters_on(builder)
    response = await execute_logged(builder)
    return None if returning == "minimal" else self._validate_rows(response.data or [])

update async

update(
    *,
    allow_unfiltered: bool = ...,
    returning: Literal["representation"] = ...,
    **values: Any,
) -> list[T]
update(
    *,
    allow_unfiltered: bool = ...,
    returning: Literal["minimal"],
    **values: Any,
) -> None
update(
    *,
    allow_unfiltered: bool = False,
    returning: ReturnMode = "representation",
    **values: Any,
) -> list[T] | None

Bulk-update every matching row.

Parameters:

Name Type Description Default
allow_unfiltered bool

Required to update every row when no filter is chained.

False
returning ReturnMode

"minimal" skips the body and returns None.

'representation'
**values Any

Column=value pairs to set.

{}

Returns:

Type Description
list[T] | None

The updated rows, or None when returning="minimal".

Source code in src/supabase_orm/_async/_query.py
async def update(
    self,
    *,
    allow_unfiltered: bool = False,
    returning: ReturnMode = "representation",
    **values: Any,
) -> list[T] | None:
    """Apply the same column changes to every row the chain matches.

    Args:
        allow_unfiltered: Must be True to run without any chained filter,
            i.e. to update every row.
        returning: ``"minimal"`` skips the body and returns ``None``.
        **values: Column=value pairs to set; each value is passed through
            ``serialize`` before being sent.

    Returns:
        The updated rows, or ``None`` when ``returning="minimal"``.

    Raises:
        SupabaseORMUsageError: If no filter is chained and
            ``allow_unfiltered`` is not set, or if no values are given.
    """
    validate_returning(returning)
    if not (allow_unfiltered or self._has_filter()):
        raise SupabaseORMUsageError(
            "Refusing unfiltered .update() — chain at least one filter or "
            "pass allow_unfiltered=True to update every row."
        )
    if not values:
        raise SupabaseORMUsageError(
            "update() requires at least one key=value to set."
        )
    payload = {}
    for column, value in values.items():
        payload[column] = serialize(value)
    # The client is looked up now, at execution time; the recorded chain is
    # then replayed onto the fresh UPDATE builder.
    table = get_client().table(self._model.__table__)
    builder = self._replay(table.update(payload, returning=returning))
    builder = self._apply_relation_filters_on(builder)
    response = await execute_logged(builder)
    return None if returning == "minimal" else self._validate_rows(response.data or [])