State Management¶
Three-tier state persistence for strategy data.
StateManager¶
almanak.framework.state.StateManager
¶
Tiered state manager with HOT and WARM storage tiers.
Provides: - <1ms access from HOT (in-memory) cache - <10ms access from WARM (PostgreSQL or SQLite) storage - CAS semantics for safe concurrent updates - Automatic tier fallback on load - Metrics tracking for each tier - Write-through from HOT to WARM tier
The WARM tier backend can be either PostgreSQL (production) or SQLite (development/lightweight). Backend selection is via configuration:
Usage
PostgreSQL backend (default, production)¶
config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), ) manager = StateManager(config) await manager.initialize()
SQLite backend (development)¶
config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), ) manager = StateManager(config) await manager.initialize()
Save state (writes to HOT then WARM)¶
state = StateData(deployment_id="strat-1", version=1, state={"key": "value"}) await manager.save_state(state)
Load state (reads from fastest available tier)¶
loaded = await manager.load_state("strat-1")
CAS update¶
loaded.state["key"] = "new_value" await manager.save_state(loaded, expected_version=loaded.version)
Dependency injection: provide custom backend¶
custom_sqlite = SQLiteStore(SQLiteConfig(db_path="./custom.db")) manager = StateManager(config, warm_backend=custom_sqlite)
Initialize StateManager.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
config
|
StateManagerConfig | None
|
Configuration for the state manager. Uses defaults if not provided. |
None
|
warm_backend
|
WarmStore | None
|
Optional pre-configured WARM tier backend. If provided, this backend is used instead of creating one from config. Useful for dependency injection and testing. |
None
|
warm_backend_type
property
¶
Get the type of WARM backend being used.
返回:
| 类型 | 描述 |
|---|---|
WarmBackendType | None
|
WarmBackendType.SQLITE, WarmBackendType.POSTGRESQL, or None if no WARM tier. |
warm_backend
property
¶
Get the WARM tier backend instance.
Useful for accessing backend-specific functionality like get_version_history() on SQLiteStore.
返回:
| 类型 | 描述 |
|---|---|
WarmStore | None
|
The WARM backend instance, or None if not initialized. |
initialize
async
¶
Initialize all enabled storage tiers.
If load_state_on_startup is enabled in config, loads all active states from WARM tier to HOT tier for fast access.
load_state
async
¶
Load state from the fastest available tier.
Tries tiers in order: HOT -> WARM. Populates HOT cache on WARM hit.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier |
必需 |
返回:
| 类型 | 描述 |
|---|---|
StateData
|
StateData from the fastest available tier |
引发:
| 类型 | 描述 |
|---|---|
StateNotFoundError
|
If state not found in any tier |
save_state
async
¶
Save state to all tiers.
Writes to WARM tier (source of truth) then updates HOT cache.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
state
|
StateData
|
State data to save |
必需 |
expected_version
|
int | None
|
Expected version for CAS update. If None and state has version > 1, uses state.version - 1. If None and state has version = 1, creates new state. |
None
|
返回:
| 类型 | 描述 |
|---|---|
StateData
|
Updated StateData with new version |
引发:
| 类型 | 描述 |
|---|---|
StateConflictError
|
If CAS update fails due to version mismatch |
delete_state
async
¶
Delete state from all tiers.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier |
必需 |
返回:
| 类型 | 描述 |
|---|---|
bool
|
True if state was deleted from at least one tier |
invalidate_hot_cache
¶
Invalidate HOT tier cache.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str | None
|
Specific strategy to invalidate, or None to clear all |
None
|
get_metrics
¶
Get recent tier metrics.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
limit
|
int
|
Maximum number of metrics to return |
100
|
返回:
| 类型 | 描述 |
|---|---|
list[TierMetrics]
|
List of TierMetrics, newest first |
get_metrics_summary
¶
Get summary of tier metrics.
返回:
| 类型 | 描述 |
|---|---|
dict[str, Any]
|
Dictionary with per-tier average latencies and success rates |
save_clob_order
async
¶
Save or update a CLOB order state.
Persists order state to the WARM tier for crash recovery and order tracking across strategy restarts.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
order
|
ClobOrderState
|
ClobOrderState to persist. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
bool
|
True if save succeeded, False if no WARM backend or error. |
get_clob_order
async
¶
Get a CLOB order by order_id.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
order_id
|
str
|
Order identifier. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
ClobOrderState | None
|
ClobOrderState if found, None otherwise. |
get_open_clob_orders
async
¶
Get all open CLOB orders, optionally filtered by market.
Open orders are those with status: pending, submitted, live, partially_filled.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
market_id
|
str | None
|
Optional market ID to filter by. |
None
|
返回:
| 类型 | 描述 |
|---|---|
list[ClobOrderState]
|
List of open ClobOrderState, newest first. |
update_clob_order_status
async
¶
update_clob_order_status(
order_id: str,
status: ClobOrderStatus,
fills: list[ClobFill] | None = None,
filled_size: str | None = None,
average_fill_price: str | None = None,
error: str | None = None,
*,
deployment_id: str,
) -> bool
Update the status and fill information of a CLOB order.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
order_id
|
str
|
Order identifier. |
必需 |
status
|
ClobOrderStatus
|
New order status. |
必需 |
fills
|
list[ClobFill] | None
|
Updated list of fills (replaces existing). |
None
|
filled_size
|
str | None
|
Updated filled size. |
None
|
average_fill_price
|
str | None
|
Updated average fill price. |
None
|
error
|
str | None
|
Error message if order failed. |
None
|
返回:
| 类型 | 描述 |
|---|---|
bool
|
True if order was found and updated. |
save_portfolio_snapshot
async
¶
Save a portfolio snapshot.
Persists portfolio value and position data for dashboard display and PnL tracking.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
snapshot
|
PortfolioSnapshot
|
PortfolioSnapshot to persist. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
int
|
Snapshot ID on success. Raises :class: |
int
|
on backend write failure, missing WARM backend, or unsupported |
int
|
backend so the runner can halt the cycle in live mode (VIB-3157). |
int
|
Paper/dry-run suppression is handled upstream by the runner. |
get_latest_snapshot
async
¶
Get most recent portfolio snapshot for a strategy.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PortfolioSnapshot | None
|
Latest PortfolioSnapshot if found, None otherwise. |
get_first_snapshot
async
¶
Get earliest portfolio snapshot for a strategy.
get_snapshots_since
async
¶
get_snapshots_since(
deployment_id: str, since: datetime, limit: int = 168
) -> list[PortfolioSnapshot]
Get portfolio snapshots since a timestamp (for charts).
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier. |
必需 |
since
|
datetime
|
Start timestamp for query. |
必需 |
limit
|
int
|
Maximum number of snapshots to return. |
168
|
返回:
| 类型 | 描述 |
|---|---|
list[PortfolioSnapshot]
|
List of PortfolioSnapshot, oldest first. |
get_recent_snapshots
async
¶
Get the limit most-recent portfolio snapshots, oldest-first.
Unlike :meth:get_snapshots_since (oldest-first from a since
anchor, for cursor-paginated charts), this returns the latest window
so a consumer reading [-1] always gets the true latest snapshot.
See VIB-5026: the PnL/quant-header loader paired ASC-from-since
with snapshots[-1] and silently surfaced the 168th-oldest row once
a deployment exceeded limit snapshots.
返回:
| 类型 | 描述 |
|---|---|
list[PortfolioSnapshot]
|
List of PortfolioSnapshot, oldest first. Empty list when no WARM |
list[PortfolioSnapshot]
|
backend is configured or the backend lacks the method. |
get_snapshots_in_window
async
¶
get_snapshots_in_window(
deployment_id: str,
from_ts: datetime | None,
to_ts: datetime | None,
*,
scan_cap: int = 200000,
) -> tuple[
list[
tuple[datetime, str | None, str | None, str | None]
],
bool,
]
Projected NAV samples inside a time window, for windowed charts (VIB-5059 P2).
Returns (rows, truncated) — see the backend method for the row shape.
Loud, not graceful. Unlike :meth:get_snapshots_since /
:meth:get_recent_snapshots (which swallow backend errors and return []
so the default dashboard render degrades quietly), the windowed path is
explicitly requested by the operator going back in time: a backend failure
or a missing backend is raised, not masked as an empty-but-OK series the
operator would read as "no history". The gateway maps the raised error to a
non-OK gRPC status. The metric is still recorded on the failure path.
get_nav_series
async
¶
get_nav_series(
deployment_id: str,
*,
since: tuple[datetime, int] | None = None,
scan_cap: int = 200000,
) -> tuple[
list[
tuple[
datetime,
str | None,
str | None,
int,
str | None,
]
],
bool,
]
NAV-component series for lifetime drawdown / high-watermark (VIB-5118/5134).
Returns (rows, truncated) — see the backend method for the row shape and
the two fetch modes. since=(last_ts, last_id) requests only the snapshots
newer than a cursor (the incremental-fold path); since=None is the full
history scan.
Graceful, not loud. Like :meth:get_recent_snapshots (and unlike the
operator-requested time-travel :meth:get_snapshots_in_window), a backend
failure or missing backend returns ([], False) rather than raising:
lifetime drawdown is a default header metric, and the caller degrades to
the recent-window drawdown when the full series is unavailable — better than
no drawdown at all. The failure metric is still recorded so the degrade is
observable.
get_snapshot_at
async
¶
Get snapshot closest to a timestamp (for PnL calculation).
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier. |
必需 |
timestamp
|
datetime
|
Target timestamp. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PortfolioSnapshot | None
|
PortfolioSnapshot closest to timestamp, or None if not found. |
save_portfolio_metrics
async
¶
Save or update portfolio metrics.
Portfolio metrics store baseline values (initial_value_usd) that survive strategy restarts, enabling accurate PnL calculation.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
metrics
|
PortfolioMetrics
|
PortfolioMetrics to persist. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
bool
|
|
bool
|
on backend write failure, missing WARM backend, or unsupported |
bool
|
backend so the runner can halt the cycle in live mode (VIB-3157). |
bool
|
Paper/dry-run suppression is handled upstream by the runner. |
get_portfolio_metrics
async
¶
Get portfolio metrics for a strategy.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Deployment identifier. |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PortfolioMetrics | None
|
PortfolioMetrics if found, None otherwise. |
cleanup_old_snapshots
async
¶
Clean up old portfolio snapshots.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
retention_days
|
int
|
Number of days of snapshots to retain. |
7
|
返回:
| 类型 | 描述 |
|---|---|
int
|
Number of snapshots deleted. |
save_ledger_entry
async
¶
Save a transaction ledger entry to the WARM backend.
Raises :class:AccountingPersistenceError on backend write failure,
missing WARM backend, or unsupported backend so the runner can halt
the cycle in live mode (VIB-3157). Paper/dry-run suppression is
handled upstream by the runner.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
entry
|
LedgerEntry
|
LedgerEntry to persist. |
必需 |
save_ledger_and_registry
async
¶
save_ledger_and_registry(
*,
ledger: LedgerEntry,
registry: RegistryRow,
handle: HandleMapping | None = None,
mode: str = "commit",
) -> None
Atomic single-transaction commit of ledger + registry + handle.
Per blueprint 28 §4.1. Delegates to the SQLite backend's
save_ledger_and_registry_atomic method which wraps all three
writes in one BEGIN IMMEDIATE ... COMMIT. Idempotent on
(deployment_id, chain, primitive, physical_identity_hash) with
a strict monotone status-priority guard.
This is the runtime registry-mode write path. The function-level
primitive at :func:almanak.framework.accounting.commit.save_ledger_and_registry
validates inputs and dispatches here for mode='registry' calls;
mode='accounting_only' callers use :meth:save_ledger_entry
directly. Callers MUST go through one of those two surfaces — see
tests/unit/state/test_position_registry_no_writers.py for the
anti-bypass guard.
Failure contract:
- :class:RegistryAutoCollisionError (auto-mode partial-unique-index
violation, VIB-4200) propagates UNCHANGED — it is a programming-bug
class distinct from :class:AccountingPersistenceError. The
VIB-3762 paper-mode-leniency rule does NOT apply: collisions
surface uniformly across live / paper / dry_run so the
author's missing registry_handle cannot ship to live unnoticed.
- Any other backend error (CHECK violation, OperationalError,
ix_registry_handle violation, etc.) is wrapped as
:class:AccountingPersistenceError with write_kind=ACCOUNTING
so the runner's existing fail-closed pipeline (VIB-3157 / VIB-3762)
handles it.
The transaction is rolled back by the backend method before either
exception propagates; no partial state lands on disk.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
ledger
|
LedgerEntry
|
|
必需 |
registry
|
RegistryRow
|
|
必需 |
handle
|
HandleMapping | None
|
Optional |
None
|
get_position_registry_open_rows
async
¶
get_position_registry_open_rows(
deployment_id: str,
*,
chain: str | None = None,
primitive: str | None = None,
accounting_category: str | None = None,
) -> list[dict]
Return the OPEN position_registry rows for a deployment.
Backed by the WARM backend's typed read. Used by:
runner.get_open_lp_positions_from_registry(UniV3 LP path today; broadened by future cutovers).- Teardown's pre-flight ("what's open?") check for cutover-flipped primitives.
Audit M3 (CodeRabbit): on a backend that does not implement
cutover storage (GatewayStateManager — see
:class:CutoverStorageNotSupported), this method raises rather
than silently returning []. A silent [] is indistinguishable
from "fresh DB, no rows" — the boot guard would interpret the
empty result as "registry is the source of truth and it is
empty", potentially marking still-open positions as gone. The
cutover boot guard catches this exception and chooses degrade
vs hard refusal based on whether the cutover is meant to be
active for this build.
SQLite and Postgres both implement this method (Postgres via VIB-4794, closing the in-process parity gap left by VIB-4205 which only delivered the gRPC RPC half).
find_open_auto_mode_registry_row
async
¶
find_open_auto_mode_registry_row(
*,
deployment_id: str,
chain: str,
accounting_category: str,
semantic_grouping_key: str,
) -> dict[str, str] | None
Return the open auto-mode position_registry row colliding with a
handle-less open for this semantic group, or None.
VIB-4614 single-source predicate: mirrors the ix_registry_auto_mode
partial-unique-index WHERE status = 'open' AND handle IS NULL. Drives
the pre-execution LP registry-collision preflight (reject BEFORE minting
an orphan NFT) and shares the predicate with the post-mint commit-path
classifier. Delegates to the WARM backend's typed read.
Raises :class:CutoverStorageNotSupported on backends that don't
implement the typed read (e.g. GatewayStateManager) — see
:meth:get_position_registry_open_rows for the silent-[] rationale.
The preflight phase treats that exception as "cannot check, proceed"
so a non-SQLite backend never blocks an open; the post-mint
commit-path classifier remains the backstop there.
insert_position_registry_row_if_absent
async
¶
Backfill insert (INSERT … ON CONFLICT DO NOTHING).
Idempotent under restart. Used by
:class:almanak.framework.migration.BackfillReader. Raises
:class:CutoverStorageNotSupported on backends that don't
implement the typed write — see get_position_registry_open_rows
for the rationale.
upsert_migration_state
async
¶
Idempotent insert of a baseline migration_state row.
Raises :class:CutoverStorageNotSupported on backends that
don't implement migration_state — silent no-op would let the
boot guard's read return None and trigger
RegistryCutoverNotDeployedError even when the build's
intent is "this backend doesn't support cutover storage yet".
get_migration_state
async
¶
Return the parsed migration_state row, or None when missing.
Raises :class:CutoverStorageNotSupported on backends that
don't implement migration_state. Returning None on an
unsupported backend would be indistinguishable from "row not
yet created", which the boot guard treats as
RegistryCutoverNotDeployedError — wrong error class, wrong
recovery path.
get_position_events_filtered
async
¶
Read the deployment's position_events rows whose
position_type is in the filter set.
Used by the backfill driver loop. Raises
:class:CutoverStorageNotSupported on backends that don't
implement the typed read — silent [] would let the
backfill complete with zero synthesized rows on a deployment
that actually has historical positions.
get_ledger_entries
async
¶
get_ledger_entries(
deployment_id: str,
since: datetime | None = None,
intent_type: str | None = None,
limit: int = 100,
before: datetime | None = None,
) -> list
Query transaction ledger entries.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
deployment_id
|
str
|
Strategy to query. |
必需 |
since
|
datetime | None
|
Only entries after this timestamp. |
None
|
intent_type
|
str | None
|
Filter by intent type. |
None
|
limit
|
int
|
Maximum entries to return. |
100
|
before
|
datetime | None
|
Only entries strictly older than this timestamp (paginated trade-tape cursor). When set, the SQL filter runs at the backend rather than post-fetch in Python so callers can never receive a "newest N rows that don't match the cursor" empty page. |
None
|
返回:
| 类型 | 描述 |
|---|---|
list
|
List of LedgerEntry objects, newest first. |
get_ledger_quant_stats
async
¶
SQL-side ledger aggregates for the dashboard quant tiles (VIB-5059).
Mirrors the :meth:get_recent_snapshots delegation pattern: no WARM
backend, an unsupported backend, or a failed read all degrade to the
zero-valued :class:LedgerQuantStats — the same inputs the legacy
dashboard load produced from an empty ledger list, so the tiles
render the honest empty state rather than erroring.
get_ledger_anchor_candidates
async
¶
get_ledger_anchor_candidates(
deployment_id: str, limit: int = 64, offset: int = 0
) -> list[LedgerEntry]
Oldest-first first-action anchor candidate rows (VIB-5059).
Delegates to the WARM backend's LIMIT-bounded projection (see
:meth:SQLiteStore.get_ledger_anchor_candidates). Degrades to an
empty list on no/unsupported backend or read failure — the caller's
anchor walk then falls back to portfolio-metrics, exactly as the
legacy path did when no ledger row carried pre-state.
sum_ledger_gas_usd
async
¶
Σ transaction_ledger.gas_usd for a deployment (VIB-4225 ACC-02).
Delegates to the WARM backend's aggregator. Returns Decimal("0")
on no rows, no warm backend, or unsupported backend (the runner's
_build_metrics_for_snapshot reads hasattr first; this fallback
guards against an old backend that pre-dates the aggregator method).
Raises :class:AccountingPersistenceError so the runner halts the
cycle in live mode (VIB-3762 contract).
save_position_event
async
¶
Persist a PositionEvent (OPEN/CLOSE/COLLECT_FEES/SNAPSHOT).
save_accounting_event
async
¶
Persist a typed accounting event (LendingAccountingEvent, etc.) to the warm backend.
Delegates to the backend's save_accounting_event when supported (SQLiteStore). Returns False when the backend does not yet support accounting events (e.g. GatewayStateManager before the metrics-database migration).
has_accounting_event_backend
¶
True iff a warm backend able to serve accounting events is wired.
Single source of truth for the structural guard that
:meth:get_accounting_events_sync runs internally — the read consults
THIS probe, so a capability check can never drift from the read it
gates. Callers that must distinguish "backend structurally absent"
(Empty ≠ Zero — UNMEASURED) from "backend present, no events"
(measured zero) probe this BEFORE reading: a False here means an
empty read is unmeasured, not zero — e.g. hosted before the
metrics-database migration, or a runner with no warm store. This is
exactly what the ALM-2766 / VIB-5173 teardown swap-back clamp needs to
decide whether to fail closed and flag accounting_degraded.
Read-only and side-effect-free: does not touch the warm backend and never raises.
get_accounting_events_sync
¶
Synchronous accounting event query — delegates to the warm backend.
Used by PortfolioValuer (synchronous) to enrich PositionValue with cost_basis_usd / unrealized_pnl_usd / realized_pnl_usd at snapshot time. Returns [] when no warm backend or the backend predates this method. No LIMIT is applied: accurate cost basis requires the full event history.
The structural guard is :meth:has_accounting_event_backend so the
capability probe and this read can never diverge. The [] return
contract is unchanged (PortfolioValuer and other sync consumers rely on
it); callers that need the absent-vs-empty distinction probe first.
get_position_events_sync
¶
get_position_events_sync(
deployment_id: str,
position_id: str | None = None,
position_type: str | None = None,
event_type: str | None = None,
) -> list[dict]
Synchronous position event query — delegates to the warm backend.
Used by PortfolioValuer (synchronous) to enrich LP/PERP PositionValue objects with cost_basis_usd at snapshot time by looking up the OPEN event. Returns [] when no warm backend or the backend predates this method.
has_first_snapshot_backend
¶
True iff a warm backend able to serve the earliest snapshot synchronously is wired (VIB-4394).
Structural guard for :meth:get_first_snapshot_sync, mirroring
:meth:has_accounting_event_backend. False means the sync earliest-
snapshot read is structurally unavailable (Empty ≠ Zero — UNMEASURED):
e.g. the hosted GatewayStateManager exposes no first-snapshot reader,
so the boot OPENING_BALANCE seed (VIB-4394) no-ops there rather than
substituting empty inventory. Read-only; never raises.
get_first_snapshot_sync
¶
Synchronous earliest-snapshot query — delegates to the warm backend (VIB-4394).
Used by the boot FIFO reconstruction
(_run_loop_helpers.reconstruct_lending_basis_store) to seed pre-
existing wallet inventory as OPENING_BALANCE lots. Returns None when no
warm backend supports it (probe with :meth:has_first_snapshot_backend
first to distinguish "structurally absent" from "no snapshot yet"). The
None return makes the seed a no-op rather than fabricating empty
inventory — Empty ≠ Zero.
get_accounting_events_for_dashboard
async
¶
get_accounting_events_for_dashboard(
deployment_id: str, position_key: str | None = None
) -> list[dict]
Async-context accounting event query for the dashboard service (VIB-3933).
Distinct from :meth:get_accounting_events_sync (which PortfolioValuer
calls synchronously from inside the snapshot pipeline). The dashboard
service is async and must not block the event loop on Postgres I/O,
so it goes through this async sibling.
Dispatch
- PostgresStore exposes
get_accounting_events(async) — preferred. - SQLiteStore exposes
get_accounting_events_sync(sync) — wrapped inrun_in_executorso the local-mode path does not block the running event loop either.
Returns [] and emits a one-shot WARN if the backend supports
neither (Phase 0 visibility for VIB-3933).
get_position_events_for_dashboard
async
¶
get_position_events_for_dashboard(
deployment_id: str,
position_id: str | None = None,
position_type: str | None = None,
event_type: str | None = None,
) -> list[dict]
Async-context position event query for the dashboard service (VIB-3933).
See :meth:get_accounting_events_for_dashboard for the dispatch
rationale. PostgresStore exposes the async get_position_events_dict;
SQLiteStore exposes the sync get_position_events_sync which we
invoke through run_in_executor.
update_position_attribution
async
¶
update_position_attribution(
event_id: str,
attribution_json: str,
attribution_version: int,
deployment_id: str = "",
) -> bool
Partial update of attribution_json + attribution_version on a PositionEvent.
deployment_id is forwarded to the warm backend so the GSM client
can pass it through to the gateway proto request as defense-in-depth
wire-level scope. SQLite ignores it (UUID event_id is globally
unique); see SQLiteStore.update_position_attribution for the
rationale.
get_position_events
async
¶
Query position events for a strategy (newest-first).
Backend signature is (deployment_id, position_id, event_type, limit)
— call with keyword args so positional binding can't silently bind
event_type to position_id. CodeRabbit round-4 caught this:
my round-3 forwarding was (deployment_id, event_type, limit) which
mis-bound, producing empty results for every caller that passed
event_type (e.g. recompute_attribution filtering CLOSE events).
get_position_history
async
¶
Fetch full history (timestamp-ASC) for a single position_id.
StateManagerConfig¶
almanak.framework.state.StateManagerConfig
dataclass
¶
StateManagerConfig(
enable_hot: bool = True,
enable_warm: bool = True,
warm_backend: WarmBackendType = WarmBackendType.POSTGRESQL,
hot_cache_ttl_seconds: int = 0,
hot_cache_max_size: int = 1000,
database_url: str | None = None,
postgres_config: PostgresConfig = PostgresConfig(),
sqlite_config: SQLiteConfigLight = SQLiteConfigLight(),
metrics_callback: Callable[[TierMetrics], None]
| None = None,
load_state_on_startup: bool = True,
)
Configuration for StateManager.
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
enable_hot |
bool
|
Enable in-memory cache tier |
enable_warm |
bool
|
Enable WARM tier (PostgreSQL or SQLite) |
warm_backend |
WarmBackendType
|
Which backend to use for WARM tier (POSTGRESQL or SQLITE) |
hot_cache_ttl_seconds |
int
|
TTL for hot cache entries (0 = no expiry) |
hot_cache_max_size |
int
|
Maximum entries in hot cache |
postgres_config |
PostgresConfig
|
PostgreSQL configuration (used when warm_backend=POSTGRESQL) |
sqlite_config |
SQLiteConfigLight
|
SQLite configuration (used when warm_backend=SQLITE) |
metrics_callback |
Callable[[TierMetrics], None] | None
|
Optional callback for metrics reporting |
load_state_on_startup |
bool
|
Load all active states from WARM to HOT on startup |
Example
PostgreSQL backend (default, production)¶
config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), )
SQLite backend (local development)¶
config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), )
StateTier¶
almanak.framework.state.StateTier
¶
Bases: IntEnum
Storage tier for state data.
Ordered by access speed (fastest first).
StateData¶
almanak.framework.state.StateData
dataclass
¶
StateData(
deployment_id: str = "",
version: int = 0,
state: dict[str, Any] | None = None,
schema_version: int = 1,
checksum: str = "",
created_at: datetime | None = None,
loaded_from: StateTier | None = None,
)
Strategy state data container.
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
deployment_id |
str
|
Unique identifier for the strategy |
version |
int
|
CAS version number (incremented on each update) |
state |
dict[str, Any]
|
The actual state data as a dictionary |
schema_version |
int
|
Schema version for migrations |
checksum |
str
|
SHA-256 hash of state data for integrity verification |
created_at |
datetime
|
When this state version was created |
loaded_from |
StateTier | None
|
Which tier the state was loaded from |
Migrations¶
StateMigration¶
almanak.framework.state.StateMigration
dataclass
¶
StateMigration(
version: int,
migration_fn: MigrationFunction,
description: str = "",
rollback_safe_until_version: int = 1,
created_at: datetime = (lambda: datetime.now(UTC))(),
)
Defines a single state migration.
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
version |
int
|
The schema version this migration upgrades TO |
migration_fn |
MigrationFunction
|
Function that transforms state from version-1 to version |
description |
str
|
Human-readable description of what this migration does |
rollback_safe_until_version |
int
|
Minimum version that can safely rollback to this version (i.e., versions >= this can rollback without data loss) |
created_at |
datetime
|
When this migration was defined |
apply
¶
Apply this migration to state.
Creates a deep copy to avoid mutating original state.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
state
|
dict[str, Any]
|
The state dict to migrate |
必需 |
返回:
| 类型 | 描述 |
|---|---|
dict[str, Any]
|
Migrated state dict (new copy) |
MigrationRegistry¶
almanak.framework.state.MigrationRegistry
¶
Registry of all state migrations.
Tracks migrations and provides version validation and lookup.
register
¶
Register a migration.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
migration
|
StateMigration
|
The migration to register |
必需 |
引发:
| 类型 | 描述 |
|---|---|
ValueError
|
If a migration for this version already exists |
get
¶
Get migration for a specific version.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
version
|
int
|
Target version |
必需 |
返回:
| 类型 | 描述 |
|---|---|
StateMigration | None
|
StateMigration or None if not found |
get_migrations_path
¶
Get list of migrations needed to go from one version to another.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
from_version
|
int
|
Starting version |
必需 |
to_version
|
int
|
Target version |
必需 |
返回:
| 类型 | 描述 |
|---|---|
list[StateMigration]
|
List of migrations to apply (in order) |
引发:
| 类型 | 描述 |
|---|---|
MigrationNotFoundError
|
If any required migration is missing |
get_rollback_info
¶
Get rollback safety information for a version.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
current_version
|
int
|
Current schema version |
必需 |
返回:
| 类型 | 描述 |
|---|---|
RollbackInfo
|
RollbackInfo with safe and unsafe rollback targets |
MigrationResult¶
almanak.framework.state.MigrationResult
dataclass
¶
MigrationResult(
success: bool,
from_version: int,
to_version: int,
migrations_applied: list[int],
state: dict[str, Any],
error: str | None = None,
duration_ms: float = 0.0,
)
Result of applying migrations.
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
success |
bool
|
Whether all migrations succeeded |
from_version |
int
|
Starting schema version |
to_version |
int
|
Ending schema version |
migrations_applied |
list[int]
|
List of migration versions applied |
state |
dict[str, Any]
|
The migrated state (or original if failed) |
error |
str | None
|
Error message if migration failed |
duration_ms |
float
|
Total migration time in milliseconds |
Position Management¶
PositionManager¶
almanak.framework.state.PositionManager
¶
Manages positions across multiple chains with chain dimension support.
Provides methods to store, query, and aggregate positions across chains. Designed to work with StateData.state dictionary for persistence.
Usage
Create manager for a multi-chain strategy¶
manager = PositionManager(chains=['arbitrum', 'optimism', 'base'])
Add positions¶
manager.add_position(PositionRecord( position_id='pos-1', chain='arbitrum', position_type=PositionType.SUPPLY, protocol='aave_v3', token='WETH', amount=Decimal('1.5'), value_usd=Decimal('3000'), ))
Query positions¶
all_positions = manager.positions # All chains arb_positions = manager.positions_on('arbitrum') # Single chain total_usd = manager.total_value_usd # Aggregate value
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
chains |
list[str]
|
List of configured chain names |
Initialize position manager.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
chains
|
list[str]
|
List of chain names this manager handles |
必需 |
initial_positions
|
list[PositionRecord] | None
|
Optional list of positions to pre-populate |
None
|
positions
property
¶
Get all positions across all chains.
返回:
| 类型 | 描述 |
|---|---|
list[PositionRecord]
|
List of all positions from all configured chains. |
total_value_usd
property
¶
Calculate total USD value across all chains.
返回:
| 类型 | 描述 |
|---|---|
Decimal
|
Sum of value_usd for all positions across all chains. |
positions_on
¶
Get positions on a specific chain.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
chain
|
str
|
Chain name to filter by |
必需 |
返回:
| 类型 | 描述 |
|---|---|
list[PositionRecord]
|
List of positions on the specified chain |
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If chain is not configured |
total_value_on
¶
Calculate total USD value on a specific chain.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
chain
|
str
|
Chain name to calculate value for |
必需 |
返回:
| 类型 | 描述 |
|---|---|
Decimal
|
Sum of value_usd for positions on the specified chain |
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If chain is not configured |
add_position
¶
Add or update a position.
If a position with the same position_id exists on the same chain, it will be replaced.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
position
|
PositionRecord
|
Position to add/update |
必需 |
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If position's chain is not configured |
remove_position
¶
Remove a position by ID and chain.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
position_id
|
str
|
Position identifier |
必需 |
chain
|
str
|
Chain the position is on |
必需 |
返回:
| 类型 | 描述 |
|---|---|
bool
|
True if position was removed, False if not found |
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If chain is not configured |
get_position
¶
Get a specific position by ID and chain.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
position_id
|
str
|
Position identifier |
必需 |
chain
|
str
|
Chain the position is on |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PositionRecord | None
|
PositionRecord if found, None otherwise |
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If chain is not configured |
find_position
¶
Find a position by ID across all chains.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
position_id
|
str
|
Position identifier to search for |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PositionRecord | None
|
PositionRecord if found, None otherwise |
positions_by_type
¶
Get all positions of a specific type across all chains.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
position_type
|
PositionType
|
Type of position to filter by |
必需 |
返回:
| 类型 | 描述 |
|---|---|
list[PositionRecord]
|
List of positions matching the type |
positions_by_protocol
¶
Get all positions for a specific protocol across all chains.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
protocol
|
str
|
Protocol name to filter by |
必需 |
返回:
| 类型 | 描述 |
|---|---|
list[PositionRecord]
|
List of positions on the protocol |
clear
¶
Clear positions.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
chain
|
str | None
|
If provided, clear only positions on this chain. If None, clear all positions on all chains. |
None
|
引发:
| 类型 | 描述 |
|---|---|
ChainNotFoundError
|
If specified chain is not configured |
to_dict
¶
Serialize all positions to dictionary for state storage.
返回:
| 类型 | 描述 |
|---|---|
dict[str, Any]
|
Dictionary with chain -> position_id -> position data structure |
from_dict
classmethod
¶
Deserialize position manager from dictionary.
参数:
| 名称 | 类型 | 描述 | 默认 |
|---|---|---|---|
data
|
dict[str, Any]
|
Dictionary from to_dict() |
必需 |
返回:
| 类型 | 描述 |
|---|---|
PositionManager
|
PositionManager with restored positions |
get_summary
¶
Get a summary of positions across all chains.
返回:
| 类型 | 描述 |
|---|---|
dict[str, Any]
|
Dictionary with per-chain and total statistics |
PositionRecord¶
almanak.framework.state.PositionRecord
dataclass
¶
PositionRecord(
position_id: str,
chain: str,
position_type: PositionType,
protocol: str | None,
token: str,
amount: Decimal,
value_usd: Decimal,
created_at: datetime = (lambda: datetime.now(UTC))(),
updated_at: datetime = (lambda: datetime.now(UTC))(),
metadata: dict[str, Any] = dict(),
)
A single position record with chain dimension.
This is the fundamental unit of position tracking. Every position must have an explicit chain field to support multi-chain strategies.
属性:
| 名称 | 类型 | 描述 |
|---|---|---|
position_id |
str
|
Unique identifier for this position |
chain |
str
|
The blockchain this position is on (e.g., 'arbitrum', 'optimism') |
position_type |
PositionType
|
Type of position (TOKEN, LP, BORROW, SUPPLY, PERP, VAULT) |
protocol |
str | None
|
Protocol where position is held (e.g., 'aave_v3', 'uniswap_v3') |
token |
str
|
Primary token symbol (or pool identifier for LP) |
amount |
Decimal
|
Amount of tokens or liquidity |
value_usd |
Decimal
|
Current USD value of the position |
created_at |
datetime
|
When the position was opened |
updated_at |
datetime
|
Last update timestamp |
metadata |
dict[str, Any]
|
Additional position-specific data (health factor, ranges, etc.) |
Exceptions¶
almanak.framework.state.StateConflictError
¶
StateConflictError(
deployment_id: str,
expected_version: int,
actual_version: int,
message: str | None = None,
)
Bases: Exception
Raised when CAS update fails due to version mismatch.
This error indicates that another process has modified the state since it was last read. The caller should reload the state and retry.
almanak.framework.state.StateNotFoundError
¶
Bases: Exception
Raised when state is not found in any tier.