Skip to content

State Management

Three-tier state persistence for strategy data.

StateManager

almanak.framework.state.StateManager

StateManager(
    config: StateManagerConfig | None = None,
    warm_backend: WarmStore | None = None,
)

Tiered state manager with HOT and WARM storage tiers.

Provides: - <1ms access from HOT (in-memory) cache - <10ms access from WARM (PostgreSQL or SQLite) storage - CAS semantics for safe concurrent updates - Automatic tier fallback on load - Metrics tracking for each tier - Write-through from HOT to WARM tier

The WARM tier backend can be either PostgreSQL (production) or SQLite (development/lightweight). Backend selection is via configuration:

Usage

PostgreSQL backend (default, production)

config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), ) manager = StateManager(config) await manager.initialize()

SQLite backend (development)

config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), ) manager = StateManager(config) await manager.initialize()

Save state (writes to HOT then WARM)

state = StateData(deployment_id="strat-1", version=1, state={"key": "value"}) await manager.save_state(state)

Load state (reads from fastest available tier)

loaded = await manager.load_state("strat-1")

CAS update

loaded.state["key"] = "new_value" await manager.save_state(loaded, expected_version=loaded.version)

Dependency injection: provide custom backend

custom_sqlite = SQLiteStore(SQLiteConfig(db_path="./custom.db")) manager = StateManager(config, warm_backend=custom_sqlite)

Initialize StateManager.

Parameters:

Name Type Description Default
config StateManagerConfig | None

Configuration for the state manager. Uses defaults if not provided.

None
warm_backend WarmStore | None

Optional pre-configured WARM tier backend. If provided, this backend is used instead of creating one from config. Useful for dependency injection and testing.

None

is_initialized property

is_initialized: bool

Check if StateManager is initialized.

enabled_tiers property

enabled_tiers: list[StateTier]

Get list of enabled and initialized tiers.

warm_backend_type property

warm_backend_type: WarmBackendType | None

Get the type of WARM backend being used.

Returns:

Type Description
WarmBackendType | None

WarmBackendType.SQLITE, WarmBackendType.POSTGRESQL, or None if no WARM tier.

warm_backend property

warm_backend: WarmStore | None

Get the WARM tier backend instance.

Useful for accessing backend-specific functionality like get_version_history() on SQLiteStore.

Returns:

Type Description
WarmStore | None

The WARM backend instance, or None if not initialized.

initialize async

initialize() -> None

Initialize all enabled storage tiers.

If load_state_on_startup is enabled in config, loads all active states from WARM tier to HOT tier for fast access.

close async

close() -> None

Close all storage connections.

load_state async

load_state(deployment_id: str) -> StateData

Load state from the fastest available tier.

Tries tiers in order: HOT -> WARM. Populates HOT cache on WARM hit.

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier

required

Returns:

Type Description
StateData

StateData from the fastest available tier

Raises:

Type Description
StateNotFoundError

If state not found in any tier

save_state async

save_state(
    state: StateData, expected_version: int | None = None
) -> StateData

Save state to all tiers.

Writes to WARM tier (source of truth) then updates HOT cache.

Parameters:

Name Type Description Default
state StateData

State data to save

required
expected_version int | None

Expected version for CAS update. If None and state has version > 1, uses state.version - 1. If None and state has version = 1, creates new state.

None

Returns:

Type Description
StateData

Updated StateData with new version

Raises:

Type Description
StateConflictError

If CAS update fails due to version mismatch

delete_state async

delete_state(deployment_id: str) -> bool

Delete state from all tiers.

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier

required

Returns:

Type Description
bool

True if state was deleted from at least one tier

invalidate_hot_cache

invalidate_hot_cache(
    deployment_id: str | None = None,
) -> None

Invalidate HOT tier cache.

Parameters:

Name Type Description Default
deployment_id str | None

Specific strategy to invalidate, or None to clear all

None

get_metrics

get_metrics(limit: int = 100) -> list[TierMetrics]

Get recent tier metrics.

Parameters:

Name Type Description Default
limit int

Maximum number of metrics to return

100

Returns:

Type Description
list[TierMetrics]

List of TierMetrics, newest first

get_metrics_summary

get_metrics_summary() -> dict[str, Any]

Get summary of tier metrics.

Returns:

Type Description
dict[str, Any]

Dictionary with per-tier average latencies and success rates

clear_metrics

clear_metrics() -> None

Clear all stored metrics.

save_clob_order async

save_clob_order(order: ClobOrderState) -> bool

Save or update a CLOB order state.

Persists order state to the WARM tier for crash recovery and order tracking across strategy restarts.

Parameters:

Name Type Description Default
order ClobOrderState

ClobOrderState to persist.

required

Returns:

Type Description
bool

True if save succeeded, False if no WARM backend or error.

get_clob_order async

get_clob_order(
    order_id: str, *, deployment_id: str
) -> ClobOrderState | None

Get a CLOB order by order_id.

Parameters:

Name Type Description Default
order_id str

Order identifier.

required

Returns:

Type Description
ClobOrderState | None

ClobOrderState if found, None otherwise.

get_open_clob_orders async

get_open_clob_orders(
    market_id: str | None = None, *, deployment_id: str
) -> list[ClobOrderState]

Get all open CLOB orders, optionally filtered by market.

Open orders are those with status: pending, submitted, live, partially_filled.

Parameters:

Name Type Description Default
market_id str | None

Optional market ID to filter by.

None

Returns:

Type Description
list[ClobOrderState]

List of open ClobOrderState, newest first.

update_clob_order_status async

update_clob_order_status(
    order_id: str,
    status: ClobOrderStatus,
    fills: list[ClobFill] | None = None,
    filled_size: str | None = None,
    average_fill_price: str | None = None,
    error: str | None = None,
    *,
    deployment_id: str,
) -> bool

Update the status and fill information of a CLOB order.

Parameters:

Name Type Description Default
order_id str

Order identifier.

required
status ClobOrderStatus

New order status.

required
fills list[ClobFill] | None

Updated list of fills (replaces existing).

None
filled_size str | None

Updated filled size.

None
average_fill_price str | None

Updated average fill price.

None
error str | None

Error message if order failed.

None

Returns:

Type Description
bool

True if order was found and updated.

save_portfolio_snapshot async

save_portfolio_snapshot(snapshot: PortfolioSnapshot) -> int

Save a portfolio snapshot.

Persists portfolio value and position data for dashboard display and PnL tracking.

Parameters:

Name Type Description Default
snapshot PortfolioSnapshot

PortfolioSnapshot to persist.

required

Returns:

Type Description
int

Snapshot ID on success. Raises :class:AccountingPersistenceError

int

on backend write failure, missing WARM backend, or unsupported

int

backend so the runner can halt the cycle in live mode (VIB-3157).

int

Paper/dry-run suppression is handled upstream by the runner.

get_latest_snapshot async

get_latest_snapshot(
    deployment_id: str,
) -> PortfolioSnapshot | None

Get most recent portfolio snapshot for a strategy.

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier.

required

Returns:

Type Description
PortfolioSnapshot | None

Latest PortfolioSnapshot if found, None otherwise.

get_first_snapshot async

get_first_snapshot(
    deployment_id: str,
) -> PortfolioSnapshot | None

Get earliest portfolio snapshot for a strategy.

get_snapshots_since async

get_snapshots_since(
    deployment_id: str, since: datetime, limit: int = 168
) -> list[PortfolioSnapshot]

Get portfolio snapshots since a timestamp (for charts).

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier.

required
since datetime

Start timestamp for query.

required
limit int

Maximum number of snapshots to return.

168

Returns:

Type Description
list[PortfolioSnapshot]

List of PortfolioSnapshot, oldest first.

get_recent_snapshots async

get_recent_snapshots(
    deployment_id: str, limit: int = 168
) -> list[PortfolioSnapshot]

Get the limit most-recent portfolio snapshots, oldest-first.

Unlike :meth:get_snapshots_since (oldest-first from a since anchor, for cursor-paginated charts), this returns the latest window so a consumer reading [-1] always gets the true latest snapshot. See VIB-5026: the PnL/quant-header loader paired ASC-from-since with snapshots[-1] and silently surfaced the 168th-oldest row once a deployment exceeded limit snapshots.

Returns:

Type Description
list[PortfolioSnapshot]

List of PortfolioSnapshot, oldest first. Empty list when no WARM

list[PortfolioSnapshot]

backend is configured or the backend lacks the method.

get_snapshots_in_window async

get_snapshots_in_window(
    deployment_id: str,
    from_ts: datetime | None,
    to_ts: datetime | None,
    *,
    scan_cap: int = 200000,
) -> tuple[
    list[
        tuple[datetime, str | None, str | None, str | None]
    ],
    bool,
]

Projected NAV samples inside a time window, for windowed charts (VIB-5059 P2).

Returns (rows, truncated) — see the backend method for the row shape.

Loud, not graceful. Unlike :meth:get_snapshots_since / :meth:get_recent_snapshots (which swallow backend errors and return [] so the default dashboard render degrades quietly), the windowed path is explicitly requested by the operator going back in time: a backend failure or a missing backend is raised, not masked as an empty-but-OK series the operator would read as "no history". The gateway maps the raised error to a non-OK gRPC status. The metric is still recorded on the failure path.

get_nav_series async

get_nav_series(
    deployment_id: str,
    *,
    since: tuple[datetime, int] | None = None,
    scan_cap: int = 200000,
) -> tuple[
    list[
        tuple[
            datetime,
            str | None,
            str | None,
            int,
            str | None,
        ]
    ],
    bool,
]

NAV-component series for lifetime drawdown / high-watermark (VIB-5118/5134).

Returns (rows, truncated) — see the backend method for the row shape and the two fetch modes. since=(last_ts, last_id) requests only the snapshots newer than a cursor (the incremental-fold path); since=None is the full history scan.

Graceful, not loud. Like :meth:get_recent_snapshots (and unlike the operator-requested time-travel :meth:get_snapshots_in_window), a backend failure or missing backend returns ([], False) rather than raising: lifetime drawdown is a default header metric, and the caller degrades to the recent-window drawdown when the full series is unavailable — better than no drawdown at all. The failure metric is still recorded so the degrade is observable.

get_snapshot_at async

get_snapshot_at(
    deployment_id: str, timestamp: datetime
) -> PortfolioSnapshot | None

Get snapshot closest to a timestamp (for PnL calculation).

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier.

required
timestamp datetime

Target timestamp.

required

Returns:

Type Description
PortfolioSnapshot | None

PortfolioSnapshot closest to timestamp, or None if not found.

save_portfolio_metrics async

save_portfolio_metrics(metrics: PortfolioMetrics) -> bool

Save or update portfolio metrics.

Portfolio metrics store baseline values (initial_value_usd) that survive strategy restarts, enabling accurate PnL calculation.

Parameters:

Name Type Description Default
metrics PortfolioMetrics

PortfolioMetrics to persist.

required

Returns:

Type Description
bool

True on success. Raises :class:AccountingPersistenceError

bool

on backend write failure, missing WARM backend, or unsupported

bool

backend so the runner can halt the cycle in live mode (VIB-3157).

bool

Paper/dry-run suppression is handled upstream by the runner.

get_portfolio_metrics async

get_portfolio_metrics(
    deployment_id: str,
) -> PortfolioMetrics | None

Get portfolio metrics for a strategy.

Parameters:

Name Type Description Default
deployment_id str

Deployment identifier.

required

Returns:

Type Description
PortfolioMetrics | None

PortfolioMetrics if found, None otherwise.

cleanup_old_snapshots async

cleanup_old_snapshots(retention_days: int = 7) -> int

Clean up old portfolio snapshots.

Parameters:

Name Type Description Default
retention_days int

Number of days of snapshots to retain.

7

Returns:

Type Description
int

Number of snapshots deleted.

save_ledger_entry async

save_ledger_entry(entry: LedgerEntry) -> None

Save a transaction ledger entry to the WARM backend.

Raises :class:AccountingPersistenceError on backend write failure, missing WARM backend, or unsupported backend so the runner can halt the cycle in live mode (VIB-3157). Paper/dry-run suppression is handled upstream by the runner.

Parameters:

Name Type Description Default
entry LedgerEntry

LedgerEntry to persist.

required

save_ledger_and_registry async

save_ledger_and_registry(
    *,
    ledger: LedgerEntry,
    registry: RegistryRow,
    handle: HandleMapping | None = None,
    mode: str = "commit",
) -> None

Atomic single-transaction commit of ledger + registry + handle.

Per blueprint 28 §4.1. Delegates to the SQLite backend's save_ledger_and_registry_atomic method which wraps all three writes in one BEGIN IMMEDIATE ... COMMIT. Idempotent on (deployment_id, chain, primitive, physical_identity_hash) with a strict monotone status-priority guard.

This is the runtime registry-mode write path. The function-level primitive at :func:almanak.framework.accounting.commit.save_ledger_and_registry validates inputs and dispatches here for mode='registry' calls; mode='accounting_only' callers use :meth:save_ledger_entry directly. Callers MUST go through one of those two surfaces — see tests/unit/state/test_position_registry_no_writers.py for the anti-bypass guard.

Failure contract: - :class:RegistryAutoCollisionError (auto-mode partial-unique-index violation, VIB-4200) propagates UNCHANGED — it is a programming-bug class distinct from :class:AccountingPersistenceError. The VIB-3762 paper-mode-leniency rule does NOT apply: collisions surface uniformly across live / paper / dry_run so the author's missing registry_handle cannot ship to live unnoticed. - Any other backend error (CHECK violation, OperationalError, ix_registry_handle violation, etc.) is wrapped as :class:AccountingPersistenceError with write_kind=ACCOUNTING so the runner's existing fail-closed pipeline (VIB-3157 / VIB-3762) handles it. The transaction is rolled back by the backend method before either exception propagates; no partial state lands on disk.

Parameters:

Name Type Description Default
ledger LedgerEntry

LedgerEntry for transaction_ledger.

required
registry RegistryRow

RegistryRow for position_registry.

required
handle HandleMapping | None

Optional HandleMapping (handle column on position_registry; no separate table per blueprint 28 §4.2). May also be encoded directly on registry.handle.

None

get_position_registry_open_rows async

get_position_registry_open_rows(
    deployment_id: str,
    *,
    chain: str | None = None,
    primitive: str | None = None,
    accounting_category: str | None = None,
) -> list[dict]

Return the OPEN position_registry rows for a deployment.

Backed by the WARM backend's typed read. Used by:

  • runner.get_open_lp_positions_from_registry (UniV3 LP path today; broadened by future cutovers).
  • Teardown's pre-flight ("what's open?") check for cutover-flipped primitives.

Audit M3 (CodeRabbit): on a backend that does not implement cutover storage (GatewayStateManager — see :class:CutoverStorageNotSupported), this method raises rather than silently returning []. A silent [] is indistinguishable from "fresh DB, no rows" — the boot guard would interpret the empty result as "registry is the source of truth and it is empty", potentially marking still-open positions as gone. The cutover boot guard catches this exception and chooses degrade vs hard refusal based on whether the cutover is meant to be active for this build.

SQLite and Postgres both implement this method (Postgres via VIB-4794, closing the in-process parity gap left by VIB-4205 which only delivered the gRPC RPC half).

find_open_auto_mode_registry_row async

find_open_auto_mode_registry_row(
    *,
    deployment_id: str,
    chain: str,
    accounting_category: str,
    semantic_grouping_key: str,
) -> dict[str, str] | None

Return the open auto-mode position_registry row colliding with a handle-less open for this semantic group, or None.

VIB-4614 single-source predicate: mirrors the ix_registry_auto_mode partial-unique-index WHERE status = 'open' AND handle IS NULL. Drives the pre-execution LP registry-collision preflight (reject BEFORE minting an orphan NFT) and shares the predicate with the post-mint commit-path classifier. Delegates to the WARM backend's typed read.

Raises :class:CutoverStorageNotSupported on backends that don't implement the typed read (e.g. GatewayStateManager) — see :meth:get_position_registry_open_rows for the silent-[] rationale. The preflight phase treats that exception as "cannot check, proceed" so a non-SQLite backend never blocks an open; the post-mint commit-path classifier remains the backstop there.

insert_position_registry_row_if_absent async

insert_position_registry_row_if_absent(*, row: Any) -> bool

Backfill insert (INSERT … ON CONFLICT DO NOTHING).

Idempotent under restart. Used by :class:almanak.framework.migration.BackfillReader. Raises :class:CutoverStorageNotSupported on backends that don't implement the typed write — see get_position_registry_open_rows for the rationale.

upsert_migration_state async

upsert_migration_state(
    *, deployment_id: str, primitive: str, cutover_key: str
) -> None

Idempotent insert of a baseline migration_state row.

Raises :class:CutoverStorageNotSupported on backends that don't implement migration_state — silent no-op would let the boot guard's read return None and trigger RegistryCutoverNotDeployedError even when the build's intent is "this backend doesn't support cutover storage yet".

get_migration_state async

get_migration_state(
    *, deployment_id: str, primitive: str, cutover_key: str
) -> Any | None

Return the parsed migration_state row, or None when missing.

Raises :class:CutoverStorageNotSupported on backends that don't implement migration_state. Returning None on an unsupported backend would be indistinguishable from "row not yet created", which the boot guard treats as RegistryCutoverNotDeployedError — wrong error class, wrong recovery path.

get_position_events_filtered async

get_position_events_filtered(
    *, deployment_id: str, position_types: frozenset[str]
) -> list[dict]

Read the deployment's position_events rows whose position_type is in the filter set.

Used by the backfill driver loop. Raises :class:CutoverStorageNotSupported on backends that don't implement the typed read — silent [] would let the backfill complete with zero synthesized rows on a deployment that actually has historical positions.

get_ledger_entries async

get_ledger_entries(
    deployment_id: str,
    since: datetime | None = None,
    intent_type: str | None = None,
    limit: int = 100,
    before: datetime | None = None,
) -> list

Query transaction ledger entries.

Parameters:

Name Type Description Default
deployment_id str

Strategy to query.

required
since datetime | None

Only entries after this timestamp.

None
intent_type str | None

Filter by intent type.

None
limit int

Maximum entries to return.

100
before datetime | None

Only entries strictly older than this timestamp (paginated trade-tape cursor). When set, the SQL filter runs at the backend rather than post-fetch in Python so callers can never receive a "newest N rows that don't match the cursor" empty page.

None

Returns:

Type Description
list

List of LedgerEntry objects, newest first.

get_ledger_quant_stats async

get_ledger_quant_stats(
    deployment_id: str,
) -> LedgerQuantStats

SQL-side ledger aggregates for the dashboard quant tiles (VIB-5059).

Mirrors the :meth:get_recent_snapshots delegation pattern: no WARM backend, an unsupported backend, or a failed read all degrade to the zero-valued :class:LedgerQuantStats — the same inputs the legacy dashboard load produced from an empty ledger list, so the tiles render the honest empty state rather than erroring.

get_ledger_anchor_candidates async

get_ledger_anchor_candidates(
    deployment_id: str, limit: int = 64, offset: int = 0
) -> list[LedgerEntry]

Oldest-first first-action anchor candidate rows (VIB-5059).

Delegates to the WARM backend's LIMIT-bounded projection (see :meth:SQLiteStore.get_ledger_anchor_candidates). Degrades to an empty list on no/unsupported backend or read failure — the caller's anchor walk then falls back to portfolio-metrics, exactly as the legacy path did when no ledger row carried pre-state.

sum_ledger_gas_usd async

sum_ledger_gas_usd(deployment_id: str) -> Decimal

Σ transaction_ledger.gas_usd for a deployment (VIB-4225 ACC-02).

Delegates to the WARM backend's aggregator. Returns Decimal("0") on no rows, no warm backend, or unsupported backend (the runner's _build_metrics_for_snapshot reads hasattr first; this fallback guards against an old backend that pre-dates the aggregator method). Raises :class:AccountingPersistenceError so the runner halts the cycle in live mode (VIB-3762 contract).

save_position_event async

save_position_event(event: PositionEvent) -> bool

Persist a PositionEvent (OPEN/CLOSE/COLLECT_FEES/SNAPSHOT).

save_accounting_event async

save_accounting_event(event: Any) -> bool

Persist a typed accounting event (LendingAccountingEvent, etc.) to the warm backend.

Delegates to the backend's save_accounting_event when supported (SQLiteStore). Returns False when the backend does not yet support accounting events (e.g. GatewayStateManager before the metrics-database migration).

has_accounting_event_backend

has_accounting_event_backend() -> bool

True iff a warm backend able to serve accounting events is wired.

Single source of truth for the structural guard that :meth:get_accounting_events_sync runs internally — the read consults THIS probe, so a capability check can never drift from the read it gates. Callers that must distinguish "backend structurally absent" (Empty ≠ Zero — UNMEASURED) from "backend present, no events" (measured zero) probe this BEFORE reading: a False here means an empty read is unmeasured, not zero — e.g. hosted before the metrics-database migration, or a runner with no warm store. This is exactly what the ALM-2766 / VIB-5173 teardown swap-back clamp needs to decide whether to fail closed and flag accounting_degraded.

Read-only and side-effect-free: does not touch the warm backend and never raises.

get_accounting_events_sync

get_accounting_events_sync(
    deployment_id: str, position_key: str | None = None
) -> list[dict]

Synchronous accounting event query — delegates to the warm backend.

Used by PortfolioValuer (synchronous) to enrich PositionValue with cost_basis_usd / unrealized_pnl_usd / realized_pnl_usd at snapshot time. Returns [] when no warm backend or the backend predates this method. No LIMIT is applied: accurate cost basis requires the full event history.

The structural guard is :meth:has_accounting_event_backend so the capability probe and this read can never diverge. The [] return contract is unchanged (PortfolioValuer and other sync consumers rely on it); callers that need the absent-vs-empty distinction probe first.

get_position_events_sync

get_position_events_sync(
    deployment_id: str,
    position_id: str | None = None,
    position_type: str | None = None,
    event_type: str | None = None,
) -> list[dict]

Synchronous position event query — delegates to the warm backend.

Used by PortfolioValuer (synchronous) to enrich LP/PERP PositionValue objects with cost_basis_usd at snapshot time by looking up the OPEN event. Returns [] when no warm backend or the backend predates this method.

has_first_snapshot_backend

has_first_snapshot_backend() -> bool

True iff a warm backend able to serve the earliest snapshot synchronously is wired (VIB-4394).

Structural guard for :meth:get_first_snapshot_sync, mirroring :meth:has_accounting_event_backend. False means the sync earliest- snapshot read is structurally unavailable (Empty ≠ Zero — UNMEASURED): e.g. the hosted GatewayStateManager exposes no first-snapshot reader, so the boot OPENING_BALANCE seed (VIB-4394) no-ops there rather than substituting empty inventory. Read-only; never raises.

get_first_snapshot_sync

get_first_snapshot_sync(
    deployment_id: str,
) -> PortfolioSnapshot | None

Synchronous earliest-snapshot query — delegates to the warm backend (VIB-4394).

Used by the boot FIFO reconstruction (_run_loop_helpers.reconstruct_lending_basis_store) to seed pre- existing wallet inventory as OPENING_BALANCE lots. Returns None when no warm backend supports it (probe with :meth:has_first_snapshot_backend first to distinguish "structurally absent" from "no snapshot yet"). The None return makes the seed a no-op rather than fabricating empty inventory — Empty ≠ Zero.

get_accounting_events_for_dashboard async

get_accounting_events_for_dashboard(
    deployment_id: str, position_key: str | None = None
) -> list[dict]

Async-context accounting event query for the dashboard service (VIB-3933).

Distinct from :meth:get_accounting_events_sync (which PortfolioValuer calls synchronously from inside the snapshot pipeline). The dashboard service is async and must not block the event loop on Postgres I/O, so it goes through this async sibling.

Dispatch
  • PostgresStore exposes get_accounting_events (async) — preferred.
  • SQLiteStore exposes get_accounting_events_sync (sync) — wrapped in run_in_executor so the local-mode path does not block the running event loop either.

Returns [] and emits a one-shot WARN if the backend supports neither (Phase 0 visibility for VIB-3933).

get_position_events_for_dashboard async

get_position_events_for_dashboard(
    deployment_id: str,
    position_id: str | None = None,
    position_type: str | None = None,
    event_type: str | None = None,
) -> list[dict]

Async-context position event query for the dashboard service (VIB-3933).

See :meth:get_accounting_events_for_dashboard for the dispatch rationale. PostgresStore exposes the async get_position_events_dict; SQLiteStore exposes the sync get_position_events_sync which we invoke through run_in_executor.

update_position_attribution async

update_position_attribution(
    event_id: str,
    attribution_json: str,
    attribution_version: int,
    deployment_id: str = "",
) -> bool

Partial update of attribution_json + attribution_version on a PositionEvent.

deployment_id is forwarded to the warm backend so the GSM client can pass it through to the gateway proto request as defense-in-depth wire-level scope. SQLite ignores it (UUID event_id is globally unique); see SQLiteStore.update_position_attribution for the rationale.

get_position_events async

get_position_events(
    deployment_id: str,
    event_type: str | None = None,
    limit: int = 100,
) -> list

Query position events for a strategy (newest-first).

Backend signature is (deployment_id, position_id, event_type, limit) — call with keyword args so positional binding can't silently bind event_type to position_id. CodeRabbit round-4 caught this: my round-3 forwarding was (deployment_id, event_type, limit) which mis-bound, producing empty results for every caller that passed event_type (e.g. recompute_attribution filtering CLOSE events).

get_position_history async

get_position_history(
    deployment_id: str, position_id: str
) -> list

Fetch full history (timestamp-ASC) for a single position_id.

StateManagerConfig

almanak.framework.state.StateManagerConfig dataclass

StateManagerConfig(
    enable_hot: bool = True,
    enable_warm: bool = True,
    warm_backend: WarmBackendType = WarmBackendType.POSTGRESQL,
    hot_cache_ttl_seconds: int = 0,
    hot_cache_max_size: int = 1000,
    database_url: str | None = None,
    postgres_config: PostgresConfig = PostgresConfig(),
    sqlite_config: SQLiteConfigLight = SQLiteConfigLight(),
    metrics_callback: Callable[[TierMetrics], None]
    | None = None,
    load_state_on_startup: bool = True,
)

Configuration for StateManager.

Attributes:

Name Type Description
enable_hot bool

Enable in-memory cache tier

enable_warm bool

Enable WARM tier (PostgreSQL or SQLite)

warm_backend WarmBackendType

Which backend to use for WARM tier (POSTGRESQL or SQLITE)

hot_cache_ttl_seconds int

TTL for hot cache entries (0 = no expiry)

hot_cache_max_size int

Maximum entries in hot cache

postgres_config PostgresConfig

PostgreSQL configuration (used when warm_backend=POSTGRESQL)

sqlite_config SQLiteConfigLight

SQLite configuration (used when warm_backend=SQLITE)

metrics_callback Callable[[TierMetrics], None] | None

Optional callback for metrics reporting

load_state_on_startup bool

Load all active states from WARM to HOT on startup

Example

PostgreSQL backend (default, production)

config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), )

SQLite backend (local development)

config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), )

StateTier

almanak.framework.state.StateTier

Bases: IntEnum

Storage tier for state data.

Ordered by access speed (fastest first).

StateData

almanak.framework.state.StateData dataclass

StateData(
    deployment_id: str = "",
    version: int = 0,
    state: dict[str, Any] | None = None,
    schema_version: int = 1,
    checksum: str = "",
    created_at: datetime | None = None,
    loaded_from: StateTier | None = None,
)

Strategy state data container.

Attributes:

Name Type Description
deployment_id str

Unique identifier for the strategy

version int

CAS version number (incremented on each update)

state dict[str, Any]

The actual state data as a dictionary

schema_version int

Schema version for migrations

checksum str

SHA-256 hash of state data for integrity verification

created_at datetime

When this state version was created

loaded_from StateTier | None

Which tier the state was loaded from

verify_checksum

verify_checksum() -> bool

Verify the integrity of state data.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

from_dict classmethod

from_dict(data: dict[str, Any]) -> StateData

Create StateData from dictionary.

Migrations

StateMigration

almanak.framework.state.StateMigration dataclass

StateMigration(
    version: int,
    migration_fn: MigrationFunction,
    description: str = "",
    rollback_safe_until_version: int = 1,
    created_at: datetime = (lambda: datetime.now(UTC))(),
)

Defines a single state migration.

Attributes:

Name Type Description
version int

The schema version this migration upgrades TO

migration_fn MigrationFunction

Function that transforms state from version-1 to version

description str

Human-readable description of what this migration does

rollback_safe_until_version int

Minimum version that can safely rollback to this version (i.e., versions >= this can rollback without data loss)

created_at datetime

When this migration was defined

__post_init__

__post_init__() -> None

Validate migration.

apply

apply(state: dict[str, Any]) -> dict[str, Any]

Apply this migration to state.

Creates a deep copy to avoid mutating original state.

Parameters:

Name Type Description Default
state dict[str, Any]

The state dict to migrate

required

Returns:

Type Description
dict[str, Any]

Migrated state dict (new copy)

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

MigrationRegistry

almanak.framework.state.MigrationRegistry

MigrationRegistry()

Registry of all state migrations.

Tracks migrations and provides version validation and lookup.

current_version property

current_version: int

Get the current (latest) schema version.

migrations property

migrations: dict[int, StateMigration]

Get all registered migrations.

register

register(migration: StateMigration) -> None

Register a migration.

Parameters:

Name Type Description Default
migration StateMigration

The migration to register

required

Raises:

Type Description
ValueError

If a migration for this version already exists

get

get(version: int) -> StateMigration | None

Get migration for a specific version.

Parameters:

Name Type Description Default
version int

Target version

required

Returns:

Type Description
StateMigration | None

StateMigration or None if not found

get_migrations_path

get_migrations_path(
    from_version: int, to_version: int
) -> list[StateMigration]

Get list of migrations needed to go from one version to another.

Parameters:

Name Type Description Default
from_version int

Starting version

required
to_version int

Target version

required

Returns:

Type Description
list[StateMigration]

List of migrations to apply (in order)

Raises:

Type Description
MigrationNotFoundError

If any required migration is missing

get_rollback_info

get_rollback_info(current_version: int) -> RollbackInfo

Get rollback safety information for a version.

Parameters:

Name Type Description Default
current_version int

Current schema version

required

Returns:

Type Description
RollbackInfo

RollbackInfo with safe and unsafe rollback targets

clear

clear() -> None

Clear all registered migrations (mainly for testing).

MigrationResult

almanak.framework.state.MigrationResult dataclass

MigrationResult(
    success: bool,
    from_version: int,
    to_version: int,
    migrations_applied: list[int],
    state: dict[str, Any],
    error: str | None = None,
    duration_ms: float = 0.0,
)

Result of applying migrations.

Attributes:

Name Type Description
success bool

Whether all migrations succeeded

from_version int

Starting schema version

to_version int

Ending schema version

migrations_applied list[int]

List of migration versions applied

state dict[str, Any]

The migrated state (or original if failed)

error str | None

Error message if migration failed

duration_ms float

Total migration time in milliseconds

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Position Management

PositionManager

almanak.framework.state.PositionManager

PositionManager(
    chains: list[str],
    initial_positions: list[PositionRecord] | None = None,
)

Manages positions across multiple chains with chain dimension support.

Provides methods to store, query, and aggregate positions across chains. Designed to work with StateData.state dictionary for persistence.

Usage

Create manager for a multi-chain strategy

manager = PositionManager(chains=['arbitrum', 'optimism', 'base'])

Add positions

manager.add_position(PositionRecord( position_id='pos-1', chain='arbitrum', position_type=PositionType.SUPPLY, protocol='aave_v3', token='WETH', amount=Decimal('1.5'), value_usd=Decimal('3000'), ))

Query positions

all_positions = manager.positions # All chains arb_positions = manager.positions_on('arbitrum') # Single chain total_usd = manager.total_value_usd # Aggregate value

Attributes:

Name Type Description
chains list[str]

List of configured chain names

Initialize position manager.

Parameters:

Name Type Description Default
chains list[str]

List of chain names this manager handles

required
initial_positions list[PositionRecord] | None

Optional list of positions to pre-populate

None

chains property

chains: list[str]

Get list of configured chains.

positions property

positions: list[PositionRecord]

Get all positions across all chains.

Returns:

Type Description
list[PositionRecord]

List of all positions from all configured chains.

total_value_usd property

total_value_usd: Decimal

Calculate total USD value across all chains.

Returns:

Type Description
Decimal

Sum of value_usd for all positions across all chains.

positions_on

positions_on(chain: str) -> list[PositionRecord]

Get positions on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

total_value_on

total_value_on(chain: str) -> Decimal

Calculate total USD value on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to calculate value for

required

Returns:

Type Description
Decimal

Sum of value_usd for positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

add_position

add_position(position: PositionRecord) -> None

Add or update a position.

If a position with the same position_id exists on the same chain, it will be replaced.

Parameters:

Name Type Description Default
position PositionRecord

Position to add/update

required

Raises:

Type Description
ChainNotFoundError

If position's chain is not configured

remove_position

remove_position(position_id: str, chain: str) -> bool

Remove a position by ID and chain.

Parameters:

Name Type Description Default
position_id str

Position identifier

required
chain str

Chain the position is on

required

Returns:

Type Description
bool

True if position was removed, False if not found

Raises:

Type Description
ChainNotFoundError

If chain is not configured

get_position

get_position(
    position_id: str, chain: str
) -> PositionRecord | None

Get a specific position by ID and chain.

Parameters:

Name Type Description Default
position_id str

Position identifier

required
chain str

Chain the position is on

required

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

Raises:

Type Description
ChainNotFoundError

If chain is not configured

find_position

find_position(position_id: str) -> PositionRecord | None

Find a position by ID across all chains.

Parameters:

Name Type Description Default
position_id str

Position identifier to search for

required

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

positions_by_type

positions_by_type(
    position_type: PositionType,
) -> list[PositionRecord]

Get all positions of a specific type across all chains.

Parameters:

Name Type Description Default
position_type PositionType

Type of position to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions matching the type

positions_by_protocol

positions_by_protocol(
    protocol: str,
) -> list[PositionRecord]

Get all positions for a specific protocol across all chains.

Parameters:

Name Type Description Default
protocol str

Protocol name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the protocol

clear

clear(chain: str | None = None) -> None

Clear positions.

Parameters:

Name Type Description Default
chain str | None

If provided, clear only positions on this chain. If None, clear all positions on all chains.

None

Raises:

Type Description
ChainNotFoundError

If specified chain is not configured

to_dict

to_dict() -> dict[str, Any]

Serialize all positions to dictionary for state storage.

Returns:

Type Description
dict[str, Any]

Dictionary with chain -> position_id -> position data structure

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionManager

Deserialize position manager from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary from to_dict()

required

Returns:

Type Description
PositionManager

PositionManager with restored positions

get_summary

get_summary() -> dict[str, Any]

Get a summary of positions across all chains.

Returns:

Type Description
dict[str, Any]

Dictionary with per-chain and total statistics

__repr__

__repr__() -> str

String representation of manager.

PositionRecord

almanak.framework.state.PositionRecord dataclass

PositionRecord(
    position_id: str,
    chain: str,
    position_type: PositionType,
    protocol: str | None,
    token: str,
    amount: Decimal,
    value_usd: Decimal,
    created_at: datetime = (lambda: datetime.now(UTC))(),
    updated_at: datetime = (lambda: datetime.now(UTC))(),
    metadata: dict[str, Any] = dict(),
)

A single position record with chain dimension.

This is the fundamental unit of position tracking. Every position must have an explicit chain field to support multi-chain strategies.

Attributes:

Name Type Description
position_id str

Unique identifier for this position

chain str

The blockchain this position is on (e.g., 'arbitrum', 'optimism')

position_type PositionType

Type of position (TOKEN, LP, BORROW, SUPPLY, PERP, VAULT)

protocol str | None

Protocol where position is held (e.g., 'aave_v3', 'uniswap_v3')

token str

Primary token symbol (or pool identifier for LP)

amount Decimal

Amount of tokens or liquidity

value_usd Decimal

Current USD value of the position

created_at datetime

When the position was opened

updated_at datetime

Last update timestamp

metadata dict[str, Any]

Additional position-specific data (health factor, ranges, etc.)

to_dict

to_dict() -> dict[str, Any]

Serialize position to dictionary.

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionRecord

Deserialize position from dictionary.

Exceptions

almanak.framework.state.StateConflictError

StateConflictError(
    deployment_id: str,
    expected_version: int,
    actual_version: int,
    message: str | None = None,
)

Bases: Exception

Raised when CAS update fails due to version mismatch.

This error indicates that another process has modified the state since it was last read. The caller should reload the state and retry.

almanak.framework.state.StateNotFoundError

StateNotFoundError(
    deployment_id: str, message: str | None = None
)

Bases: Exception

Raised when state is not found in any tier.