State Management¶
Three-tier state persistence for strategy data.
StateManager¶
almanak.framework.state.StateManager
¶
Tiered state manager with HOT and WARM storage tiers.
Provides: - <1ms access from HOT (in-memory) cache - <10ms access from WARM (PostgreSQL or SQLite) storage - CAS semantics for safe concurrent updates - Automatic tier fallback on load - Metrics tracking for each tier - Write-through from HOT to WARM tier
The WARM tier backend can be either PostgreSQL (production) or SQLite (development/lightweight). Backend selection is via configuration:
Usage
PostgreSQL backend (default, production)¶
config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), ) manager = StateManager(config) await manager.initialize()
SQLite backend (development)¶
config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), ) manager = StateManager(config) await manager.initialize()
Save state (writes to HOT then WARM)¶
state = StateData(strategy_id="strat-1", version=1, state={"key": "value"}) await manager.save_state(state)
Load state (reads from fastest available tier)¶
loaded = await manager.load_state("strat-1")
CAS update¶
loaded.state["key"] = "new_value" await manager.save_state(loaded, expected_version=loaded.version)
Dependency injection: provide custom backend¶
custom_sqlite = SQLiteStore(SQLiteConfig(db_path="./custom.db")) manager = StateManager(config, warm_backend=custom_sqlite)
Initialize StateManager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
StateManagerConfig | None
|
Configuration for the state manager. Uses defaults if not provided. |
None
|
warm_backend
|
WarmStore | None
|
Optional pre-configured WARM tier backend. If provided, this backend is used instead of creating one from config. Useful for dependency injection and testing. |
None
|
warm_backend_type
property
¶
Get the type of WARM backend being used.
Returns:
| Type | Description |
|---|---|
WarmBackendType | None
|
WarmBackendType.SQLITE, WarmBackendType.POSTGRESQL, or None if no WARM tier. |
warm_backend
property
¶
Get the WARM tier backend instance.
Useful for accessing backend-specific functionality like get_version_history() on SQLiteStore.
Returns:
| Type | Description |
|---|---|
WarmStore | None
|
The WARM backend instance, or None if not initialized. |
initialize
async
¶
Initialize all enabled storage tiers.
If load_state_on_startup is enabled in config, loads all active states from WARM tier to HOT tier for fast access.
load_state
async
¶
Load state from the fastest available tier.
Tries tiers in order: HOT -> WARM. Populates HOT cache on WARM hit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier |
required |
Returns:
| Type | Description |
|---|---|
StateData
|
StateData from the fastest available tier |
Raises:
| Type | Description |
|---|---|
StateNotFoundError
|
If state not found in any tier |
save_state
async
¶
Save state to all tiers.
Writes to WARM tier (source of truth) then updates HOT cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
StateData
|
State data to save |
required |
expected_version
|
int | None
|
Expected version for CAS update. If None and state has version > 1, uses state.version - 1. If None and state has version = 1, creates new state. |
None
|
Returns:
| Type | Description |
|---|---|
StateData
|
Updated StateData with new version |
Raises:
| Type | Description |
|---|---|
StateConflictError
|
If CAS update fails due to version mismatch |
delete_state
async
¶
Delete state from all tiers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if state was deleted from at least one tier |
invalidate_hot_cache
¶
Invalidate HOT tier cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str | None
|
Specific strategy to invalidate, or None to clear all |
None
|
get_metrics
¶
Get recent tier metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum number of metrics to return |
100
|
Returns:
| Type | Description |
|---|---|
list[TierMetrics]
|
List of TierMetrics, newest first |
get_metrics_summary
¶
Get summary of tier metrics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with per-tier average latencies and success rates |
save_clob_order
async
¶
Save or update a CLOB order state.
Persists order state to the WARM tier for crash recovery and order tracking across strategy restarts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order
|
ClobOrderState
|
ClobOrderState to persist. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if save succeeded, False if no WARM backend or error. |
get_clob_order
async
¶
Get a CLOB order by order_id.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
Order identifier. |
required |
Returns:
| Type | Description |
|---|---|
ClobOrderState | None
|
ClobOrderState if found, None otherwise. |
get_open_clob_orders
async
¶
Get all open CLOB orders, optionally filtered by market.
Open orders are those with status: pending, submitted, live, partially_filled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str | None
|
Optional market ID to filter by. |
None
|
Returns:
| Type | Description |
|---|---|
list[ClobOrderState]
|
List of open ClobOrderState, newest first. |
update_clob_order_status
async
¶
update_clob_order_status(
order_id: str,
status: ClobOrderStatus,
fills: list[ClobFill] | None = None,
filled_size: str | None = None,
average_fill_price: str | None = None,
error: str | None = None,
) -> bool
Update the status and fill information of a CLOB order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
order_id
|
str
|
Order identifier. |
required |
status
|
ClobOrderStatus
|
New order status. |
required |
fills
|
list[ClobFill] | None
|
Updated list of fills (replaces existing). |
None
|
filled_size
|
str | None
|
Updated filled size. |
None
|
average_fill_price
|
str | None
|
Updated average fill price. |
None
|
error
|
str | None
|
Error message if order failed. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if order was found and updated. |
save_portfolio_snapshot
async
¶
Save a portfolio snapshot.
Persists portfolio value and position data for dashboard display and PnL tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
snapshot
|
PortfolioSnapshot
|
PortfolioSnapshot to persist. |
required |
Returns:
| Type | Description |
|---|---|
int
|
Snapshot ID if save succeeded, 0 if no WARM backend or error. |
get_latest_snapshot
async
¶
Get most recent portfolio snapshot for a strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier. |
required |
Returns:
| Type | Description |
|---|---|
PortfolioSnapshot | None
|
Latest PortfolioSnapshot if found, None otherwise. |
get_snapshots_since
async
¶
get_snapshots_since(
strategy_id: str, since: datetime, limit: int = 168
) -> list[PortfolioSnapshot]
Get portfolio snapshots since a timestamp (for charts).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier. |
required |
since
|
datetime
|
Start timestamp for query. |
required |
limit
|
int
|
Maximum number of snapshots to return. |
168
|
Returns:
| Type | Description |
|---|---|
list[PortfolioSnapshot]
|
List of PortfolioSnapshot, oldest first. |
get_snapshot_at
async
¶
Get snapshot closest to a timestamp (for PnL calculation).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier. |
required |
timestamp
|
datetime
|
Target timestamp. |
required |
Returns:
| Type | Description |
|---|---|
PortfolioSnapshot | None
|
PortfolioSnapshot closest to timestamp, or None if not found. |
save_portfolio_metrics
async
¶
Save or update portfolio metrics.
Portfolio metrics store baseline values (initial_value_usd) that survive strategy restarts, enabling accurate PnL calculation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
metrics
|
PortfolioMetrics
|
PortfolioMetrics to persist. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if save succeeded, False if no WARM backend or error. |
get_portfolio_metrics
async
¶
Get portfolio metrics for a strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier. |
required |
Returns:
| Type | Description |
|---|---|
PortfolioMetrics | None
|
PortfolioMetrics if found, None otherwise. |
cleanup_old_snapshots
async
¶
Clean up old portfolio snapshots.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
retention_days
|
int
|
Number of days of snapshots to retain. |
7
|
Returns:
| Type | Description |
|---|---|
int
|
Number of snapshots deleted. |
StateManagerConfig¶
almanak.framework.state.StateManagerConfig
dataclass
¶
StateManagerConfig(
enable_hot: bool = True,
enable_warm: bool = True,
warm_backend: WarmBackendType = WarmBackendType.POSTGRESQL,
hot_cache_ttl_seconds: int = 0,
hot_cache_max_size: int = 1000,
postgres_config: PostgresConfig = PostgresConfig(),
sqlite_config: SQLiteConfigLight = SQLiteConfigLight(),
metrics_callback: Callable[[TierMetrics], None]
| None = None,
load_state_on_startup: bool = True,
)
Configuration for StateManager.
Attributes:
| Name | Type | Description |
|---|---|---|
enable_hot |
bool
|
Enable in-memory cache tier |
enable_warm |
bool
|
Enable WARM tier (PostgreSQL or SQLite) |
warm_backend |
WarmBackendType
|
Which backend to use for WARM tier (POSTGRESQL or SQLITE) |
hot_cache_ttl_seconds |
int
|
TTL for hot cache entries (0 = no expiry) |
hot_cache_max_size |
int
|
Maximum entries in hot cache |
postgres_config |
PostgresConfig
|
PostgreSQL configuration (used when warm_backend=POSTGRESQL) |
sqlite_config |
SQLiteConfigLight
|
SQLite configuration (used when warm_backend=SQLITE) |
metrics_callback |
Callable[[TierMetrics], None] | None
|
Optional callback for metrics reporting |
load_state_on_startup |
bool
|
Load all active states from WARM to HOT on startup |
Example
PostgreSQL backend (default, production)¶
config = StateManagerConfig( warm_backend=WarmBackendType.POSTGRESQL, postgres_config=PostgresConfig(host="localhost"), )
SQLite backend (local development)¶
config = StateManagerConfig( warm_backend=WarmBackendType.SQLITE, sqlite_config=SQLiteConfigLight(db_path="./state.db"), )
StateTier¶
almanak.framework.state.StateTier
¶
Bases: IntEnum
Storage tier for state data.
Ordered by access speed (fastest first).
StateData¶
almanak.framework.state.StateData
dataclass
¶
StateData(
strategy_id: str,
version: int,
state: dict[str, Any],
schema_version: int = 1,
checksum: str = "",
created_at: datetime = (lambda: datetime.now(UTC))(),
loaded_from: StateTier | None = None,
)
Strategy state data container.
Attributes:
| Name | Type | Description |
|---|---|---|
strategy_id |
str
|
Unique identifier for the strategy |
version |
int
|
CAS version number (incremented on each update) |
state |
dict[str, Any]
|
The actual state data as a dictionary |
schema_version |
int
|
Schema version for migrations |
checksum |
str
|
SHA-256 hash of state data for integrity verification |
created_at |
datetime
|
When this state version was created |
loaded_from |
StateTier | None
|
Which tier the state was loaded from |
from_dict
classmethod
¶
Create StateData from dictionary.
Migrations¶
StateMigration¶
almanak.framework.state.StateMigration
dataclass
¶
StateMigration(
version: int,
migration_fn: MigrationFunction,
description: str = "",
rollback_safe_until_version: int = 1,
created_at: datetime = (lambda: datetime.now(UTC))(),
)
Defines a single state migration.
Attributes:
| Name | Type | Description |
|---|---|---|
version |
int
|
The schema version this migration upgrades TO |
migration_fn |
MigrationFunction
|
Function that transforms state from version-1 to version |
description |
str
|
Human-readable description of what this migration does |
rollback_safe_until_version |
int
|
Minimum version that can safely rollback to this version (i.e., versions >= this can rollback without data loss) |
created_at |
datetime
|
When this migration was defined |
apply
¶
Apply this migration to state.
Creates a deep copy to avoid mutating original state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
dict[str, Any]
|
The state dict to migrate |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Migrated state dict (new copy) |
MigrationRegistry¶
almanak.framework.state.MigrationRegistry
¶
Registry of all state migrations.
Tracks migrations and provides version validation and lookup.
register
¶
Register a migration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
migration
|
StateMigration
|
The migration to register |
required |
Raises:
| Type | Description |
|---|---|
ValueError
|
If a migration for this version already exists |
get
¶
Get migration for a specific version.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
version
|
int
|
Target version |
required |
Returns:
| Type | Description |
|---|---|
StateMigration | None
|
StateMigration or None if not found |
get_migrations_path
¶
Get list of migrations needed to go from one version to another.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
from_version
|
int
|
Starting version |
required |
to_version
|
int
|
Target version |
required |
Returns:
| Type | Description |
|---|---|
list[StateMigration]
|
List of migrations to apply (in order) |
Raises:
| Type | Description |
|---|---|
MigrationNotFoundError
|
If any required migration is missing |
get_rollback_info
¶
Get rollback safety information for a version.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current_version
|
int
|
Current schema version |
required |
Returns:
| Type | Description |
|---|---|
RollbackInfo
|
RollbackInfo with safe and unsafe rollback targets |
MigrationResult¶
almanak.framework.state.MigrationResult
dataclass
¶
MigrationResult(
success: bool,
from_version: int,
to_version: int,
migrations_applied: list[int],
state: dict[str, Any],
error: str | None = None,
duration_ms: float = 0.0,
)
Result of applying migrations.
Attributes:
| Name | Type | Description |
|---|---|---|
success |
bool
|
Whether all migrations succeeded |
from_version |
int
|
Starting schema version |
to_version |
int
|
Ending schema version |
migrations_applied |
list[int]
|
List of migration versions applied |
state |
dict[str, Any]
|
The migrated state (or original if failed) |
error |
str | None
|
Error message if migration failed |
duration_ms |
float
|
Total migration time in milliseconds |
Position Management¶
PositionManager¶
almanak.framework.state.PositionManager
¶
Manages positions across multiple chains with chain dimension support.
Provides methods to store, query, and aggregate positions across chains. Designed to work with StateData.state dictionary for persistence.
Usage
Create manager for a multi-chain strategy¶
manager = PositionManager(chains=['arbitrum', 'optimism', 'base'])
Add positions¶
manager.add_position(PositionRecord( position_id='pos-1', chain='arbitrum', position_type=PositionType.SUPPLY, protocol='aave_v3', token='WETH', amount=Decimal('1.5'), value_usd=Decimal('3000'), ))
Query positions¶
all_positions = manager.positions # All chains arb_positions = manager.positions_on('arbitrum') # Single chain total_usd = manager.total_value_usd # Aggregate value
Attributes:
| Name | Type | Description |
|---|---|---|
chains |
list[str]
|
List of configured chain names |
Initialize position manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chains
|
list[str]
|
List of chain names this manager handles |
required |
initial_positions
|
list[PositionRecord] | None
|
Optional list of positions to pre-populate |
None
|
positions
property
¶
Get all positions across all chains.
Returns:
| Type | Description |
|---|---|
list[PositionRecord]
|
List of all positions from all configured chains. |
total_value_usd
property
¶
Calculate total USD value across all chains.
Returns:
| Type | Description |
|---|---|
Decimal
|
Sum of value_usd for all positions across all chains. |
positions_on
¶
Get positions on a specific chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str
|
Chain name to filter by |
required |
Returns:
| Type | Description |
|---|---|
list[PositionRecord]
|
List of positions on the specified chain |
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If chain is not configured |
total_value_on
¶
Calculate total USD value on a specific chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str
|
Chain name to calculate value for |
required |
Returns:
| Type | Description |
|---|---|
Decimal
|
Sum of value_usd for positions on the specified chain |
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If chain is not configured |
add_position
¶
Add or update a position.
If a position with the same position_id exists on the same chain, it will be replaced.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
PositionRecord
|
Position to add/update |
required |
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If position's chain is not configured |
remove_position
¶
Remove a position by ID and chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
Position identifier |
required |
chain
|
str
|
Chain the position is on |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if position was removed, False if not found |
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If chain is not configured |
get_position
¶
Get a specific position by ID and chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
Position identifier |
required |
chain
|
str
|
Chain the position is on |
required |
Returns:
| Type | Description |
|---|---|
PositionRecord | None
|
PositionRecord if found, None otherwise |
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If chain is not configured |
find_position
¶
Find a position by ID across all chains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
Position identifier to search for |
required |
Returns:
| Type | Description |
|---|---|
PositionRecord | None
|
PositionRecord if found, None otherwise |
positions_by_type
¶
Get all positions of a specific type across all chains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_type
|
PositionType
|
Type of position to filter by |
required |
Returns:
| Type | Description |
|---|---|
list[PositionRecord]
|
List of positions matching the type |
positions_by_protocol
¶
Get all positions for a specific protocol across all chains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Protocol name to filter by |
required |
Returns:
| Type | Description |
|---|---|
list[PositionRecord]
|
List of positions on the protocol |
clear
¶
Clear positions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str | None
|
If provided, clear only positions on this chain. If None, clear all positions on all chains. |
None
|
Raises:
| Type | Description |
|---|---|
ChainNotFoundError
|
If specified chain is not configured |
to_dict
¶
Serialize all positions to dictionary for state storage.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with chain -> position_id -> position data structure |
from_dict
classmethod
¶
Deserialize position manager from dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any]
|
Dictionary from to_dict() |
required |
Returns:
| Type | Description |
|---|---|
PositionManager
|
PositionManager with restored positions |
get_summary
¶
Get a summary of positions across all chains.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with per-chain and total statistics |
PositionRecord¶
almanak.framework.state.PositionRecord
dataclass
¶
PositionRecord(
position_id: str,
chain: str,
position_type: PositionType,
protocol: str | None,
token: str,
amount: Decimal,
value_usd: Decimal,
created_at: datetime = (lambda: datetime.now(UTC))(),
updated_at: datetime = (lambda: datetime.now(UTC))(),
metadata: dict[str, Any] = dict(),
)
A single position record with chain dimension.
This is the fundamental unit of position tracking. Every position must have an explicit chain field to support multi-chain strategies.
Attributes:
| Name | Type | Description |
|---|---|---|
position_id |
str
|
Unique identifier for this position |
chain |
str
|
The blockchain this position is on (e.g., 'arbitrum', 'optimism') |
position_type |
PositionType
|
Type of position (TOKEN, LP, BORROW, SUPPLY, PERP) |
protocol |
str | None
|
Protocol where position is held (e.g., 'aave_v3', 'uniswap_v3') |
token |
str
|
Primary token symbol (or pool identifier for LP) |
amount |
Decimal
|
Amount of tokens or liquidity |
value_usd |
Decimal
|
Current USD value of the position |
created_at |
datetime
|
When the position was opened |
updated_at |
datetime
|
Last update timestamp |
metadata |
dict[str, Any]
|
Additional position-specific data (health factor, ranges, etc.) |
Exceptions¶
almanak.framework.state.StateConflictError
¶
StateConflictError(
strategy_id: str,
expected_version: int,
actual_version: int,
message: str | None = None,
)
Bases: Exception
Raised when CAS update fails due to version mismatch.
This error indicates that another process has modified the state since it was last read. The caller should reload the state and retry.
almanak.framework.state.StateNotFoundError
¶
Bases: Exception
Raised when state is not found in any tier.