ATProto access

The ATProto client: identity resolution, the PDS and appview record clients, content-addressed blob fetch, app-password session auth, and the repo firehose consumer. The identity, PDS, blob, and appview transports are built on httpx; the firehose runs over a websocket from the websockets library. Public reads need no auth.

Identity

Resolves handles to DIDs, DIDs to documents, and DIDs to PDS endpoints, caching results in memory.

lairs.atproto.identity

Identity resolution: handle to DID and DID to PDS endpoint.

Resolves a handle to a DID over the .well-known/atproto-did HTTP endpoint (a DNS TXT lookup is not part of core; see resolve_handle), a DID to its DID document (from the PLC directory for did:plc, or the did:web document), and a DID to its PDS service endpoint. Resolutions are cached in memory so repeated lookups during a pull do not hit the network again.

The transport is built on httpx rather than the atproto SDK. Public reads need no auth; an injected client can later carry a session for private reads. The combined result of resolve is returned as a dx.Model instance (IdentityResolution).

DEFAULT_PLC_DIRECTORY module-attribute

DEFAULT_PLC_DIRECTORY = 'https://plc.directory'

The default PLC directory used to resolve did:plc identifiers.

IdentityResolution

Bases: Model

The resolved identity of an ATProto repository.

ATTRIBUTE DESCRIPTION
did

The resolved decentralised identifier.

TYPE: str

pds_endpoint

The base URL of the repository's personal data server.

TYPE: str

handle

The handle the resolution started from, when known.

TYPE: (str or None, optional)

IdentityError

Bases: RuntimeError

Raised when an identity cannot be resolved.

This wraps DNS, HTTP, and document-shape failures behind a single error so callers do not have to discriminate transport-specific exceptions.

IdentityResolver

IdentityResolver(
    client: Client | None = None,
    *,
    plc_directory: str = DEFAULT_PLC_DIRECTORY,
)

A caching resolver for handles, DIDs, and PDS endpoints.

PARAMETER DESCRIPTION
client

An injected HTTP client. When omitted, a private client is created and closed with the resolver. Injecting a client lets a session carry auth for private reads later.

TYPE: Client or None DEFAULT: None

plc_directory

The PLC directory base URL used for did:plc resolution.

TYPE: str DEFAULT: DEFAULT_PLC_DIRECTORY

close

close() -> None

Close the underlying HTTP client if the resolver owns it.

resolve_handle

resolve_handle(handle: str) -> str

Resolve a handle to a DID.

Resolution uses the .well-known/atproto-did HTTP endpoint, which is the dependency-free path and covers both DNS- and HTTP-method handles once the handle host serves it. A DNS _atproto TXT lookup would require a third-party resolver and is left to the optional firehose or an injected client; lairs does not add a DNS dependency to core. Results are cached.

PARAMETER DESCRIPTION
handle

The ATProto handle (for example alice.bsky.social).

TYPE: str

RETURNS DESCRIPTION
str

The resolved DID.

RAISES DESCRIPTION
IdentityError

If the handle cannot be resolved.
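
The cached .well-known lookup described above can be sketched as follows. This is a minimal illustration, not the resolver's actual internals: the CachingHandleResolver name, the injected fetch callable, and the did: prefix check are assumptions, and a real fetch would issue the HTTP GET through httpx.

```python
from collections.abc import Callable


class CachingHandleResolver:
    """Resolve handles through an injected fetch function, caching results."""

    def __init__(self, fetch: Callable[[str], str]) -> None:
        # Hypothetical transport hook: takes a URL, returns the body text.
        self._fetch = fetch
        self._cache: dict[str, str] = {}

    def resolve_handle(self, handle: str) -> str:
        if handle in self._cache:
            return self._cache[handle]  # repeated lookups skip the network
        url = f"https://{handle}/.well-known/atproto-did"
        did = self._fetch(url).strip()  # the body is the DID as plain text
        if not did.startswith("did:"):
            raise RuntimeError(f"{handle}: response is not a DID")
        self._cache[handle] = did
        return did


calls: list[str] = []


def fake_fetch(url: str) -> str:
    """Stand-in for an HTTP GET; records each URL it is asked for."""
    calls.append(url)
    return "did:plc:abc123\n"


resolver = CachingHandleResolver(fake_fetch)
first = resolver.resolve_handle("alice.bsky.social")
second = resolver.resolve_handle("alice.bsky.social")  # served from the cache
```

Only the first call reaches fake_fetch; the second is answered from the in-memory cache.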

resolve_did

resolve_did(did: str) -> dict[str, JsonValue]

Resolve a DID to its DID document.

PARAMETER DESCRIPTION
did

The DID to resolve (did:plc or did:web).

TYPE: str

RETURNS DESCRIPTION
dict

The parsed DID document.

RAISES DESCRIPTION
IdentityError

If the DID method is unsupported or resolution fails.
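
As a rough sketch of the method dispatch, a DID can be mapped to the URL its document is served from. The did_document_url helper is hypothetical and handles only hostname-level did:web identifiers; the URL shapes are the standard PLC directory and did:web conventions, not a statement about how the resolver is written.

```python
from urllib.parse import unquote

DEFAULT_PLC_DIRECTORY = "https://plc.directory"


def did_document_url(did: str, plc_directory: str = DEFAULT_PLC_DIRECTORY) -> str:
    """Map a DID to the URL its DID document is served from."""
    if did.startswith("did:plc:"):
        # The PLC directory serves the document at <directory>/<did>.
        return f"{plc_directory.rstrip('/')}/{did}"
    if did.startswith("did:web:"):
        host = did.removeprefix("did:web:")
        if ":" in host:
            # Path-based did:web identifiers are out of scope for this sketch.
            raise ValueError(f"unsupported did:web form: {did}")
        # A port is percent-encoded in the DID (for example localhost%3A3000).
        return f"https://{unquote(host)}/.well-known/did.json"
    raise ValueError(f"unsupported DID method: {did}")


plc_url = did_document_url("did:plc:abc123")
web_url = did_document_url("did:web:example.com")
```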

resolve_pds

resolve_pds(did: str) -> str

Resolve a DID to its PDS service endpoint.

PARAMETER DESCRIPTION
did

The DID to resolve.

TYPE: str

RETURNS DESCRIPTION
str

The PDS endpoint URL.

RAISES DESCRIPTION
IdentityError

If the DID document carries no PDS service entry.
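
For illustration, picking the PDS endpoint out of a DID document might look like the sketch below. The pds_endpoint_from_document helper and the local IdentityError stand-in are assumptions; the #atproto_pds id and AtprotoPersonalDataServer type are the service entry conventions ATProto DID documents use.

```python
class IdentityError(RuntimeError):
    """Local stand-in for the module's error type."""


def pds_endpoint_from_document(document: dict) -> str:
    """Return the serviceEndpoint of the document's PDS service entry."""
    for service in document.get("service", []):
        if not isinstance(service, dict):
            continue
        is_pds = (
            str(service.get("id", "")).endswith("#atproto_pds")
            and service.get("type") == "AtprotoPersonalDataServer"
        )
        endpoint = service.get("serviceEndpoint")
        if is_pds and isinstance(endpoint, str):
            return endpoint
    raise IdentityError("DID document carries no PDS service entry")


document = {
    "id": "did:plc:abc123",
    "service": [
        {
            "id": "#atproto_pds",
            "type": "AtprotoPersonalDataServer",
            "serviceEndpoint": "https://pds.example",
        }
    ],
}
endpoint = pds_endpoint_from_document(document)
```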

resolve

resolve(actor: str) -> IdentityResolution

Fully resolve a handle or DID to an identity.

PARAMETER DESCRIPTION
actor

A handle or a DID.

TYPE: str

RETURNS DESCRIPTION
IdentityResolution

The resolved identity (did, pds endpoint, and originating handle).

resolve_handle

resolve_handle(handle: str) -> str

Resolve a handle to a DID using a throwaway resolver.

PARAMETER DESCRIPTION
handle

The ATProto handle (for example alice.bsky.social).

TYPE: str

RETURNS DESCRIPTION
str

The resolved DID.

RAISES DESCRIPTION
IdentityError

If the handle cannot be resolved.

resolve_did

resolve_did(did: str) -> dict[str, JsonValue]

Resolve a DID to its DID document using a throwaway resolver.

PARAMETER DESCRIPTION
did

The DID to resolve.

TYPE: str

RETURNS DESCRIPTION
dict

The resolved DID document.

RAISES DESCRIPTION
IdentityError

If the DID cannot be resolved.

resolve_pds

resolve_pds(did: str) -> str

Resolve a DID to its PDS service endpoint using a throwaway resolver.

PARAMETER DESCRIPTION
did

The DID to resolve.

TYPE: str

RETURNS DESCRIPTION
str

The PDS endpoint URL.

RAISES DESCRIPTION
IdentityError

If the DID cannot be resolved.

PDS records

Wraps com.atproto.repo.getRecord and listRecords, folding cursor pagination into a lazy iterator and collecting per-record decode failures. get_repo_car and get_repo add the bulk path: they fetch a whole repository as a CAR archive over com.atproto.sync.getRepo, and get_repo decodes its Merkle search tree into record envelopes in one round trip (decode_repo_car exposes the decode step on its own). describe_repo returns a RepoDescription table of contents over com.atproto.repo.describeRepo without enumerating records, and list_repos enumerates hosted repository DIDs over com.atproto.sync.listRepos, the seed source for a backfill crawl.

lairs.atproto.pds

Direct PDS record client.

Wraps com.atproto.repo.getRecord and com.atproto.repo.listRecords over the XRPC HTTP interface of a PDS, with listRecords cursor pagination folded into a lazy iterator. Responses use the standard {uri, cid, value} envelope, modelled here as RecordEnvelope. A generic decode helper validates an envelope's value against any dx.Model target; decode_all decodes a batch and collects per-record validation failures instead of failing fast.

The transport is httpx. The bulk com.atproto.sync.getRepo path fetches a CAR archive and decodes its Merkle search tree into record envelopes through libipld. Public reads need no auth.

DEFAULT_PAGE_SIZE module-attribute

DEFAULT_PAGE_SIZE = 100

The default page size requested from listRecords.

QueryParams

QueryParams = dict[str, str | int | bool]

The scalar parameter mapping accepted by an XRPC query.

XRPC query parameters are always JSON scalars (strings, integers, or booleans), which is narrower than JsonValue and matches what httpx accepts for a query string.
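
To make the scalar mapping concrete, here is a standard-library sketch of turning such a mapping into an XRPC query URL. The xrpc_url helper is hypothetical; the client itself hands the mapping to httpx, which does its own encoding.

```python
from typing import Union
from urllib.parse import urlencode

QueryParams = dict[str, Union[str, int, bool]]


def xrpc_url(endpoint: str, nsid: str, params: QueryParams) -> str:
    """Build <endpoint>/xrpc/<nsid>?<query> from scalar parameters."""
    encoded = {
        # Check bool before falling back to str(): booleans are sent as
        # lowercase true/false, not Python's True/False.
        key: ("true" if value else "false") if isinstance(value, bool) else str(value)
        for key, value in params.items()
    }
    return f"{endpoint.rstrip('/')}/xrpc/{nsid}?{urlencode(encoded)}"


url = xrpc_url(
    "https://pds.example",
    "com.atproto.repo.listRecords",
    {"repo": "did:plc:abc123", "collection": "app.bsky.feed.post", "limit": 100, "reverse": False},
)
```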

RecordEnvelope

Bases: Model

The standard ATProto record envelope.

ATTRIBUTE DESCRIPTION
uri

The AT-URI of the record.

TYPE: str

cid

The content identifier of the record.

TYPE: str

value

The record's JSON value, decoded against a generated model on demand.

TYPE: JsonValue

RecordDecodeFailure

Bases: Model

A per-record decode failure with diagnostics.

ATTRIBUTE DESCRIPTION
uri

The AT-URI of the record that failed to decode.

TYPE: str

cid

The content identifier of the record that failed to decode.

TYPE: str

error

A human-readable description of the validation failure.

TYPE: str

RecordNotFoundError

Bases: LookupError

Raised when a record lookup returns no usable record.

A real ATProto record-not-found is a non-success status that surfaces as httpx.HTTPStatusError. This exception covers the narrower case of a 200 response whose body is not a usable {uri, cid, value} record object (an empty or malformed body), which would otherwise be silently coerced into an envelope with an empty uri that a caller could not tell apart from a real record.

RepoDescription

Bases: Model

A repository table of contents from com.atproto.repo.describeRepo.

ATTRIBUTE DESCRIPTION
did

The repository DID.

TYPE: str

handle

The repository's handle as resolved by the PDS.

TYPE: str

handle_is_correct

Whether the PDS verified the handle resolves back to this DID.

TYPE: bool

collections

The collection NSIDs present in the repository.

TYPE: tuple of str

did_doc

The repository's DID document, as returned by the PDS.

TYPE: JsonValue

PdsClient

PdsClient(endpoint: str, client: Client | None = None)

An XRPC client over a single PDS, for read-only record access.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS (for example https://pds.example).

TYPE: str

client

An injected HTTP client. When omitted, a private client is created and closed with this client. Injecting a client lets a session carry auth for private reads later; public reads need no auth.

TYPE: Client or None DEFAULT: None

close

close() -> None

Close the underlying HTTP client if this client owns it.

get_record

get_record(
    repo: str, collection: str, rkey: str
) -> RecordEnvelope

Fetch a single record by repo, collection, and rkey.

PARAMETER DESCRIPTION
repo

The repository DID or handle.

TYPE: str

collection

The record collection NSID.

TYPE: str

rkey

The record key.

TYPE: str

RETURNS DESCRIPTION
RecordEnvelope

The {uri, cid, value} record envelope.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

RecordNotFoundError

If the PDS returns a 200 whose body is not a usable record object (an empty or malformed body with no string uri).
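
The guard against a 200 with an unusable body can be sketched roughly as below. The envelope_from_body helper is an assumption made for illustration, as is the plain dict standing in for RecordEnvelope and the empty-string default for a missing cid.

```python
class RecordNotFoundError(LookupError):
    """Local stand-in for the module's error type."""


def envelope_from_body(body: object) -> dict:
    """Accept a decoded 200 body only if it carries a string uri."""
    if not isinstance(body, dict) or not isinstance(body.get("uri"), str):
        # Refuse to coerce an empty or malformed body into an envelope
        # whose uri would be indistinguishable from a real record's.
        raise RecordNotFoundError("200 response carried no usable record")
    return {
        "uri": body["uri"],
        "cid": str(body.get("cid", "")),
        "value": body.get("value"),
    }


good = envelope_from_body(
    {"uri": "at://did:plc:abc123/app.bsky.feed.post/3k", "cid": "bafy", "value": {"text": "hi"}}
)
```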

list_records

list_records(
    repo: str,
    collection: str,
    *,
    limit: int | None = None,
    cursor: str | None = None,
) -> Iterator[RecordEnvelope]

Enumerate records in a collection with cursor pagination.

Pages are fetched lazily: each page is requested only when the consumer advances past the previous page, and iteration stops when the PDS stops returning a cursor.

PARAMETER DESCRIPTION
repo

The repository DID or handle.

TYPE: str

collection

The record collection NSID.

TYPE: str

limit

The page size requested from the PDS; defaults to the module page size.

TYPE: int or None DEFAULT: None

cursor

An opaque pagination cursor to resume from.

TYPE: str or None DEFAULT: None

YIELDS DESCRIPTION
RecordEnvelope

Record envelopes, in PDS order, across all pages.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status for any page.
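
The lazy pagination described above can be sketched with a generator. This is a simplified model under stated assumptions: iter_records and the fetch_page callable are hypothetical, and a page is a plain dict with a records list and an optional cursor.

```python
from collections.abc import Callable, Iterator
from typing import Optional


def iter_records(
    fetch_page: Callable[[Optional[str]], dict], cursor: Optional[str] = None
) -> Iterator[str]:
    """Yield records page by page, requesting a page only when needed."""
    while True:
        page = fetch_page(cursor)  # runs only once the consumer asks for more
        yield from page.get("records", [])
        cursor = page.get("cursor")
        if not cursor:
            return  # no cursor in the response: that was the last page


PAGES = {
    None: {"records": ["r1", "r2"], "cursor": "c1"},
    "c1": {"records": ["r3"]},
}
fetched: list = []


def fake_fetch(cursor: Optional[str]) -> dict:
    """Stand-in for one listRecords request; records each cursor it sees."""
    fetched.append(cursor)
    return PAGES[cursor]


records = iter_records(fake_fetch)
first = next(records)
fetched_after_first = list(fetched)  # only the first page has been requested
rest = list(records)
```

Taking a single record costs a single request; the second page is fetched only when iteration moves past the first.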

get_repo_car

get_repo_car(repo: str) -> bytes

Fetch a whole repository as a raw CAR archive.

This is the bulk com.atproto.sync.getRepo path. The archive is read fully into memory; use get_repo to decode it into envelopes.

PARAMETER DESCRIPTION
repo

The repository DID.

TYPE: str

RETURNS DESCRIPTION
bytes

The CAR archive bytes.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

get_repo

get_repo(repo: str) -> tuple[RecordEnvelope, ...]

Fetch a whole repository and decode it into record envelopes.

Fetches the repository CAR in a single request and walks its Merkle search tree, producing one envelope per record. This recovers the same records as listing every collection, in one round trip.

PARAMETER DESCRIPTION
repo

The repository DID.

TYPE: str

RETURNS DESCRIPTION
tuple of RecordEnvelope

One envelope per record in the repository, in MST key order.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

describe_repo

describe_repo(repo: str) -> RepoDescription

Fetch a repository's table of contents.

Wraps com.atproto.repo.describeRepo, which returns the repository's collection NSIDs, handle, and DID document without enumerating any records. This is the cheap way to learn which collections a repo holds.

PARAMETER DESCRIPTION
repo

The repository DID or handle.

TYPE: str

RETURNS DESCRIPTION
RepoDescription

The repository's collections, handle, and DID document.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

list_repos

list_repos(*, cursor: str | None = None) -> Iterator[str]

Enumerate the DIDs of repositories this service hosts.

Wraps com.atproto.sync.listRepos with cursor pagination folded into a lazy iterator, the seed source for a backfill crawl over a relay or PDS.

PARAMETER DESCRIPTION
cursor

An opaque pagination cursor to resume from.

TYPE: str or None DEFAULT: None

YIELDS DESCRIPTION
str

Repository DIDs, across all pages.

RAISES DESCRIPTION
HTTPStatusError

If the service returns a non-success status for any page.

decode

decode(envelope: RecordEnvelope, model: type[T]) -> T

Decode a single envelope's value into a model instance.

PARAMETER DESCRIPTION
envelope

The record envelope to decode.

TYPE: RecordEnvelope

model

The target dx.Model subclass.

TYPE: type

RETURNS DESCRIPTION
T

The validated model instance.

RAISES DESCRIPTION
ValidationError

If the envelope's value does not validate against model.

decode_all

decode_all(
    envelopes: Sequence[RecordEnvelope], model: type[T]
) -> tuple[tuple[T, ...], tuple[RecordDecodeFailure, ...]]

Decode a batch of envelopes, collecting failures.

Validation failures are gathered into a tuple of RecordDecodeFailure models with per-record diagnostics so a single bad record does not abort the batch. The result is a (records, failures) pair; a generic result model is not used because didactic does not classify a model field typed by an unbound type variable.

PARAMETER DESCRIPTION
envelopes

The record envelopes to decode.

TYPE: collections.abc.Sequence of RecordEnvelope

model

The target dx.Model subclass.

TYPE: type

RETURNS DESCRIPTION
tuple

A (records, failures) pair: the successfully decoded model instances and the per-record decode failures.
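
A minimal sketch of the collect-instead-of-fail-fast pattern follows. The decode_batch name, the decode_one callable, and the use of ValueError in place of the model's validation error are assumptions; envelopes and failures are plain dicts here rather than dx.Model instances.

```python
from collections.abc import Callable, Sequence
from typing import TypeVar

T = TypeVar("T")


def decode_batch(
    envelopes: Sequence[dict], decode_one: Callable[[object], T]
) -> tuple[tuple[T, ...], tuple[dict, ...]]:
    """Decode every envelope, gathering failures rather than aborting."""
    records: list[T] = []
    failures: list[dict] = []
    for envelope in envelopes:
        try:
            records.append(decode_one(envelope["value"]))
        except ValueError as error:
            # Keep enough context to find the bad record later.
            failures.append(
                {"uri": envelope["uri"], "cid": envelope["cid"], "error": str(error)}
            )
    return tuple(records), tuple(failures)


def parse_title(value: object) -> str:
    """Toy validator standing in for a generated model."""
    if not isinstance(value, dict) or not isinstance(value.get("title"), str):
        raise ValueError("title must be a string")
    return value["title"]


batch = [
    {"uri": "at://did:plc:abc123/x.y/1", "cid": "c1", "value": {"title": "ok"}},
    {"uri": "at://did:plc:abc123/x.y/2", "cid": "c2", "value": {"title": 3}},
    {"uri": "at://did:plc:abc123/x.y/3", "cid": "c3", "value": {"title": "also ok"}},
]
decoded, failed = decode_batch(batch, parse_title)
```

The bad middle record lands in the failures tuple while both good records still decode.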

decode_repo_car

decode_repo_car(car: bytes) -> tuple[RecordEnvelope, ...]

Decode a CAR archive into record envelopes.

Parses the CAR block store with libipld, then walks the repository's Merkle search tree to recover every record as a {uri, cid, value} envelope. Record values are rendered in DAG-JSON shape so they decode against the generated models exactly as the XRPC record endpoints do.

PARAMETER DESCRIPTION
car

The CAR archive bytes from com.atproto.sync.getRepo.

TYPE: bytes

RETURNS DESCRIPTION
tuple of RecordEnvelope

One envelope per record in the repository, in MST key order.
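
The CAR and tree decoding itself needs libipld, but the last step, turning a tree key into an envelope uri, is simple enough to sketch. The helpers below are hypothetical and assume the usual ATProto layout in which each Merkle search tree key is a collection/rkey path and keys are ordered lexicographically.

```python
def at_uri(did: str, mst_key: str) -> str:
    """Build an AT-URI from a repository DID and a collection/rkey key."""
    collection, _, rkey = mst_key.partition("/")
    if not collection or not rkey:
        raise ValueError(f"not a collection/rkey key: {mst_key!r}")
    return f"at://{did}/{collection}/{rkey}"


def record_uris(did: str, mst_keys: list[str]) -> list[str]:
    """Return AT-URIs in key order (sorted lexicographically)."""
    return [at_uri(did, key) for key in sorted(mst_keys)]


uris = record_uris(
    "did:plc:abc123",
    ["app.bsky.feed.post/3kb", "app.bsky.actor.profile/self", "app.bsky.feed.post/3ka"],
)
```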

get_record

get_record(
    endpoint: str, repo: str, collection: str, rkey: str
) -> RecordEnvelope

Fetch a single record using a throwaway client.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS.

TYPE: str

repo

The repository DID or handle.

TYPE: str

collection

The record collection NSID.

TYPE: str

rkey

The record key.

TYPE: str

RETURNS DESCRIPTION
RecordEnvelope

The record envelope.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

RecordNotFoundError

If the PDS returns a 200 whose body is not a usable record object.

list_records

list_records(
    endpoint: str,
    repo: str,
    collection: str,
    *,
    limit: int | None = None,
    cursor: str | None = None,
) -> list[RecordEnvelope]

List records using a throwaway client, draining all pages.

The lazy iterator is fully consumed here so the throwaway client can close; use PdsClient.list_records for true streaming over an open client.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS.

TYPE: str

repo

The repository DID or handle.

TYPE: str

collection

The record collection NSID.

TYPE: str

limit

The page size requested from the PDS.

TYPE: int or None DEFAULT: None

cursor

An opaque pagination cursor to resume from.

TYPE: str or None DEFAULT: None

RETURNS DESCRIPTION
list of RecordEnvelope

Every record envelope across all pages.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status for any page.
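
Why the throwaway variant drains the iterator can be shown with a toy client. FakeClient and both helper functions are hypothetical; the point is only that a lazy generator returned from a function whose cleanup closes the client runs after that client is already closed.

```python
from collections.abc import Iterator


class FakeClient:
    """Stand-in for an HTTP client that refuses to work once closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def pages(self) -> Iterator[str]:
        for item in ("r1", "r2"):
            if self.closed:
                raise RuntimeError("client is closed")
            yield item


def list_lazily() -> Iterator[str]:
    client = FakeClient()
    try:
        return client.pages()  # generator is returned before it has run
    finally:
        client.close()


def list_drained() -> list[str]:
    client = FakeClient()
    try:
        return list(client.pages())  # fully consumed while the client is open
    finally:
        client.close()


drained = list_drained()
try:
    list(list_lazily())
    lazy_failed = False
except RuntimeError:
    lazy_failed = True
```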

get_repo

get_repo(
    endpoint: str, repo: str
) -> tuple[RecordEnvelope, ...]

Fetch and decode a whole repository using a throwaway client.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS.

TYPE: str

repo

The repository DID.

TYPE: str

RETURNS DESCRIPTION
tuple of RecordEnvelope

One envelope per record in the repository, in MST key order.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

describe_repo

describe_repo(endpoint: str, repo: str) -> RepoDescription

Fetch a repository's table of contents using a throwaway client.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS.

TYPE: str

repo

The repository DID or handle.

TYPE: str

RETURNS DESCRIPTION
RepoDescription

The repository's collections, handle, and DID document.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

Blobs

Content-addressed blob fetch over com.atproto.sync.getBlob. get_blob returns the blob in full, while iter_blob yields it in chunks without buffering the whole blob. Blob upload is owned by the authoring component and raises here.

lairs.atproto.blobs

Blob fetch over ATProto.

Wraps com.atproto.sync.getBlob for streamed, content-addressed media bytes. The returned BlobBytes holder carries the CID and the raw bytes (in an opaque field) for the media layer to cache; this module does not implement the cache, which is owned by the store and media components.

Blob upload (com.atproto.repo.uploadBlob) is a write and belongs to the authoring component; it is a clearly-marked deferred stub here. The transport is httpx; public reads need no auth.

BlobBytes

Bases: Model

A content-addressed holder of fetched blob bytes.

The bytes are carried in an opaque field so the holder stays a dx.Model while remaining a runtime container the media layer can cache by CID.

ATTRIBUTE DESCRIPTION
did

The repository DID the blob was fetched from.

TYPE: str

cid

The content identifier of the blob.

TYPE: str

data

The raw blob bytes.

TYPE: bytes

mime_type

The MIME type reported by the PDS, when known.

TYPE: (str or None, optional)

BlobClient

BlobClient(endpoint: str, client: Client | None = None)

An XRPC client for streamed, content-addressed blob fetch.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS (for example https://pds.example).

TYPE: str

client

An injected HTTP client. When omitted, a private client is created and closed with this client. Public reads need no auth.

TYPE: Client or None DEFAULT: None

close

close() -> None

Close the underlying HTTP client if this client owns it.

get_blob

get_blob(did: str, cid: str) -> BlobBytes

Fetch blob bytes for a DID and CID, streamed.

The response is streamed in chunks and concatenated so a large media blob is not materialised twice; the media layer is responsible for caching by CID.

PARAMETER DESCRIPTION
did

The repository DID that holds the blob.

TYPE: str

cid

The content identifier of the blob.

TYPE: str

RETURNS DESCRIPTION
BlobBytes

The content-addressed blob bytes holder.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

iter_blob

iter_blob(did: str, cid: str) -> Iterator[bytes]

Yield blob bytes in chunks without buffering the whole blob.

PARAMETER DESCRIPTION
did

The repository DID that holds the blob.

TYPE: str

cid

The content identifier of the blob.

TYPE: str

YIELDS DESCRIPTION
bytes

Successive byte chunks of the blob.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.
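
Chunked consumption can be sketched with any readable stream. The iter_chunks helper and the 64 KiB chunk size are assumptions made for illustration; an in-memory stream stands in for the HTTP response. The consumer below hashes the blob incrementally, so at most one chunk is held at a time.

```python
import hashlib
import io
from collections.abc import Iterator
from typing import BinaryIO


def iter_chunks(stream: BinaryIO, chunk_size: int = 65536) -> Iterator[bytes]:
    """Yield successive chunks until the stream is exhausted."""
    while chunk := stream.read(chunk_size):
        yield chunk


payload = b"x" * 150_000
digest = hashlib.sha256()
sizes: list[int] = []
for chunk in iter_chunks(io.BytesIO(payload)):
    digest.update(chunk)  # process each chunk, then let it go
    sizes.append(len(chunk))
```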

get_blob

get_blob(endpoint: str, did: str, cid: str) -> BlobBytes

Fetch blob bytes using a throwaway client.

PARAMETER DESCRIPTION
endpoint

The base URL of the PDS.

TYPE: str

did

The repository DID that holds the blob.

TYPE: str

cid

The content identifier of the blob.

TYPE: str

RETURNS DESCRIPTION
BlobBytes

The content-addressed blob bytes holder.

RAISES DESCRIPTION
HTTPStatusError

If the PDS returns a non-success status.

upload_blob

upload_blob(data: bytes, mime_type: str) -> BlobRef

Upload blob bytes and return a blob reference.

Blob upload is a write to the authenticated user's own repository and is owned by the authoring component, which carries the OAuth session and write scopes. The access layer is read-only, so this is a deferred stub.

PARAMETER DESCRIPTION
data

The blob bytes to upload.

TYPE: bytes

mime_type

The MIME type of the blob.

TYPE: str

RETURNS DESCRIPTION
BlobRef

A reference to the uploaded blob.

RAISES DESCRIPTION
NotImplementedError

Always; blob upload is owned by the authoring component.

Appview

An optional thin client over the Layers appview query API, used for discovery without walking PDSes. query issues a raw XRPC query and returns the decoded response body; get runs a get* method and returns a single record envelope; list runs a list* method and lazily iterates record envelopes across pages, with cursor pagination folded into the iterator. Responses decode through the same envelope as the PDS.

lairs.atproto.appview

Optional appview XRPC query client.

A thin client over the Layers appview query API (pub.layers.*.get* and list*) used for discovery and cross-ref resolution without walking PDSes. The appview is an accelerator only: lairs works with it off, where direct PDS access is the contract. Responses use the same {uri, cid, value} envelope as the PDS, so they decode through the same generated models.

The transport is httpx; the endpoint is configurable. Public queries need no auth.

AppviewClient

AppviewClient(endpoint: str, client: Client | None = None)

A thin XRPC client over the Layers appview query API.

PARAMETER DESCRIPTION
endpoint

The base URL of the appview XRPC service.

TYPE: str

client

An injected HTTP client. When omitted, a private client is created and closed with this client. Public queries need no auth.

TYPE: Client or None DEFAULT: None

close

close() -> None

Close the underlying HTTP client if this client owns it.

query

query(
    nsid: str, params: QueryParams
) -> dict[str, JsonValue]

Issue an XRPC query against the appview.

PARAMETER DESCRIPTION
nsid

The query method NSID (for example corpus.listCorpora).

TYPE: str

params

The query parameters.

TYPE: dict

RETURNS DESCRIPTION
dict

The decoded XRPC response body.

RAISES DESCRIPTION
HTTPStatusError

If the appview returns a non-success status.

get

get(nsid: str, params: QueryParams) -> RecordEnvelope

Issue a get* query and return its record envelope.

PARAMETER DESCRIPTION
nsid

The get* method NSID (for example corpus.getCorpus).

TYPE: str

params

The query parameters.

TYPE: dict

RETURNS DESCRIPTION
RecordEnvelope

The {uri, cid, value} record envelope.

RAISES DESCRIPTION
HTTPStatusError

If the appview returns a non-success status.

RecordNotFoundError

If the appview returns a 200 whose body is not a usable record object (an empty or malformed body with no string uri).

list

list(
    nsid: str,
    params: QueryParams,
    *,
    results_key: str = "records",
    cursor: str | None = None,
) -> Iterator[RecordEnvelope]

Issue a list* query and lazily iterate its record envelopes.

Cursor pagination is folded into the iterator: each page is fetched only when the consumer advances past the previous one, and iteration stops when the appview stops returning a cursor.

PARAMETER DESCRIPTION
nsid

The list* method NSID (for example corpus.listCorpora).

TYPE: str

params

The query parameters, excluding the cursor.

TYPE: dict

results_key

The response key holding the records array.

TYPE: str DEFAULT: 'records'

cursor

An opaque pagination cursor to resume from.

TYPE: str or None DEFAULT: None

YIELDS DESCRIPTION
RecordEnvelope

Record envelopes, in appview order, across all pages.

RAISES DESCRIPTION
HTTPStatusError

If the appview returns a non-success status for any page.

Auth

App-password session auth for writes and private reads. login resolves an actor to its PDS and calls com.atproto.server.createSession; SessionAuth is an httpx.Auth that attaches the access token and, on a 401, refreshes via com.atproto.server.refreshSession (falling back to a fresh login when the refresh token has expired); SessionStore persists the credential-bearing Session to the XDG state directory with 0600 permissions; and authed_client builds an HTTP client wired to a self-renewing session.

lairs.atproto.auth

App-password authentication and session management for PDS accounts.

lairs authenticates writes (and private reads) with an ATProto app-password session: login resolves an actor to its PDS and calls com.atproto.server.createSession; SessionAuth is an httpx.Auth that attaches the access token and, on a 401, refreshes via com.atproto.server.refreshSession (falling back to a fresh login with the stored app password when the refresh token has also expired); and SessionStore persists the session to the XDG state directory so a single login carries across commands. This mirrors the ergonomics of goat.

The persisted session file contains credentials (the access and refresh tokens and the app password used for seamless re-auth); it is written with 0600 permissions.

Session

Bases: Model

An authenticated PDS session.

ATTRIBUTE DESCRIPTION
did

The authenticated account DID.

TYPE: str

pds_endpoint

The base URL of the account's PDS.

TYPE: str

access_jwt

The short-lived access token.

TYPE: str

refresh_jwt

The long-lived refresh token.

TYPE: str

handle

The account handle, when known.

TYPE: str or None

password

The app password, retained to re-authenticate when the refresh token expires. None for an in-memory session that should not re-login.

TYPE: str or None

SessionRenewalError

Bases: HTTPError

Raised when a 401 cannot be recovered by renewing the session.

This is raised by SessionAuth when the access token is rejected and neither refreshSession nor a stored-password createSession login can mint a fresh token. It subclasses httpx.HTTPError so a caller that already catches transport errors catches it too. Raising rather than re-sending the original request means a non-idempotent write whose renewal fails is attempted against the target endpoint exactly once.

SessionAuth

SessionAuth(
    session: Session,
    *,
    on_update: Callable[[Session], None] | None = None,
)

Bases: Auth

An httpx.Auth that attaches and self-renews a PDS access token.

Each request carries the session's access token. On a 401 the access token is refreshed through refreshSession; if that also fails and the session retains an app password, a fresh createSession login is attempted. When the tokens rotate, an optional callback is invoked so a store can persist them.

PARAMETER DESCRIPTION
session

The session whose tokens to attach and renew.

TYPE: Session

on_update

A callback invoked with the new session whenever the tokens rotate.

TYPE: Callable or None DEFAULT: None

auth_flow

auth_flow(request: Request) -> Generator[Request, Response]

Attach the access token, renewing it once on a 401.

The original request is retried only when renewal actually rotated the access token, so the response the caller receives is the target endpoint's and not an intermediate refresh response. When renewal does not yield a new token (a dead refresh token and no stored password) the request is not re-sent with the unchanged stale token; instead a SessionRenewalError is raised. This means a non-idempotent write whose renewal fails is attempted against the target endpoint exactly once rather than twice.

RAISES DESCRIPTION
SessionRenewalError

If the access token is rejected and renewal mints no new token.
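
The renew-once rule can be modelled without httpx as a generator-based flow. Everything below is a simplified stand-in: Request, Response, Tokens, the renew callable, and the send driver are hypothetical, and the driver only imitates how an HTTP client steps through an auth flow.

```python
from collections.abc import Callable, Generator
from dataclasses import dataclass, field


@dataclass
class Request:
    url: str
    headers: dict[str, str] = field(default_factory=dict)


@dataclass
class Response:
    status_code: int


@dataclass
class Tokens:
    access_jwt: str


class SessionRenewalError(Exception):
    """Local stand-in for the module's error type."""


def auth_flow(
    request: Request, tokens: Tokens, renew: Callable[[Tokens], None]
) -> Generator[Request, Response, None]:
    request.headers["Authorization"] = f"Bearer {tokens.access_jwt}"
    response = yield request
    if response.status_code != 401:
        return
    stale = tokens.access_jwt
    renew(tokens)
    if tokens.access_jwt == stale:
        # Do not re-send with the same rejected token.
        raise SessionRenewalError("renewal minted no new token")
    request.headers["Authorization"] = f"Bearer {tokens.access_jwt}"
    yield request  # exactly one retry, carrying the rotated token


def send(request, tokens, renew, transport) -> Response:
    """Drive the flow: send each yielded request, feed back its response."""
    flow = auth_flow(request, tokens, renew)
    outgoing = next(flow)
    while True:
        response = transport(outgoing)
        try:
            outgoing = flow.send(response)
        except StopIteration:
            return response


attempts: list[str] = []


def transport(request: Request) -> Response:
    attempts.append(request.headers["Authorization"])
    return Response(200 if request.headers["Authorization"] == "Bearer fresh" else 401)


def rotate(tokens: Tokens) -> None:
    tokens.access_jwt = "fresh"


ok = send(Request("https://pds.example/xrpc/example"), Tokens("stale"), rotate, transport)
```

With a renew callable that changes nothing, send raises SessionRenewalError after a single attempt against the target endpoint.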

SessionStore

SessionStore(path: Path | None = None)

A file-backed store for the authenticated session.

The session is written to the XDG state directory (or the path given by the LAIRS_AUTH_FILE environment variable) with 0600 permissions, since it holds credentials.

PARAMETER DESCRIPTION
path

An explicit session file path; the default location is used when omitted.

TYPE: Path or None DEFAULT: None

path property

path: Path

Return the session file path.

RETURNS DESCRIPTION
Path

The session file path.

save

save(session: Session) -> None

Persist a session, creating the file with restricted permissions.

PARAMETER DESCRIPTION
session

The session to persist.

TYPE: Session
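
A sketch of the path lookup and the restricted write, assuming a POSIX filesystem. The helper names and the lairs/session.json file name are hypothetical; only the LAIRS_AUTH_FILE override, the XDG state directory, and the 0600 mode come from the description above.

```python
import json
import os
import stat
import tempfile
from pathlib import Path


def default_session_path() -> Path:
    """Prefer LAIRS_AUTH_FILE, else a file under the XDG state directory."""
    override = os.environ.get("LAIRS_AUTH_FILE")
    if override:
        return Path(override)
    state_home = os.environ.get("XDG_STATE_HOME") or str(Path.home() / ".local" / "state")
    return Path(state_home) / "lairs" / "session.json"  # hypothetical file name


def save_private(path: Path, payload: dict) -> None:
    """Write JSON so that only the owner can read or write the file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create with mode 0600 from the start, so the credentials are never
    # briefly readable by other users.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        json.dump(payload, handle)
    os.chmod(path, 0o600)  # also tightens a file that already existed


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "state" / "session.json"
    save_private(target, {"did": "did:plc:abc123"})
    mode = stat.S_IMODE(target.stat().st_mode)
    stored = json.loads(target.read_text(encoding="utf-8"))
```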

load

load() -> Session | None

Load the stored session, or None when none is stored.

RETURNS DESCRIPTION
Session or None

The stored session, or None.

delete

delete() -> bool

Delete the stored session.

RETURNS DESCRIPTION
bool

True if a session file was present and removed.

login

login(
    identifier: str,
    app_password: str,
    *,
    pds: str | None = None,
    resolver: IdentityResolver | None = None,
    client: Client | None = None,
) -> Session

Authenticate with an app password and return a session.

Resolves identifier (a handle or DID) to its PDS unless pds is given, then calls com.atproto.server.createSession. The returned session retains the app password so a stored session can re-authenticate after its refresh token expires.

PARAMETER DESCRIPTION
identifier

The account handle or DID.

TYPE: str

app_password

An app password (not the account password).

TYPE: str

pds

The PDS base URL; skips identity resolution when given.

TYPE: str or None DEFAULT: None

resolver

An injected identity resolver for the resolution step.

TYPE: IdentityResolver or None DEFAULT: None

client

An injected HTTP client; a private one is created when omitted.

TYPE: Client or None DEFAULT: None

RETURNS DESCRIPTION
Session

The authenticated session.

RAISES DESCRIPTION
HTTPStatusError

If the PDS rejects the credentials.
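
The createSession exchange can be sketched as two pure functions, leaving out the HTTP call. The dataclass and helper names are hypothetical stand-ins for the real Session model; the identifier and password request fields and the accessJwt, refreshJwt, did, and handle response fields are those of com.atproto.server.createSession.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Session:
    did: str
    pds_endpoint: str
    access_jwt: str
    refresh_jwt: str
    handle: Optional[str] = None
    password: Optional[str] = None


def create_session_body(identifier: str, app_password: str) -> dict:
    """The JSON body posted to com.atproto.server.createSession."""
    return {"identifier": identifier, "password": app_password}


def session_from_response(body: dict, pds_endpoint: str, app_password: str) -> Session:
    """Map the createSession response onto a session, retaining the password."""
    return Session(
        did=body["did"],
        pds_endpoint=pds_endpoint,
        access_jwt=body["accessJwt"],
        refresh_jwt=body["refreshJwt"],
        handle=body.get("handle"),
        password=app_password,  # kept so a later re-login is possible
    )


request_body = create_session_body("alice.bsky.social", "app-pass-example")
session = session_from_response(
    {"did": "did:plc:abc123", "handle": "alice.bsky.social", "accessJwt": "a", "refreshJwt": "r"},
    "https://pds.example",
    "app-pass-example",
)
```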

authed_client

authed_client(
    session: Session, *, store: SessionStore | None = None
) -> Client

Build an HTTP client that authenticates and self-renews a session.

PARAMETER DESCRIPTION
session

The session to authenticate with.

TYPE: Session

store

A store to persist rotated tokens to; refreshes are kept in memory only when omitted.

TYPE: SessionStore or None DEFAULT: None

RETURNS DESCRIPTION
Client

A client whose requests carry and renew the session's tokens.

Firehose

The repo firehose consumer. subscribe_repos opens a websocket to com.atproto.sync.subscribeRepos, decodes each frame's embedded CAR archive through the shared CAR primitives, and yields one FirehoseEvent per commit op whose collection matches the Layers NSIDs. The stream is live and unbounded: the consumer controls how many events to take, and closing the generator closes the websocket. The RepoSubscriber protocol describes the same streaming surface for an injectable consumer.

lairs.atproto.firehose

Firehose consumer.

Consumes com.atproto.sync.subscribeRepos over a websocket and yields the commit events whose collection matches the Layers NSIDs, to keep a local store fresh. Each frame is a pair of DAG-CBOR values (a header and a body); commit bodies carry an embedded CAR archive whose blocks hold the created and updated record values, which are recovered through the shared CAR primitives and rendered to the same DAG-JSON shape the XRPC record endpoints emit.

By default subscribe_repos is a single-connection primitive: a closed websocket ends iteration cleanly and the caller resumes from the highest event seq it saw. Passing reconnect=True makes it re-dial on a dropped connection, resuming from the highest delivered seq so the freshness loop survives a transient blip.

The websocket transport is provided by the websockets library, a core runtime dependency. Integration-marked tests exercise the consumer against a live PDS firehose.

LAYERS_NSID_PREFIX module-attribute

LAYERS_NSID_PREFIX = 'pub.layers.'

The NSID prefix used to filter the firehose to Layers collections.
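
A rough sketch of the filter, assuming each commit op carries a collection/rkey path. The keep_op helper and the collection names in the demo are hypothetical; the default falls back to the prefix, while an explicit nsids list is matched exactly.

```python
from collections.abc import Sequence
from typing import Optional

LAYERS_NSID_PREFIX = "pub.layers."


def keep_op(path: str, nsids: Optional[Sequence[str]] = None) -> bool:
    """Decide whether a commit op's collection passes the filter."""
    collection, _, rkey = path.partition("/")
    if not rkey:
        return False  # not a collection/rkey path
    if nsids is None:
        return collection.startswith(LAYERS_NSID_PREFIX)
    return collection in nsids


kept_by_default = keep_op("pub.layers.example/3k")
dropped_by_default = keep_op("app.bsky.feed.post/3k")
kept_explicitly = keep_op("app.bsky.feed.post/3k", ["app.bsky.feed.post"])
```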

FirehoseEvent

Bases: Model

A single decoded commit event from the repo firehose.

ATTRIBUTE DESCRIPTION
seq

The monotonically increasing sequence number of the event.

TYPE: int

repo

The DID of the repository the event came from.

TYPE: str

collection

The collection NSID of the affected record.

TYPE: str

rkey

The record key of the affected record.

TYPE: str

action

The commit action (for example create, update, delete).

TYPE: str

record

The record value for create and update actions, otherwise None.

TYPE: JsonValue

RepoSubscriber

Bases: Protocol

A consumer of the filtered repo firehose.

Implementations stream subscribeRepos commit events, decode their CAR blocks, and yield the subset whose collection matches the Layers NSIDs.

subscribe

subscribe(
    *,
    nsids: Sequence[str] | None = None,
    cursor: int | None = None,
) -> Iterator[FirehoseEvent]

Stream filtered firehose events.

PARAMETER DESCRIPTION
nsids

The collection NSIDs to keep; defaults to the Layers record set.

TYPE: collections.abc.Sequence of str or None DEFAULT: None

cursor

A sequence number to resume from.

TYPE: int or None DEFAULT: None

YIELDS DESCRIPTION
FirehoseEvent

Decoded commit events for the kept collections.

subscribe_repos

subscribe_repos(
    relay: str,
    *,
    nsids: Sequence[str] | None = None,
    cursor: int | None = None,
    reconnect: bool = False,
) -> Iterator[FirehoseEvent]

Subscribe to the repo firehose, filtered to the Layers NSIDs.

Opens a websocket to com.atproto.sync.subscribeRepos and yields one event per kept commit op as frames arrive. The stream is live and unbounded; the consumer controls how many events to take and closing the generator closes the websocket.

By default this is a single-connection primitive: when the websocket closes (a deliberate close or a transient network drop) iteration ends cleanly and the caller owns reconnect and cursor bookkeeping. Each FirehoseEvent carries its seq, so a caller tracking the highest seen seq can resume by passing it as cursor on a fresh call. Pass reconnect=True to have this function re-dial automatically on a dropped connection, resuming from the highest seq it has already delivered so the freshness loop survives a blip.
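
The resume bookkeeping can be sketched independently of websockets. In this simplified model the resume_on_drop wrapper, the connect callable, and the max_dials bound are hypothetical; a dropped connection is represented by ConnectionError, and events are reduced to their seq numbers.

```python
from collections.abc import Callable, Iterator
from typing import Optional


def resume_on_drop(
    connect: Callable[[Optional[int]], Iterator[int]],
    cursor: Optional[int] = None,
    max_dials: int = 5,
) -> Iterator[int]:
    """Yield event seqs, re-dialling from the highest delivered seq."""
    for _ in range(max_dials):
        try:
            for seq in connect(cursor):
                cursor = seq  # remember the highest seq handed to the consumer
                yield seq
            return  # the stream ended cleanly: stop iterating
        except ConnectionError:
            continue  # dropped: dial again, resuming from the saved cursor


dials: list = []


def fake_connect(cursor: Optional[int]) -> Iterator[int]:
    """Fake stream of seqs 1..6 that drops once, just before seq 4."""
    dials.append(cursor)
    start = 1 if cursor is None else cursor + 1  # assumption: resume after cursor
    for seq in range(start, 7):
        if seq == 4 and len(dials) == 1:
            raise ConnectionError("dropped")
        yield seq


delivered = list(resume_on_drop(fake_connect))
```

The second dial starts from cursor 3, so no seq is delivered twice and none is skipped.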

PARAMETER DESCRIPTION
relay

The relay or PDS firehose endpoint, with or without the XRPC path (for example wss://bsky.network or ws://localhost:3000).

TYPE: str

nsids

The collection NSIDs to keep; defaults to every collection under the pub.layers. prefix.

TYPE: collections.abc.Sequence of str or None DEFAULT: None

cursor

A sequence number to resume from.

TYPE: int or None DEFAULT: None

reconnect

When True, re-dial on a dropped connection, resuming from the highest seq already delivered (or the original cursor when no event has been delivered yet). When False (the default), a closed connection ends iteration and the caller owns resumption.

TYPE: bool DEFAULT: False

YIELDS DESCRIPTION
FirehoseEvent

Decoded commit events for the kept collections.
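
Accepting the relay with or without the XRPC path could be handled along these lines. The subscribe_url helper is an assumption made for illustration, not the function's actual normalisation; it appends the subscribeRepos path when missing and passes the cursor as a query parameter.

```python
from typing import Optional
from urllib.parse import urlencode, urlsplit, urlunsplit

SUBSCRIBE_PATH = "/xrpc/com.atproto.sync.subscribeRepos"


def subscribe_url(relay: str, cursor: Optional[int] = None) -> str:
    """Normalise a relay address into a full subscribeRepos websocket URL."""
    parts = urlsplit(relay)
    path = parts.path.rstrip("/")
    if not path.endswith(SUBSCRIBE_PATH):
        path += SUBSCRIBE_PATH  # bare host given: add the XRPC path
    query = urlencode({"cursor": cursor}) if cursor is not None else ""
    return urlunsplit((parts.scheme, parts.netloc, path, query, ""))


bare = subscribe_url("wss://bsky.network")
resumed = subscribe_url("ws://localhost:3000/xrpc/com.atproto.sync.subscribeRepos", cursor=42)
```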