
State Management

Tiered state persistence for strategy data.

StateManager

almanak.framework.state.StateManager

StateManager(
    config: StateManagerConfig | None = None,
    warm_backend: WarmStore | None = None,
)

Tiered state manager with HOT and WARM storage tiers.

Provides:
- <1ms access from HOT (in-memory) cache
- <10ms access from WARM (PostgreSQL or SQLite) storage
- CAS semantics for safe concurrent updates
- Automatic tier fallback on load
- Metrics tracking for each tier
- Write-through from HOT to WARM tier

The WARM tier backend can be either PostgreSQL (production) or SQLite (development/lightweight). Backend selection is via configuration:

Usage

PostgreSQL backend (default, production)

config = StateManagerConfig(
    warm_backend=WarmBackendType.POSTGRESQL,
    postgres_config=PostgresConfig(host="localhost"),
)
manager = StateManager(config)
await manager.initialize()

SQLite backend (development)

config = StateManagerConfig(
    warm_backend=WarmBackendType.SQLITE,
    sqlite_config=SQLiteConfigLight(db_path="./state.db"),
)
manager = StateManager(config)
await manager.initialize()

Save state (written to both the HOT and WARM tiers)

state = StateData(strategy_id="strat-1", version=1, state={"key": "value"})
await manager.save_state(state)

Load state (reads from fastest available tier)

loaded = await manager.load_state("strat-1")

CAS update

loaded.state["key"] = "new_value"
await manager.save_state(loaded, expected_version=loaded.version)

Dependency injection: provide custom backend

custom_sqlite = SQLiteStore(SQLiteConfig(db_path="./custom.db"))
manager = StateManager(config, warm_backend=custom_sqlite)

Initialize StateManager.

Parameters:

- config (StateManagerConfig | None, default: None): Configuration for the state manager. Uses defaults if not provided.
- warm_backend (WarmStore | None, default: None): Optional pre-configured WARM tier backend. If provided, this backend is used instead of creating one from config. Useful for dependency injection and testing.

is_initialized property

is_initialized: bool

Check if StateManager is initialized.

enabled_tiers property

enabled_tiers: list[StateTier]

Get list of enabled and initialized tiers.

warm_backend_type property

warm_backend_type: WarmBackendType | None

Get the type of WARM backend being used.

Returns:

Type Description
WarmBackendType | None

WarmBackendType.SQLITE, WarmBackendType.POSTGRESQL, or None if no WARM tier.

warm_backend property

warm_backend: WarmStore | None

Get the WARM tier backend instance.

Useful for accessing backend-specific functionality like get_version_history() on SQLiteStore.

Returns:

Type Description
WarmStore | None

The WARM backend instance, or None if not initialized.

initialize async

initialize() -> None

Initialize all enabled storage tiers.

If load_state_on_startup is enabled in config, loads all active states from WARM tier to HOT tier for fast access.

close async

close() -> None

Close all storage connections.

load_state async

load_state(strategy_id: str) -> StateData

Load state from the fastest available tier.

Tries tiers in order: HOT -> WARM. Populates HOT cache on WARM hit.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier

required

Returns:

Type Description
StateData

StateData from the fastest available tier

Raises:

Type Description
StateNotFoundError

If state not found in any tier
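
The HOT -> WARM read path described for load_state can be illustrated with a toy model. This is a minimal sketch, not the library's implementation: TwoTierStore and StateNotFound are hypothetical stand-ins, and plain dicts take the place of the in-memory cache and the database.

```python
import asyncio


class StateNotFound(Exception):
    """Hypothetical stand-in for StateNotFoundError."""


class TwoTierStore:
    """Toy read path: check HOT first, fall back to WARM, then fill HOT."""

    def __init__(self, warm):
        self.hot = {}      # stands in for the in-memory cache
        self.warm = warm   # stands in for PostgreSQL or SQLite

    async def load_state(self, strategy_id):
        if strategy_id in self.hot:
            return self.hot[strategy_id], "HOT"
        if strategy_id in self.warm:
            state = self.warm[strategy_id]
            self.hot[strategy_id] = state  # populate HOT on a WARM hit
            return state, "WARM"
        raise StateNotFound(strategy_id)


store = TwoTierStore(warm={"strat-1": {"key": "value"}})
first = asyncio.run(store.load_state("strat-1"))   # served from WARM
second = asyncio.run(store.load_state("strat-1"))  # served from HOT
```

The second call is answered from the cache because the first call copied the WARM result into it.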

save_state async

save_state(
    state: StateData, expected_version: int | None = None
) -> StateData

Save state to all tiers.

Writes to WARM tier (source of truth) then updates HOT cache.

Parameters:

- state (StateData, required): State data to save.
- expected_version (int | None, default: None): Expected version for CAS update. If None and state has version > 1, uses state.version - 1. If None and state has version = 1, creates new state.

Returns:

Type Description
StateData

Updated StateData with new version

Raises:

Type Description
StateConflictError

If CAS update fails due to version mismatch
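
The rule for resolving expected_version in save_state can be written out as a small function. This is a sketch of the documented rule only; resolve_expected_version is a hypothetical helper name, and how the library treats versions below 1 is not documented, so the sketch simply treats them like a new state.

```python
from typing import Optional


def resolve_expected_version(
    state_version: int, expected_version: Optional[int] = None
) -> Optional[int]:
    """Return the version the stored state must have, or None to create new state."""
    if expected_version is not None:
        return expected_version      # caller supplied an explicit CAS version
    if state_version > 1:
        return state_version - 1     # documented default for updates
    return None                      # version 1 with no expectation: create


explicit = resolve_expected_version(3, expected_version=3)
implied = resolve_expected_version(3)
create = resolve_expected_version(1)
```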

delete_state async

delete_state(strategy_id: str) -> bool

Delete state from all tiers.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier

required

Returns:

Type Description
bool

True if state was deleted from at least one tier

invalidate_hot_cache

invalidate_hot_cache(
    strategy_id: str | None = None,
) -> None

Invalidate HOT tier cache.

Parameters:

Name Type Description Default
strategy_id str | None

Specific strategy to invalidate, or None to clear all

None

get_metrics

get_metrics(limit: int = 100) -> list[TierMetrics]

Get recent tier metrics.

Parameters:

Name Type Description Default
limit int

Maximum number of metrics to return

100

Returns:

Type Description
list[TierMetrics]

List of TierMetrics, newest first

get_metrics_summary

get_metrics_summary() -> dict[str, Any]

Get summary of tier metrics.

Returns:

Type Description
dict[str, Any]

Dictionary with per-tier average latencies and success rates

clear_metrics

clear_metrics() -> None

Clear all stored metrics.

save_clob_order async

save_clob_order(order: ClobOrderState) -> bool

Save or update a CLOB order state.

Persists order state to the WARM tier for crash recovery and order tracking across strategy restarts.

Parameters:

Name Type Description Default
order ClobOrderState

ClobOrderState to persist.

required

Returns:

Type Description
bool

True if save succeeded, False if no WARM backend or error.

get_clob_order async

get_clob_order(order_id: str) -> ClobOrderState | None

Get a CLOB order by order_id.

Parameters:

Name Type Description Default
order_id str

Order identifier.

required

Returns:

Type Description
ClobOrderState | None

ClobOrderState if found, None otherwise.

get_open_clob_orders async

get_open_clob_orders(
    market_id: str | None = None,
) -> list[ClobOrderState]

Get all open CLOB orders, optionally filtered by market.

Open orders are those with status: pending, submitted, live, partially_filled.

Parameters:

Name Type Description Default
market_id str | None

Optional market ID to filter by.

None

Returns:

Type Description
list[ClobOrderState]

List of open ClobOrderState, newest first.
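
As an illustration of the open-order filter, the sketch below applies the four open statuses to plain dicts. The field names (status, market_id, created_at), the non-open status names, and the use of string statuses are assumptions made for the example; the real ClobOrderState and ClobOrderStatus types may differ.

```python
# Statuses the documentation lists as "open".
OPEN_STATUSES = {"pending", "submitted", "live", "partially_filled"}


def open_orders(orders, market_id=None):
    """Keep open orders, optionally for one market, newest first."""
    selected = [
        order
        for order in orders
        if order["status"] in OPEN_STATUSES
        and (market_id is None or order["market_id"] == market_id)
    ]
    return sorted(selected, key=lambda order: order["created_at"], reverse=True)


orders = [
    {"order_id": "a", "market_id": "m1", "status": "live", "created_at": 1},
    {"order_id": "b", "market_id": "m2", "status": "filled", "created_at": 2},
    {"order_id": "c", "market_id": "m1", "status": "pending", "created_at": 3},
]
all_open = [order["order_id"] for order in open_orders(orders)]
m2_open = open_orders(orders, market_id="m2")
```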

update_clob_order_status async

update_clob_order_status(
    order_id: str,
    status: ClobOrderStatus,
    fills: list[ClobFill] | None = None,
    filled_size: str | None = None,
    average_fill_price: str | None = None,
    error: str | None = None,
) -> bool

Update the status and fill information of a CLOB order.

Parameters:

- order_id (str, required): Order identifier.
- status (ClobOrderStatus, required): New order status.
- fills (list[ClobFill] | None, default: None): Updated list of fills (replaces existing).
- filled_size (str | None, default: None): Updated filled size.
- average_fill_price (str | None, default: None): Updated average fill price.
- error (str | None, default: None): Error message if order failed.

Returns:

Type Description
bool

True if order was found and updated.

save_portfolio_snapshot async

save_portfolio_snapshot(snapshot: PortfolioSnapshot) -> int

Save a portfolio snapshot.

Persists portfolio value and position data for dashboard display and PnL tracking.

Parameters:

Name Type Description Default
snapshot PortfolioSnapshot

PortfolioSnapshot to persist.

required

Returns:

Type Description
int

Snapshot ID if save succeeded, 0 if no WARM backend or error.

get_latest_snapshot async

get_latest_snapshot(
    strategy_id: str,
) -> PortfolioSnapshot | None

Get most recent portfolio snapshot for a strategy.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required

Returns:

Type Description
PortfolioSnapshot | None

Latest PortfolioSnapshot if found, None otherwise.

get_snapshots_since async

get_snapshots_since(
    strategy_id: str, since: datetime, limit: int = 168
) -> list[PortfolioSnapshot]

Get portfolio snapshots since a timestamp (for charts).

Parameters:

- strategy_id (str, required): Strategy identifier.
- since (datetime, required): Start timestamp for query.
- limit (int, default: 168): Maximum number of snapshots to return.

Returns:

Type Description
list[PortfolioSnapshot]

List of PortfolioSnapshot, oldest first.

get_snapshot_at async

get_snapshot_at(
    strategy_id: str, timestamp: datetime
) -> PortfolioSnapshot | None

Get snapshot closest to a timestamp (for PnL calculation).

Parameters:

- strategy_id (str, required): Strategy identifier.
- timestamp (datetime, required): Target timestamp.

Returns:

Type Description
PortfolioSnapshot | None

PortfolioSnapshot closest to timestamp, or None if not found.
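
One plausible reading of "closest to a timestamp" is the snapshot with the smallest absolute time difference. The sketch below shows that interpretation on plain dicts; it is an assumption, since the backend could equally select the nearest snapshot at or before the target. The timestamp and total_value_usd keys are illustrative.

```python
from datetime import datetime, timedelta, timezone


def closest_snapshot(snapshots, target):
    """Return the snapshot whose timestamp is nearest to target, or None."""
    if not snapshots:
        return None
    return min(
        snapshots,
        key=lambda snap: abs((snap["timestamp"] - target).total_seconds()),
    )


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
snapshots = [
    {"timestamp": t0, "total_value_usd": "1000"},
    {"timestamp": t0 + timedelta(hours=1), "total_value_usd": "1010"},
    {"timestamp": t0 + timedelta(hours=2), "total_value_usd": "990"},
]
nearest = closest_snapshot(snapshots, t0 + timedelta(hours=1, minutes=20))
```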

save_portfolio_metrics async

save_portfolio_metrics(metrics: PortfolioMetrics) -> bool

Save or update portfolio metrics.

Portfolio metrics store baseline values (initial_value_usd) that survive strategy restarts, enabling accurate PnL calculation.

Parameters:

Name Type Description Default
metrics PortfolioMetrics

PortfolioMetrics to persist.

required

Returns:

Type Description
bool

True if save succeeded, False if no WARM backend or error.

get_portfolio_metrics async

get_portfolio_metrics(
    strategy_id: str,
) -> PortfolioMetrics | None

Get portfolio metrics for a strategy.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier.

required

Returns:

Type Description
PortfolioMetrics | None

PortfolioMetrics if found, None otherwise.

cleanup_old_snapshots async

cleanup_old_snapshots(retention_days: int = 7) -> int

Clean up old portfolio snapshots.

Parameters:

Name Type Description Default
retention_days int

Number of days of snapshots to retain.

7

Returns:

Type Description
int

Number of snapshots deleted.
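
The retention window can be pictured as a cutoff timestamp. This sketch assumes that snapshots strictly older than now minus retention_days are the ones removed; the exact boundary behaviour of the real backends is not documented here, and split_by_retention is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def split_by_retention(timestamps, now, retention_days=7):
    """Return (kept timestamps, number deleted) for a retention window."""
    cutoff = now - timedelta(days=retention_days)
    kept = [ts for ts in timestamps if ts >= cutoff]
    return kept, len(timestamps) - len(kept)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
timestamps = [
    datetime(2024, 1, 1, tzinfo=timezone.utc),  # older than the cutoff (Jan 3)
    datetime(2024, 1, 5, tzinfo=timezone.utc),
    datetime(2024, 1, 9, tzinfo=timezone.utc),
]
kept, deleted = split_by_retention(timestamps, now)
```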

StateManagerConfig

almanak.framework.state.StateManagerConfig dataclass

StateManagerConfig(
    enable_hot: bool = True,
    enable_warm: bool = True,
    warm_backend: WarmBackendType = WarmBackendType.POSTGRESQL,
    hot_cache_ttl_seconds: int = 0,
    hot_cache_max_size: int = 1000,
    postgres_config: PostgresConfig = PostgresConfig(),
    sqlite_config: SQLiteConfigLight = SQLiteConfigLight(),
    metrics_callback: Callable[[TierMetrics], None]
    | None = None,
    load_state_on_startup: bool = True,
)

Configuration for StateManager.

Attributes:

- enable_hot (bool): Enable in-memory cache tier.
- enable_warm (bool): Enable WARM tier (PostgreSQL or SQLite).
- warm_backend (WarmBackendType): Which backend to use for WARM tier (POSTGRESQL or SQLITE).
- hot_cache_ttl_seconds (int): TTL for hot cache entries (0 = no expiry).
- hot_cache_max_size (int): Maximum entries in hot cache.
- postgres_config (PostgresConfig): PostgreSQL configuration (used when warm_backend=POSTGRESQL).
- sqlite_config (SQLiteConfigLight): SQLite configuration (used when warm_backend=SQLITE).
- metrics_callback (Callable[[TierMetrics], None] | None): Optional callback for metrics reporting.
- load_state_on_startup (bool): Load all active states from WARM to HOT on startup.

Example

PostgreSQL backend (default, production)

config = StateManagerConfig(
    warm_backend=WarmBackendType.POSTGRESQL,
    postgres_config=PostgresConfig(host="localhost"),
)

SQLite backend (local development)

config = StateManagerConfig(
    warm_backend=WarmBackendType.SQLITE,
    sqlite_config=SQLiteConfigLight(db_path="./state.db"),
)

StateTier

almanak.framework.state.StateTier

Bases: IntEnum

Storage tier for state data.

Ordered by access speed (fastest first).
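
Because StateTier is an IntEnum, its members compare and sort like integers, which is what makes "fastest first" ordering work. The sketch below uses a hypothetical Tier enum; the member values 1 and 2 are assumptions for illustration, not the library's actual values.

```python
from enum import IntEnum


class Tier(IntEnum):
    """Hypothetical stand-in for StateTier; the numeric values are assumed."""

    HOT = 1   # in-memory cache
    WARM = 2  # PostgreSQL or SQLite


# Sorting members gives the order in which tiers would be tried on a read.
read_order = sorted(Tier)
```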

StateData

almanak.framework.state.StateData dataclass

StateData(
    strategy_id: str,
    version: int,
    state: dict[str, Any],
    schema_version: int = 1,
    checksum: str = "",
    created_at: datetime = (lambda: datetime.now(UTC))(),
    loaded_from: StateTier | None = None,
)

Strategy state data container.

Attributes:

- strategy_id (str): Unique identifier for the strategy.
- version (int): CAS version number (incremented on each update).
- state (dict[str, Any]): The actual state data as a dictionary.
- schema_version (int): Schema version for migrations.
- checksum (str): SHA-256 hash of state data for integrity verification.
- created_at (datetime): When this state version was created.
- loaded_from (StateTier | None): Which tier the state was loaded from.

__post_init__

__post_init__() -> None

Calculate checksum if not provided.

verify_checksum

verify_checksum() -> bool

Verify the integrity of state data.
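
A SHA-256 integrity check over a state dictionary could look like the sketch below. The serialization step (JSON with sorted keys and compact separators) is an assumption chosen so that key order does not affect the hash; the library's actual canonical form may differ, and compute_checksum is a hypothetical helper.

```python
import hashlib
import json


def compute_checksum(state):
    """Hash a canonical JSON encoding of the state with SHA-256."""
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


state = {"key": "value", "count": 1}
stored_checksum = compute_checksum(state)

# Verification recomputes the hash and compares it with the stored one.
same_content = compute_checksum({"count": 1, "key": "value"}) == stored_checksum
tampered = compute_checksum({"key": "value", "count": 2}) == stored_checksum
```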

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

from_dict classmethod

from_dict(data: dict[str, Any]) -> StateData

Create StateData from dictionary.

Migrations

StateMigration

almanak.framework.state.StateMigration dataclass

StateMigration(
    version: int,
    migration_fn: MigrationFunction,
    description: str = "",
    rollback_safe_until_version: int = 1,
    created_at: datetime = (lambda: datetime.now(UTC))(),
)

Defines a single state migration.

Attributes:

- version (int): The schema version this migration upgrades TO.
- migration_fn (MigrationFunction): Function that transforms state from version-1 to version.
- description (str): Human-readable description of what this migration does.
- rollback_safe_until_version (int): Minimum version that can safely roll back to this version (i.e., versions >= this can roll back without data loss).
- created_at (datetime): When this migration was defined.

__post_init__

__post_init__() -> None

Validate migration.

apply

apply(state: dict[str, Any]) -> dict[str, Any]

Apply this migration to state.

Creates a deep copy to avoid mutating original state.

Parameters:

Name Type Description Default
state dict[str, Any]

The state dict to migrate

required

Returns:

Type Description
dict[str, Any]

Migrated state dict (new copy)
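
The copy-before-migrate behaviour can be sketched with copy.deepcopy. The migration function and field names below are hypothetical, and apply_migration is a stand-in for the documented apply method rather than its actual code.

```python
import copy


def add_max_slippage(state):
    """Hypothetical migration: add a default for a newly introduced field."""
    state["risk"]["max_slippage_bps"] = 50
    return state


def apply_migration(migration_fn, state):
    """Run the migration on a deep copy so the caller's dict is untouched."""
    working_copy = copy.deepcopy(state)
    return migration_fn(working_copy)


original = {"risk": {"max_position_usd": 1000}}
migrated = apply_migration(add_max_slippage, original)
```

A shallow copy would not be enough here, because the nested "risk" dict would still be shared between the original and the migrated state.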

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

MigrationRegistry

almanak.framework.state.MigrationRegistry

MigrationRegistry()

Registry of all state migrations.

Tracks migrations and provides version validation and lookup.

current_version property

current_version: int

Get the current (latest) schema version.

migrations property

migrations: dict[int, StateMigration]

Get all registered migrations.

register

register(migration: StateMigration) -> None

Register a migration.

Parameters:

Name Type Description Default
migration StateMigration

The migration to register

required

Raises:

Type Description
ValueError

If a migration for this version already exists

get

get(version: int) -> StateMigration | None

Get migration for a specific version.

Parameters:

Name Type Description Default
version int

Target version

required

Returns:

Type Description
StateMigration | None

StateMigration or None if not found

get_migrations_path

get_migrations_path(
    from_version: int, to_version: int
) -> list[StateMigration]

Get list of migrations needed to go from one version to another.

Parameters:

- from_version (int, required): Starting version.
- to_version (int, required): Target version.

Returns:

Type Description
list[StateMigration]

List of migrations to apply (in order)

Raises:

Type Description
MigrationNotFoundError

If any required migration is missing
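
Since each migration upgrades from version-1 to version, a path from one schema version to another is the run of consecutive versions in between. The sketch below shows that idea with a plain dict as the registry; MissingMigration and migrations_path are hypothetical stand-ins for MigrationNotFoundError and get_migrations_path.

```python
class MissingMigration(Exception):
    """Hypothetical stand-in for MigrationNotFoundError."""


def migrations_path(registered, from_version, to_version):
    """List the migration versions to apply, in order, or raise if one is missing."""
    path = []
    for version in range(from_version + 1, to_version + 1):
        if version not in registered:
            raise MissingMigration(f"no migration registered for version {version}")
        path.append(version)
    return path


registered = {2: "add risk limits", 3: "rename token field", 5: "add metadata"}
upgrade = migrations_path(registered, 1, 3)
no_op = migrations_path(registered, 3, 3)
```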

get_rollback_info

get_rollback_info(current_version: int) -> RollbackInfo

Get rollback safety information for a version.

Parameters:

Name Type Description Default
current_version int

Current schema version

required

Returns:

Type Description
RollbackInfo

RollbackInfo with safe and unsafe rollback targets

clear

clear() -> None

Clear all registered migrations (mainly for testing).

MigrationResult

almanak.framework.state.MigrationResult dataclass

MigrationResult(
    success: bool,
    from_version: int,
    to_version: int,
    migrations_applied: list[int],
    state: dict[str, Any],
    error: str | None = None,
    duration_ms: float = 0.0,
)

Result of applying migrations.

Attributes:

- success (bool): Whether all migrations succeeded.
- from_version (int): Starting schema version.
- to_version (int): Ending schema version.
- migrations_applied (list[int]): List of migration versions applied.
- state (dict[str, Any]): The migrated state (or original if failed).
- error (str | None): Error message if migration failed.
- duration_ms (float): Total migration time in milliseconds.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Position Management

PositionManager

almanak.framework.state.PositionManager

PositionManager(
    chains: list[str],
    initial_positions: list[PositionRecord] | None = None,
)

Manages positions across multiple chains with chain dimension support.

Provides methods to store, query, and aggregate positions across chains. Designed to work with StateData.state dictionary for persistence.

Usage

Create manager for a multi-chain strategy

manager = PositionManager(chains=['arbitrum', 'optimism', 'base'])

Add positions

manager.add_position(PositionRecord(
    position_id='pos-1',
    chain='arbitrum',
    position_type=PositionType.SUPPLY,
    protocol='aave_v3',
    token='WETH',
    amount=Decimal('1.5'),
    value_usd=Decimal('3000'),
))

Query positions

all_positions = manager.positions  # All chains
arb_positions = manager.positions_on('arbitrum')  # Single chain
total_usd = manager.total_value_usd  # Aggregate value

Attributes:

Name Type Description
chains list[str]

List of configured chain names

Initialize position manager.

Parameters:

- chains (list[str], required): List of chain names this manager handles.
- initial_positions (list[PositionRecord] | None, default: None): Optional list of positions to pre-populate.

chains property

chains: list[str]

Get list of configured chains.

positions property

positions: list[PositionRecord]

Get all positions across all chains.

Returns:

Type Description
list[PositionRecord]

List of all positions from all configured chains.

total_value_usd property

total_value_usd: Decimal

Calculate total USD value across all chains.

Returns:

Type Description
Decimal

Sum of value_usd for all positions across all chains.

positions_on

positions_on(chain: str) -> list[PositionRecord]

Get positions on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

total_value_on

total_value_on(chain: str) -> Decimal

Calculate total USD value on a specific chain.

Parameters:

Name Type Description Default
chain str

Chain name to calculate value for

required

Returns:

Type Description
Decimal

Sum of value_usd for positions on the specified chain

Raises:

Type Description
ChainNotFoundError

If chain is not configured

add_position

add_position(position: PositionRecord) -> None

Add or update a position.

If a position with the same position_id exists on the same chain, it will be replaced.

Parameters:

Name Type Description Default
position PositionRecord

Position to add/update

required

Raises:

Type Description
ChainNotFoundError

If position's chain is not configured

remove_position

remove_position(position_id: str, chain: str) -> bool

Remove a position by ID and chain.

Parameters:

- position_id (str, required): Position identifier.
- chain (str, required): Chain the position is on.

Returns:

Type Description
bool

True if position was removed, False if not found

Raises:

Type Description
ChainNotFoundError

If chain is not configured

get_position

get_position(
    position_id: str, chain: str
) -> PositionRecord | None

Get a specific position by ID and chain.

Parameters:

- position_id (str, required): Position identifier.
- chain (str, required): Chain the position is on.

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

Raises:

Type Description
ChainNotFoundError

If chain is not configured

find_position

find_position(position_id: str) -> PositionRecord | None

Find a position by ID across all chains.

Parameters:

Name Type Description Default
position_id str

Position identifier to search for

required

Returns:

Type Description
PositionRecord | None

PositionRecord if found, None otherwise

positions_by_type

positions_by_type(
    position_type: PositionType,
) -> list[PositionRecord]

Get all positions of a specific type across all chains.

Parameters:

Name Type Description Default
position_type PositionType

Type of position to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions matching the type

positions_by_protocol

positions_by_protocol(
    protocol: str,
) -> list[PositionRecord]

Get all positions for a specific protocol across all chains.

Parameters:

Name Type Description Default
protocol str

Protocol name to filter by

required

Returns:

Type Description
list[PositionRecord]

List of positions on the protocol

clear

clear(chain: str | None = None) -> None

Clear positions.

Parameters:

Name Type Description Default
chain str | None

If provided, clear only positions on this chain. If None, clear all positions on all chains.

None

Raises:

Type Description
ChainNotFoundError

If specified chain is not configured

to_dict

to_dict() -> dict[str, Any]

Serialize all positions to dictionary for state storage.

Returns:

Type Description
dict[str, Any]

Dictionary with chain -> position_id -> position data structure
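
The chain -> position_id -> position data shape can be pictured as a nested dictionary. The sketch below is illustrative only: the exact keys that to_dict() emits, and whether Decimal values are stored as strings, are assumptions, and total_value_usd here is a standalone helper rather than the manager's property.

```python
from decimal import Decimal

# Assumed shape: chain -> position_id -> position data.
serialized = {
    "arbitrum": {
        "pos-1": {"token": "WETH", "value_usd": "3000"},
    },
    "optimism": {
        "pos-2": {"token": "USDC", "value_usd": "1250.50"},
        "pos-3": {"token": "OP", "value_usd": "400"},
    },
}


def total_value_usd(data, chain=None):
    """Sum value_usd across all chains, or for a single chain if given."""
    chains = [chain] if chain is not None else list(data)
    return sum(
        (Decimal(pos["value_usd"]) for name in chains for pos in data[name].values()),
        Decimal("0"),
    )


grand_total = total_value_usd(serialized)
optimism_total = total_value_usd(serialized, chain="optimism")
```

Parsing amounts with Decimal rather than float keeps the sums exact, which matches the Decimal types used by PositionRecord.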

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionManager

Deserialize position manager from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary from to_dict()

required

Returns:

Type Description
PositionManager

PositionManager with restored positions

get_summary

get_summary() -> dict[str, Any]

Get a summary of positions across all chains.

Returns:

Type Description
dict[str, Any]

Dictionary with per-chain and total statistics

__repr__

__repr__() -> str

String representation of manager.

PositionRecord

almanak.framework.state.PositionRecord dataclass

PositionRecord(
    position_id: str,
    chain: str,
    position_type: PositionType,
    protocol: str | None,
    token: str,
    amount: Decimal,
    value_usd: Decimal,
    created_at: datetime = (lambda: datetime.now(UTC))(),
    updated_at: datetime = (lambda: datetime.now(UTC))(),
    metadata: dict[str, Any] = dict(),
)

A single position record with chain dimension.

This is the fundamental unit of position tracking. Every position must have an explicit chain field to support multi-chain strategies.

Attributes:

- position_id (str): Unique identifier for this position.
- chain (str): The blockchain this position is on (e.g., 'arbitrum', 'optimism').
- position_type (PositionType): Type of position (TOKEN, LP, BORROW, SUPPLY, PERP).
- protocol (str | None): Protocol where position is held (e.g., 'aave_v3', 'uniswap_v3').
- token (str): Primary token symbol (or pool identifier for LP).
- amount (Decimal): Amount of tokens or liquidity.
- value_usd (Decimal): Current USD value of the position.
- created_at (datetime): When the position was opened.
- updated_at (datetime): Last update timestamp.
- metadata (dict[str, Any]): Additional position-specific data (health factor, ranges, etc.).

to_dict

to_dict() -> dict[str, Any]

Serialize position to dictionary.

from_dict classmethod

from_dict(data: dict[str, Any]) -> PositionRecord

Deserialize position from dictionary.

Exceptions

almanak.framework.state.StateConflictError

StateConflictError(
    strategy_id: str,
    expected_version: int,
    actual_version: int,
    message: str | None = None,
)

Bases: Exception

Raised when CAS update fails due to version mismatch.

This error indicates that another process has modified the state since it was last read. The caller should reload the state and retry.
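
The reload-and-retry pattern recommended above can be sketched against a toy store. Nothing here is the library's code: ToyStore, Conflict, and increment_with_retry are hypothetical, and the interfere_once flag only simulates another process committing between the read and the write.

```python
import asyncio


class Conflict(Exception):
    """Hypothetical stand-in for StateConflictError."""


class ToyStore:
    """Single-record store with compare-and-swap on a version number."""

    def __init__(self, interfere_once=False):
        self.version = 1
        self.state = {"count": 0}
        self._interfere = interfere_once

    async def load(self):
        return self.version, dict(self.state)

    async def save(self, state, expected_version):
        if self._interfere:
            # Simulate a concurrent writer that commits first.
            self._interfere = False
            self.state = {"count": self.state["count"] + 10}
            self.version += 1
        if expected_version != self.version:
            raise Conflict(f"expected {expected_version}, found {self.version}")
        self.state = dict(state)
        self.version += 1
        return self.version


async def increment_with_retry(store, max_attempts=3):
    """Reload and reapply the change each time the CAS check fails."""
    for _ in range(max_attempts):
        version, state = await store.load()
        state["count"] += 1
        try:
            return await store.save(state, expected_version=version)
        except Conflict:
            continue  # someone else wrote first: reload and try again
    raise Conflict("gave up after repeated conflicts")


store = ToyStore(interfere_once=True)
final_version = asyncio.run(increment_with_retry(store))
```

The key point is that the change is reapplied to freshly loaded state on each attempt, so the concurrent writer's update is preserved rather than overwritten.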

almanak.framework.state.StateNotFoundError

StateNotFoundError(
    strategy_id: str, message: str | None = None
)

Bases: Exception

Raised when state is not found in any tier.