Saltar a contenido

Strategies

Strategy base classes and the market snapshot interface.

Implementing Teardown

Every strategy must implement three teardown methods so operators can safely close positions. Without them, close-requests are silently ignored.

Required Methods

Method Purpose
supports_teardown() -> bool Return True to enable teardown
get_open_positions() Return a TeardownPositionSummary listing all open positions
generate_teardown_intents(mode, market) Return an ordered list of Intent objects that unwind positions

Execution Order

If your strategy holds multiple position types, teardown intents must follow this order:

  1. PERP -- close perpetual positions first (highest risk)
  2. BORROW -- repay borrows to free collateral
  3. SUPPLY -- withdraw supplied collateral
  4. LP -- close liquidity positions
  5. TOKEN -- swap remaining tokens to stable

Example: Swap Strategy Teardown

from decimal import Decimal
from almanak import IntentStrategy, Intent

class MyStrategy(IntentStrategy):
    def supports_teardown(self) -> bool:
        return True

    def get_open_positions(self) -> "TeardownPositionSummary":
        from datetime import UTC, datetime
        from almanak.framework.teardown import (
            PositionInfo, PositionType, TeardownPositionSummary,
        )
        return TeardownPositionSummary(
            deployment_id=getattr(self, "deployment_id", "my_strategy"),
            timestamp=datetime.now(UTC),
            positions=[
                PositionInfo(
                    position_type=PositionType.TOKEN,
                    position_id="my_strategy_eth",
                    chain=self.chain,
                    protocol="uniswap_v3",
                    value_usd=Decimal("1000"),  # query on-chain balance, not cache
                    details={"asset": "WETH"},
                )
            ],
        )

    def generate_teardown_intents(self, mode: "TeardownMode", market=None) -> list[Intent]:
        from almanak.framework.teardown import TeardownMode
        max_slippage = Decimal("0.03") if mode == TeardownMode.HARD else Decimal("0.005")
        return [
            Intent.swap(
                from_token="WETH", to_token="USDC",
                amount="all", max_slippage=max_slippage, protocol="uniswap_v3",
            )
        ]

Always query on-chain state

get_open_positions() must query live on-chain balances, not cached values. Stale data can cause teardown to skip positions or attempt to close positions that no longer exist.

Testing with force_action

Real strategies gate their decide() output behind signals — RSI thresholds, MACD crosses, balance checks, cooldowns. That makes them hard to test on a forked block where those signals may not fire. The convention is a force_action config field that bypasses signal gates and lets you drive a known intent through the production code path.

@dataclass
class MyStrategyConfig:
    # Production tuning
    rsi_oversold: int = 30
    rsi_overbought: int = 70
    trade_size_usd: Decimal = Decimal("100")
    # Testing affordance — empty in production
    force_action: str = ""

class MyStrategy(IntentStrategy):
    def __init__(self, config, ...):
        super().__init__(...)
        self.force_action = str(self.get_config("force_action", "") or "").lower()
        # ... other config fields ...

    def decide(self, market: MarketSnapshot) -> Intent:
        # Test affordance: short-circuit BEFORE any signal logic.
        if self.force_action:
            return self._forced_intent(market)

        # Production decide() begins here.
        rsi = market.indicators.rsi(self.base_token, ...)
        if rsi.value < self.rsi_oversold:
            return Intent.swap(...)
        return Intent.hold(reason="No signal")

    def _forced_intent(self, market: MarketSnapshot) -> Intent:
        """Map free-form force_action strings to concrete Intents.

        Values are strategy-specific. Pick names that match what each branch
        DOES (verbs: "buy", "sell", "open", "close", "supply"), not internal
        state machine values.
        """
        if self.force_action == "buy":
            return Intent.swap(
                from_token=self.quote_token,
                to_token=self.base_token,
                amount_usd=self.trade_size_usd,
                protocol=self.protocol,
                chain=self.chain,
            )
        if self.force_action == "sell":
            return Intent.swap(
                from_token=self.base_token,
                to_token=self.quote_token,
                amount="all",
                protocol=self.protocol,
                chain=self.chain,
            )
        raise ValueError(f"Unknown force_action: {self.force_action!r}")

Driving force_action from the CLI

The almanak strat test command mutates force_action between iterations and runs each through the production code path on a managed Anvil fork:

# Single forced action
almanak strat test --actions buy --teardown --json

# Multiple actions in sequence (one iteration each)
almanak strat test --actions open,collect_fees --teardown --json

# Teardown only (no force_actions)
almanak strat test --teardown --json

The CLI emits structured JSON: every step's status, failure_logs, and a summary.all_passed bool. See CLI Reference: strat test for full flag docs.

Authoring rules

Rule Why
Values are free-form strings, strategy-specific. Pick names that describe the action verb ("buy", "open", "supply", "close"), not state-machine internals ("phase_2", "step_3a"). Anyone reading your code (or grepping _forced_intent to find supported values) should be able to map name → intent shape without context.
One value per non-HOLD intent type. If your strategy emits both Intent.lp_open and Intent.swap, expose both as force_action values (e.g. "open" and "buy"). Each path needs to be drivable independently. Missing branches mean missing test coverage.
Each branch must produce a non-HOLD Intent. If a forced branch returns Intent.hold(...), the test passes silently without exercising the path you wanted. The whole point of force_action is to bypass gates — returning HOLD defeats it.
Order matters; document prerequisites. force_action="close" requires an open position; force_action="collect_fees" requires an LP NFT. The CLI runs values in --actions order against one Anvil instance, so prereqs must be driven first.
Skip values that overlap with generate_teardown_intents(). If teardown emits an unwind swap, don't drive the same intent via --actions — the position will be closed before teardown runs. --teardown is the canonical unwind path; double-driving it leaves teardown nothing to do.
Empty default in config (force_action: str = ""). Production deploys must run signal-gated logic; force_action is testing-only. A non-empty default would bypass signals in production and execute on every iteration.

Example: LP strategy with three force_action values

def _forced_intent(self, market: MarketSnapshot) -> Intent:
    if self.force_action == "open":
        # Opens an LP position.
        return Intent.lp_open(
            pool=self.pool,
            amount0=self.amount0,
            amount1=self.amount1,
            range_lower=self._compute_lower(market),
            range_upper=self._compute_upper(market),
            protocol=self.protocol,
            chain=self.chain,
        )
    if self.force_action == "collect_fees":
        # Collects fees on an existing position. Drive AFTER "open".
        # NFT-based protocols (Uniswap V3 family, Uniswap V4, Aerodrome
        # Slipstream) require the position's tokenId via protocol_params.
        return Intent.collect_fees(
            pool=self.pool,
            protocol=self.protocol,
            chain=self.chain,
            protocol_params={
                "position_id": self.force_position_id or self._position_id,
            },
        )
    if self.force_action == "close":
        # Closes the position. SKIP this in --actions; teardown emits the same intent.
        return Intent.lp_close(
            pool=self.pool,
            position_id=self.force_position_id or self._position_id,
            protocol=self.protocol,
            chain=self.chain,
        )
    raise ValueError(f"Unknown force_action: {self.force_action!r}")

CLI to test the lifecycle without double-closing:

almanak strat test --actions open,collect_fees --teardown --json
# `close` is skipped from --actions because teardown emits Intent.lp_close

Companion override fields

Strategies that track on-chain identifiers (NFT IDs, vault shares) often need a paired override so a forced action can target an existing position rather than the strategy's cached state:

@dataclass
class MyLPConfig:
    force_action: str = ""
    force_position_id: str | None = None  # override the cached self._position_id

# In _forced_intent:
position_id = self.force_position_id or self._position_id

Only add fields like this when a forced branch genuinely needs them. A swap-only strategy doesn't track position IDs, so it shouldn't carry force_position_id.

Common mistakes

  • Returning Intent.hold() from a forced branch — the test passes vacuously, you haven't tested anything.
  • Using state-machine values as force_action names ("phase_b", "step_3") — opaque to anyone trying to figure out what each value does.
  • Forgetting to short-circuit at the top of decide() — if signal gates run before the force_action check, the gate's HOLD return value wins and the forced intent never fires.
  • Reusing values that teardown emits — running --actions close --teardown unwinds the position twice (or once and then has nothing to test on teardown).
  • Setting force_action in the production config — bypasses every safety gate in deployment. Always default to "".

State Persistence

Strategies that track internal state across iterations (position IDs, phase tracking, trade counters) must implement two hooks so that state survives restarts.

The framework persists runner-level metadata automatically (iteration counts, consecutive errors, execution progress for multi-step intents). But strategy-specific state is opt-in -- without these hooks, instance variables are lost when the process stops.

Required Hooks

Method Called Purpose
get_persistent_state() -> dict After each iteration Return a dict of state to save
load_persistent_state(state: dict) On startup / resume Restore state from the saved dict

Example: LP Strategy with Position Tracking

from decimal import Decimal
from typing import Any
from almanak import IntentStrategy, Intent, MarketSnapshot

class MyLPStrategy(IntentStrategy):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._position_id: int | None = None
        self._has_position: bool = False
        self._total_fees_collected: Decimal = Decimal("0")

    def decide(self, market: MarketSnapshot) -> Intent | None:
        if self._has_position:
            # Uniswap V3 / V4 / Aerodrome Slipstream identify the position by
            # NFT tokenId — pass it through protocol_params, not as a
            # top-level argument.
            return Intent.collect_fees(
                pool="WETH/USDC/3000",
                protocol="uniswap_v3",
                protocol_params={"position_id": self._position_id},
            )
        return Intent.lp_open(
            pool="WETH/USDC/3000",
            amount0=Decimal("0.5"),
            amount1=Decimal("1000"),
            range_lower=Decimal("1800"),
            range_upper=Decimal("2200"),
            protocol="uniswap_v3",
        )

    def on_intent_executed(self, intent, success: bool, result):
        """Update state after execution -- persisted via get_persistent_state()."""
        if success and result.position_id:
            self._position_id = result.position_id
            self._has_position = True

    # -- State persistence hooks --

    def get_persistent_state(self) -> dict[str, Any]:
        return {
            "position_id": self._position_id,
            "has_position": self._has_position,
            "total_fees_collected": str(self._total_fees_collected),
        }

    def load_persistent_state(self, state: dict[str, Any]) -> None:
        self._position_id = state.get("position_id")
        self._has_position = state.get("has_position", False)
        self._total_fees_collected = Decimal(state.get("total_fees_collected", "0"))

What the Framework Persists Automatically

These are saved by the runner without any strategy-author code:

State Persisted to Survives restart
Iteration count, success count, error count strategy_state table Yes
Multi-step execution progress (which step completed, serialized intents) strategy_state.execution_progress Yes -- runner resumes from the failed step
Operator pause flag strategy_state.is_paused Yes
Portfolio snapshots (total value, positions) portfolio_snapshots table Yes
Portfolio PnL baseline (initial value, deposits, gas) portfolio_metrics table Yes
Timeline events (execution audit trail) timeline_events table Yes

Guidelines

  • Use defensive .get() with defaults in load_persistent_state() so older saved state doesn't crash when you add new fields.
  • Store Decimal as strings (str(amount)) and parse back (Decimal(state["amount"])) for safe JSON round-tripping.
  • on_intent_executed() is the natural place to update state after a trade (e.g., storing a position ID). get_persistent_state() then picks it up for the next save.
  • All values must be JSON-serializable. The state dict is stored as a JSON blob in the database.

Without these hooks, strategy state is lost on restart

If you store state in instance variables but don't implement the persistence hooks, a restart means your strategy has no memory of open positions, completed trades, or internal phase. This is especially dangerous for LP and lending strategies where losing a position ID means the strategy cannot close its own positions.

IntentStrategy

The primary base class for writing strategies. Implement the decide() method to return an Intent.

almanak.framework.strategies.IntentStrategy

IntentStrategy(
    config: ConfigT,
    chain: str,
    wallet_address: str,
    risk_guard_config: RiskGuardConfig | None = None,
    notification_callback: NotificationCallback
    | None = None,
    compiler: IntentCompiler | None = None,
    state_machine_config: StateMachineConfig | None = None,
    price_oracle: PriceOracle | None = None,
    rsi_provider: RSIProvider | None = None,
    balance_provider: BalanceProvider | None = None,
    rpc_url: str | None = None,
    wallet_activity_provider: WalletActivityProvider
    | None = None,
    chains: list[str] | None = None,
    chain_wallets: dict[str, str] | None = None,
)

Bases: StrategyBase[ConfigT]

Base class for Intent-based strategies.

IntentStrategy simplifies strategy development by allowing developers to write just a decide() method that returns an Intent. The framework handles:

  1. Market data access via MarketSnapshot
  2. Intent compilation to ActionBundle
  3. State machine generation for execution
  4. Hot-reloadable configuration
  5. Error handling and retries

Subclasses must implement the abstract decide() method.

Example

@almanak_strategy(name="simple_strategy") class SimpleStrategy(IntentStrategy): def decide(self, market: MarketSnapshot) -> Optional[Intent]: if market.rsi("ETH").is_oversold: return Intent.swap("USDC", "ETH", amount_usd=Decimal("100")) return Intent.hold()

Attributes:

Name Type Description
compiler IntentCompiler

IntentCompiler for converting intents to action bundles

state_machine_config

Configuration for state machine execution

_current_intent AnyIntent | None

Currently executing intent (if any)

_current_state_machine IntentStateMachine | None

Current state machine (if any)

Initialize the intent strategy.

Parameters:

Name Type Description Default
config ConfigT

Hot-reloadable configuration

required
chain str

Chain to operate on (e.g., "arbitrum")

required
wallet_address str

Wallet address for transactions

required
risk_guard_config RiskGuardConfig | None

Risk guard configuration

None
notification_callback NotificationCallback | None

Callback for operator notifications

None
compiler IntentCompiler | None

Intent compiler (required for direct run() calls, optional for runner)

None
state_machine_config StateMachineConfig | None

State machine configuration

None
price_oracle PriceOracle | None

Function to fetch prices

None
rsi_provider RSIProvider | None

Function to calculate RSI (token, period[, timeframe=]) -> RSIData

None
balance_provider BalanceProvider | None

Function to fetch balances

None
rpc_url str | None

RPC URL for on-chain queries (needed for LP close)

None
wallet_activity_provider WalletActivityProvider | None

Provider for leader wallet activity signals

None
chains list[str] | None

List of all chains this strategy operates on (multi-chain)

None
chain_wallets dict[str, str] | None

Per-chain wallet addresses from wallet registry

None

chain property

chain: str

Get the primary chain name.

chains property

chains: list[str]

Get all chains this strategy operates on.

wallet_address property

wallet_address: str

Get the wallet address.

compiler property writable

compiler: IntentCompiler

Get the intent compiler.

Raises:

Type Description
RuntimeError

If compiler was not provided and is accessed directly. The StrategyRunner creates its own compiler with real prices, so this is only needed for direct run() calls.

current_intent property

current_intent: AnyIntent | None

Get the currently executing intent.

current_state_machine property

current_state_machine: IntentStateMachine | None

Get the current state machine.

lp_position_tracker property

lp_position_tracker: LPPositionTracker

Read-only accessor to the framework's LP position tracker.

Exposed for tests and tooling. Strategies should not need to touch this directly — the runner manages it transparently.

quote_asset property

quote_asset: QuoteAsset

The resolved performance quote asset (decorator default or boot override).

Definition-only: exposed as metadata for the hosted platform. The SDK does not change valuation/accounting/CLI behaviour based on it, and it is frozen after boot (not part of the hot-reloadable config surface).

get_wallet_for_chain

get_wallet_for_chain(chain: str) -> str

Get the wallet address for a specific chain.

If a wallet registry provided per-chain wallets, returns the chain-specific wallet. Otherwise falls back to the default wallet.

Parameters:

Name Type Description Default
chain str

Chain name (e.g., "arbitrum", "base")

required

Returns:

Type Description
str

Wallet address for the specified chain

validate_config

validate_config() -> None

Validate the strategy's configuration.

Lifecycle hook invoked automatically from :py:meth:__init__ AFTER the config has been loaded (via super().__init__) and BEFORE any other setup that depends on config (chain wiring, providers, state machine, etc.).

Subclasses override this method to enforce preconditions on their configuration — required fields, value ranges, cross-field invariants, or any other invariant that must hold before the strategy is usable. On failure, raise :py:class:ConfigValidationError with a clear message and the offending field when applicable.

This hook exists so tooling like the Portfolio Manager's strat check preflight can catch misconfigurations at construction time rather than at the first decide() call in production.

The default implementation is a no-op, so existing strategies require no changes.

Raises:

Type Description
ConfigValidationError

If the configuration is invalid. The error's field attribute identifies the offending field when applicable; otherwise None for cross-field errors.

Example

from decimal import Decimal from almanak.framework.strategies.exceptions import ConfigValidationError

class MyStrategy(IntentStrategy): def validate_config(self) -> None: # NOTE: configs loaded from JSON / env come back as strings. # Always coerce numerics through Decimal(str(...)) so that # comparisons are numeric, not lexicographic (e.g. "9" >= "10" # is True as strings but False as numbers). size = Decimal(str(self.get_config("trade_size_usd", "0"))) if size <= 0: raise ConfigValidationError( "trade_size_usd must be > 0", field="trade_size_usd", ) oversold = Decimal(str(self.get_config("rsi_oversold", "30"))) overbought = Decimal(str(self.get_config("rsi_overbought", "70"))) if oversold >= overbought: raise ConfigValidationError( "rsi_oversold must be < rsi_overbought", field="rsi_oversold", )

set_state_manager

set_state_manager(
    state_manager: Any, deployment_id: str
) -> None

Set the state manager for persistence.

Called by the runner to inject the state manager.

Parameters:

Name Type Description Default
state_manager Any

StateManager instance

required
deployment_id str

Unique ID for this strategy instance

required

get_persistent_state

get_persistent_state() -> dict[str, Any]

Get strategy state to persist.

Override this method to define what state should be persisted. Default implementation returns empty dict (no state).

Returns:

Type Description
dict[str, Any]

Dict of state key-value pairs to persist

load_persistent_state

load_persistent_state(state: dict[str, Any]) -> None

Load persisted state into the strategy.

Override this method to restore state from persistence. Default implementation does nothing.

Parameters:

Name Type Description Default
state dict[str, Any]

Dict of state key-value pairs loaded from storage

required

reconcile_resumed_state

reconcile_resumed_state(
    market: MarketSnapshot,
) -> bool | None

Reconcile resumed in-memory side-state against live on-chain truth.

Optional post-resume guardrail hook (VIB-5155 / ALM-2719). After the runner restores persisted state via load_persistent_state it calls this hook ONCE, before the first decide(), with a live market snapshot. A strategy that caches a position-side flag (e.g. "holding base token") can use this to re-derive that flag from market.balance(...) so a stale / desynced flag cannot HOLD-lock a valid risk-off exit.

The default implementation is a no-op — strategies with no cached side-state need not override it.

Contract
  • This is a guardrail, not a control-flow gate. The runner runs it warn-only and inside a try/except: the return value never gates whether the strategy runs, and exceptions are swallowed. State mutations the hook makes (e.g. correcting a cached flag) are strategy-owned and DO affect the subsequent decide() / teardown — that is the intended reconciliation behavior.
  • Live balance is truth; the persisted flag is only a hint.

Parameters:

Name Type Description Default
market MarketSnapshot

A fresh market snapshot for the current cycle.

required

Returns:

Type Description
bool | None

True if a desync was detected and the cached side-state was

bool | None

corrected (the runner logs a WARNING + emits a STATE_CHANGE

bool | None

event); False if state already agreed with live balance;

bool | None

None (default) when the strategy does not track reconcilable

bool | None

side-state.

save_state

save_state() -> None

Save current strategy state to persistence.

Called by runner after each iteration.

flush_pending_saves async

flush_pending_saves() -> None

Wait for any pending save operations to complete.

This should be called before disconnecting from the gateway to ensure all state saves have completed. Handles both successful completion and errors gracefully.

load_state

load_state() -> bool

Load strategy state from persistence.

Called by runner on startup.

Returns:

Type Description
bool

True if state was found and loaded, False otherwise

load_state_async async

load_state_async() -> bool

Async variant of load_state() -- preferred when already in an event loop.

Called by the CLI runner inside its async setup so that state is always restored correctly, regardless of whether a loop is already running.

decide abstractmethod

decide(market: MarketSnapshot) -> DecideResult

Decide what action to take based on current market conditions.

This is the main method that strategy developers need to implement. It receives a MarketSnapshot with current market data and should return an Intent, IntentSequence, list of intents, or None.

Parameters:

Name Type Description Default
market MarketSnapshot

Current market snapshot with prices, balances, RSI, etc.

required

Returns:

Type Description
DecideResult

One of:

DecideResult
  • Single Intent: Execute one action
DecideResult
  • IntentSequence: Execute multiple actions sequentially (dependent)
DecideResult
  • list[Intent | IntentSequence]: Execute items in parallel
DecideResult
  • None: Take no action (equivalent to Intent.hold())
DecideResult

Returning None is equivalent to returning Intent.hold().

Example

def decide(self, market: MarketSnapshot) -> DecideResult: # Single intent if market.rsi("ETH").is_oversold: return Intent.swap("USDC", "ETH", amount_usd=Decimal("1000"))

# Sequence of dependent actions (execute in order)
if should_move_funds:
    return Intent.sequence([
        Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"),
        Intent.supply(protocol="aave_v3", token="WETH", amount=Decimal("0.5"), chain="arbitrum"),
    ])

# Multiple independent actions (execute in parallel)
if should_rebalance:
    return [
        Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="arbitrum"),
        Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="optimism"),
    ]

# No action
return Intent.hold(reason="RSI in neutral zone")

on_intent_executed

on_intent_executed(
    intent: Any, success: bool, result: Any
) -> None

Called after each intent execution completes.

Override this method to react to execution results, e.g., to track position IDs, log swap amounts, or update state based on results.

The result object is enriched by the framework with extracted data that "just appears" based on intent type: - SWAP: result.swap_amounts (SwapAmounts) - LP_OPEN: result.position_id, result.extracted_data["liquidity"] - LP_CLOSE: result.lp_close_data (LPCloseData) - PERP_OPEN: result.extracted_data["entry_price"], ["leverage"]

Parameters:

Name Type Description Default
intent Any

The intent that was executed

required
success bool

Whether execution succeeded

required
result Any

ExecutionResult with enriched data

required

valuate

valuate(market: MarketSnapshot) -> Decimal

Calculate the total portfolio value in USD for vault settlement.

Called by the framework during vault settlement to determine the current value of the strategy's holdings. The returned value is converted to underlying token units and proposed as the new totalAssets for the vault.

The default implementation sums balance_usd for all known token balances in the market snapshot. Override this method for custom valuation logic (e.g., including LP positions, pending rewards, or off-chain assets).

Parameters:

Name Type Description Default
market MarketSnapshot

Current market snapshot with prices and balances

required

Returns:

Type Description
Decimal

Total portfolio value in USD as a Decimal

on_vault_settled

on_vault_settled(settlement: SettlementResult) -> None

Called after a vault settlement cycle completes.

Override this method to react to settlement results, e.g., to log deposit/redemption amounts or update internal state.

Parameters:

Name Type Description Default
settlement SettlementResult

SettlementResult with deposit/redemption data

required

set_multi_chain_providers

set_multi_chain_providers(
    price_oracle: MultiChainPriceOracle | None = None,
    balance_provider: MultiChainBalanceProvider
    | None = None,
    aave_health_factor_provider: AaveHealthFactorProvider
    | None = None,
) -> None

Set multi-chain data providers for cross-chain strategies.

Call this method before running a multi-chain strategy to enable MultiChainMarketSnapshot creation.

Parameters:

Name Type Description Default
price_oracle MultiChainPriceOracle | None

Multi-chain price oracle

None
balance_provider MultiChainBalanceProvider | None

Multi-chain balance provider

None
aave_health_factor_provider AaveHealthFactorProvider | None

Aave health factor provider

None

is_multi_chain

is_multi_chain() -> bool

Check if this strategy is running in multi-chain mode.

Returns True only when SUPPORTED_CHAINS is explicitly set (manually or by the CLI multi-chain path) AND has >1 chain. Does NOT use decorator metadata because that is portability info, not a runtime signal. The CLI's is_multi_chain_strategy() makes the runtime decision based on config.chains.

Returns:

Type Description
bool

True if SUPPORTED_CHAINS has multiple chains

get_supported_chains

get_supported_chains() -> list[str]

Get the chains supported by this strategy.

Returns SUPPORTED_CHAINS if explicitly set, otherwise falls back to STRATEGY_METADATA.supported_chains (decorator portability metadata), then to [self._chain].

Returns:

Type Description
list[str]

List of supported chain names

begin_market_snapshot_iteration

begin_market_snapshot_iteration(token: object) -> None

Open a new per-iteration MarketSnapshot scope (VIB-4843 FR-5001).

The runner calls this once at the top of each run_iteration with a token unique to the iteration (the cycle_id). Subsequent create_market_snapshot() calls within the same iteration — pre-warm, decide(), and post-decide portfolio valuation — return the SAME instance so the pre-warmed _price_cache is reused instead of thrown away. Passing a new token (or None) invalidates the memo.

Idempotent: re-stamping the current token is a no-op so the runner can call it defensively without dropping a warm snapshot mid-iteration.

create_market_snapshot

create_market_snapshot() -> MarketSnapshot

Return the per-iteration market snapshot, building one if needed.

VIB-4843 FR-5001: memoizes the snapshot per iteration so pre-warm → decide() → portfolio valuation all share ONE instance (and its _price_cache). Reuse is keyed by the iteration token the runner stamps via :meth:begin_market_snapshot_iteration.

Two invalidation regimes, never both:

  • Iteration-token stamped (the runner's per-iteration cycle_id): the token IS the lifetime. The memo survives for the WHOLE iteration — pre-warm → decide() → portfolio valuation — regardless of wall-clock, because :meth:begin_market_snapshot_iteration invalidates it the moment the next iteration stamps a new token. Applying a TTL here would drop the warm _price_cache mid-iteration on a slow decide() and defeat the dedup (the Codex VIB-4843 finding).
  • No token stamped (direct callers / tests): a short TTL bounds reuse so prices are never served stale across loops with no iteration scope.

Builds via :meth:_build_market_snapshot; see its docstring for the builder-contract details. Override _build_market_snapshot (not this method) to customize how market data is populated, so the memo still applies.

run

run() -> ActionBundle | None

Execute one iteration of the strategy.

This method: 1. Creates a MarketSnapshot 2. Calls decide() to get an intent or DecideResult 3. Compiles single intents to an ActionBundle 4. Returns the ActionBundle for execution

Note: For multi-intent results (list or IntentSequence), this method only compiles the first intent. Use run_multi() for full multi-intent execution with proper parallel/sequential handling.

Returns:

Type Description
ActionBundle | None

ActionBundle to execute, or None if HOLD intent or no action

run_multi

run_multi() -> DecideResult

Execute one iteration of the strategy, returning the full DecideResult.

Unlike run(), this method returns the full DecideResult from decide() without compiling to ActionBundle. This is useful for multi-chain execution via MultiChainOrchestrator.

Returns:

Name Type Description
DecideResult DecideResult

The raw result from decide() (may be None, single intent,

DecideResult

IntentSequence, or list of intents/sequences)

run_with_state_machine

run_with_state_machine(
    receipt_provider: Callable[
        [ActionBundle], TransactionReceipt
    ]
    | None = None,
) -> ExecutionResult

Execute strategy with full state machine lifecycle.

This method provides full state machine execution including: - Intent compilation - Transaction execution (via receipt_provider) - Validation - Retry logic on failure

Note: This method only handles single intents for backward compatibility. For multi-intent execution, use run_multi() with MultiChainOrchestrator.

Parameters:

Name Type Description Default
receipt_provider Callable[[ActionBundle], TransactionReceipt] | None

Function that executes an ActionBundle and returns a TransactionReceipt. If not provided, returns after compilation.

None

Returns:

Type Description
ExecutionResult

ExecutionResult with full execution details

apply_quote_asset_override

apply_quote_asset_override(
    raw: QuoteAsset | str | dict[str, Any] | None,
) -> None

Apply a per-deployment config.json quote_asset override at boot.

Called once by the runner/CLI after construction. None keeps the decorator default. Frozen thereafter — there is no hot-reload path that mutates it, by design.

get_metadata

get_metadata() -> StrategyMetadata | None

Get strategy metadata if available.

Returns:

Type Description
StrategyMetadata | None

StrategyMetadata if set via decorator, otherwise None

to_dict

to_dict() -> dict[str, Any]

Serialize strategy state to dictionary.

Returns:

Type Description
dict[str, Any]

Dictionary representation of strategy state

on_sadflow_enter

on_sadflow_enter(
    error_type: str | None,
    attempt: int,
    context: SadflowContext,
) -> SadflowAction | None

Hook called when entering sadflow state.

Override this method to customize sadflow behavior for your strategy. This is called once when first entering sadflow, before any retry attempts.

Parameters:

Name Type Description Default
error_type str | None

Categorized error type (e.g., "INSUFFICIENT_FUNDS", "TIMEOUT", "SLIPPAGE", "REVERT"). May be None for uncategorized errors.

required
attempt int

Current attempt number (1-indexed).

required
context SadflowContext

SadflowContext with error details and execution state.

required

Returns:

Type Description
SadflowAction | None

Optional[SadflowAction]: Action to take. Return None to use default

SadflowAction | None

retry behavior. Return SadflowAction to customize:

SadflowAction | None
  • SadflowAction.retry(): Continue with default retry
SadflowAction | None
  • SadflowAction.abort(reason): Stop immediately and fail
SadflowAction | None
  • SadflowAction.modify(bundle): Retry with modified ActionBundle
SadflowAction | None
  • SadflowAction.skip(reason): Skip intent and mark as completed
Example

def on_sadflow_enter(self, error_type, attempt, context): # Abort immediately on insufficient funds if error_type == "INSUFFICIENT_FUNDS": return SadflowAction.abort("Not enough funds for transaction")

# Increase gas for gas errors
if error_type == "GAS_ERROR" and context.action_bundle:
    modified = self._increase_gas(context.action_bundle)
    return SadflowAction.modify(modified, reason="Increased gas limit")

# Use default retry for other errors
return None

pause async

pause() -> None

Pause the strategy during teardown.

Called by TeardownManager before executing teardown intents. Default is a no-op; override if your strategy needs to stop background tasks or cancel pending orders before teardown.

get_portfolio_snapshot

get_portfolio_snapshot(
    market: MarketSnapshot | None = None,
) -> PortfolioSnapshot

Get current portfolio value and positions.

This method is called by the StrategyRunner after each iteration to capture portfolio snapshots for: - Dashboard value display (Total Value, PnL) - Historical PnL charts - Position breakdown by type

Default implementation: 1. Calls get_open_positions() for position values (LP, lending, perps) 2. Adds wallet token balances not captured by positions

Override for strategies needing custom value calculation (CEX, prediction).

Parameters:

Name Type Description Default
market MarketSnapshot | None

Optional MarketSnapshot. If None, creates one internally.

None

Returns:

Type Description
PortfolioSnapshot

PortfolioSnapshot with current values and confidence level.

PortfolioSnapshot

If value cannot be computed, returns snapshot with

PortfolioSnapshot

value_confidence=UNAVAILABLE instead of $0.

Example

def get_portfolio_snapshot(self, market=None) -> PortfolioSnapshot: if market is None: market = self.create_market_snapshot()

# Custom CEX balance fetch
cex_balance = self._fetch_cex_balance()

# VIB-3614: total_value_usd is positions-only; idle CEX cash is
# uninvested buying power → available_cash_usd. NAV is reconstructed
# downstream as total_value_usd + available_cash_usd, so putting
# cex_balance in BOTH would double-count it (cf. VIB-5271). Model a
# CEX holding as a PositionType.CEX position if it should count as
# deployed value instead of cash.
return PortfolioSnapshot(
    timestamp=datetime.now(UTC),
    deployment_id=self.deployment_id,
    total_value_usd=Decimal("0"),
    available_cash_usd=cex_balance,
    value_confidence=ValueConfidence.ESTIMATED,
    chain=self.chain,
)

get_open_positions abstractmethod

get_open_positions() -> TeardownPositionSummary

Get all open positions for this strategy.

MUST query on-chain state - do not use cached state for safety. Called during teardown preview and execution to determine what positions need to be closed.

For strategies with no positions, use StatelessStrategy as your base class, or return TeardownPositionSummary.empty(self.deployment_id).

Returns:

Type Description
TeardownPositionSummary

TeardownPositionSummary with all current positions

Example

from almanak.framework.teardown import TeardownPositionSummary, PositionInfo, PositionType

def get_open_positions(self) -> TeardownPositionSummary: positions = []

# Query on-chain LP position
lp_data = self._query_lp_position()
if lp_data:
    positions.append(PositionInfo(
        position_type=PositionType.LP,
        position_id=lp_data["token_id"],
        chain=self.chain,
        protocol="uniswap_v3",
        value_usd=Decimal(str(lp_data["value_usd"])),
    ))

return TeardownPositionSummary(
    deployment_id=self.deployment_id,
    timestamp=datetime.now(timezone.utc),
    positions=positions,
)

generate_teardown_intents abstractmethod

generate_teardown_intents(
    mode: TeardownMode, market: MarketSnapshot | None = None
) -> list[Intent]

Generate intents to close all positions.

Return intents in the correct execution order: 1. PERP - Close perpetuals first (highest liquidation risk) 2. BORROW - Repay borrowed amounts (frees collateral) 3. SUPPLY - Withdraw supplied collateral 4. LP - Close LP positions and collect fees 5. TOKEN - Swap all tokens to target token (USDC)

For strategies with no positions, use StatelessStrategy as your base class, or return an empty list.

Parameters:

Name Type Description Default
mode TeardownMode

TeardownMode.SOFT (graceful) or TeardownMode.HARD (emergency)

required
market MarketSnapshot | None

Optional market snapshot with real prices. When called from the runner, this is the same snapshot used for normal decide() iterations. May be None for backward compatibility or when called outside the runner.

None

Returns:

Type Description
list[Intent]

List of intents to execute in order

Example

from almanak.framework.teardown import TeardownMode

def generate_teardown_intents(self, mode: TeardownMode, market=None) -> list[Intent]: intents = []

# Get current positions
positions = self.get_open_positions()

# Use market data if available for smarter teardown
if market:
    eth_price = market.price("ETH")

# Close LP position first
for pos in positions.positions_by_type(PositionType.LP):
    intents.append(Intent.lp_close(
        position_id=pos.position_id,
        pool=pos.details.get("pool"),
        collect_fees=True,
        protocol="uniswap_v3",
    ))

# Swap remaining tokens to USDC
intents.append(Intent.swap(
    from_token="WETH",
    to_token="USDC",
    amount=Decimal("0"),  # All remaining
    swap_all=True,
))

return intents

on_teardown_started

on_teardown_started(mode: TeardownMode) -> None

Hook called when teardown starts.

Override to perform any setup before teardown begins. This is called after the cancel window expires.

Parameters:

Name Type Description Default
mode TeardownMode

The teardown mode (SOFT or HARD)

required
Example

def on_teardown_started(self, mode: TeardownMode) -> None: logger.info(f"Teardown starting in {mode.value} mode") self._pause_monitoring()

on_teardown_completed

on_teardown_completed(
    success: bool, recovered_usd: Decimal
) -> None

Hook called when teardown completes.

Override to perform cleanup after teardown.

Parameters:

Name Type Description Default
success bool

Whether all positions were closed successfully

required
recovered_usd Decimal

Total USD value recovered

required
Example

def on_teardown_completed(self, success: bool, recovered_usd: Decimal) -> None: if success: logger.info(f"Teardown complete. Recovered ${recovered_usd:,.2f}") else: logger.error("Teardown failed - manual intervention required")

get_teardown_profile

get_teardown_profile() -> TeardownProfile

Get teardown profile metadata for UX display.

Override to provide better information about teardown expectations. This helps the dashboard show more accurate previews.

Returns:

Type Description
TeardownProfile

TeardownProfile with strategy-specific metadata

Example

from almanak.framework.teardown import TeardownProfile

def get_teardown_profile(self) -> TeardownProfile: return TeardownProfile( natural_exit_assets=["WETH", "USDC"], original_entry_assets=["USDC"], recommended_target="USDC", estimated_steps=3, chains_involved=[self.chain], has_lp_positions=True, )

acknowledge_teardown_request

acknowledge_teardown_request() -> bool

Acknowledge a pending teardown request.

Called when the strategy picks up a teardown request and starts processing it. Local mode writes the strategy SQLite DB; hosted mode routes through the gateway.

Returns:

Type Description
bool

True if request was acknowledged, False otherwise

should_teardown

should_teardown() -> bool

Check if the strategy should enter teardown mode.

Checks for: 1. Pending teardown request (from CLI, dashboard, config) 2. Auto-protect triggers (health factor, loss limits)

Returns:

Type Description
bool

True if teardown should be initiated

on_sadflow_exit

on_sadflow_exit(success: bool, total_attempts: int) -> None

Hook called when exiting sadflow (on completion or final failure).

Override this method to perform cleanup or logging after sadflow resolution. This is called once when the intent completes (success or failure) after having been in sadflow.

Parameters:

Name Type Description Default
success bool

Whether the intent eventually succeeded after retries.

required
total_attempts int

Total number of attempts made (including the final one).

required
Example

def on_sadflow_exit(self, success, total_attempts): if success: logger.info(f"Recovered after {total_attempts} attempts") else: logger.error(f"Failed after {total_attempts} attempts") self.notify_operator("Intent failed after all retries")

on_retry

on_retry(
    context: SadflowContext, action: SadflowAction
) -> SadflowAction

Hook called before each retry attempt.

Override this method to customize individual retry behavior. This is called before each retry, after the initial on_sadflow_enter call.

Parameters:

Name Type Description Default
context SadflowContext

SadflowContext with current error details and state.

required
action SadflowAction

The default SadflowAction (RETRY with calculated delay).

required

Returns:

Name Type Description
SadflowAction SadflowAction

The action to take. Return the input action unchanged

SadflowAction

for default behavior, or return a modified action:

SadflowAction
  • SadflowAction.retry(custom_delay=5.0): Retry with custom delay
SadflowAction
  • SadflowAction.abort(reason): Stop retrying and fail
SadflowAction
  • SadflowAction.modify(bundle): Retry with modified ActionBundle
SadflowAction
  • SadflowAction.skip(reason): Skip and mark as completed
Example

def on_retry(self, context, action): # After 2 attempts, try with higher gas if context.attempt_number > 2 and context.action_bundle: modified = self._increase_gas(context.action_bundle) return SadflowAction.modify(modified)

# Abort if we've been retrying too long
if context.total_duration_seconds > 120:
    return SadflowAction.abort("Retry timeout exceeded")

# Use default retry
return action

StrategyBase

Lower-level base class for strategies that need direct action control.

almanak.framework.strategies.StrategyBase

StrategyBase(
    config: ConfigT,
    risk_guard_config: RiskGuardConfig | None = None,
    notification_callback: NotificationCallback
    | None = None,
)

Bases: ABC

Base class for all strategies with hot-reload configuration support.

This class provides: - Hot-reload configuration updates via update_config() - RiskGuard validation to prevent dangerous config changes - Atomic config application with rollback on failure - CONFIG_UPDATED event emission to timeline - Operator notification support

Strategies should inherit from this class and implement the abstract run() method.

Attributes:

Name Type Description
config

Hot-reloadable configuration

risk_guard

RiskGuard instance for validation

persistent_state dict[str, Any]

Dict containing strategy state including config snapshots

notification_callback dict[str, Any]

Optional callback for operator notifications

Initialize the strategy base.

Parameters:

Name Type Description Default
config ConfigT

Hot-reloadable configuration

required
risk_guard_config RiskGuardConfig | None

Optional RiskGuard configuration

None
notification_callback NotificationCallback | None

Optional callback for sending notifications

None

deployment_id property

deployment_id: str

Get the deployment ID.

chain property

chain: str

Get the chain.

allocation_usd property

allocation_usd: Decimal | None

Strategy's declared starting allocation in USD (VIB-3882).

The runner's portfolio-baseline path uses this property to set portfolio_metrics.initial_value_usd. When None, the runner falls back to the legacy "first observed wallet total" heuristic — preserved for strategies that haven't migrated to the new contract.

The default implementation reads config.total_value_usd for backwards compatibility (most demo / accounting strategies declare their allocation there). Strategies that source their allocation from elsewhere (a separate allocation.json, an env var, a hosted control plane) should override this property.

Why a property and not a class field: the contract is on the strategy, not on the JSON file. A strategy author who wants capital isolation from a shared test-wallet can declare it independently of how their config is shaped.

run abstractmethod

run() -> Any

Execute one iteration of the strategy.

Must be implemented by subclasses.

Returns:

Type Description
Any

ActionBundle or None if no action needed

update_config

update_config(
    updates: dict[str, Any], updated_by: str = "operator"
) -> ConfigUpdateResult

Update configuration with validation and persistence.

This method: 1. Validates new config values against the config's schema 2. Validates changes against RiskGuard (can't bypass risk limits) 3. Applies changes atomically 3a. Runs strategy-level validate_config() hook (if defined on the subclass); on failure, rolls back to previous values so live invariants cannot be bypassed via hot-reload or snapshot restore 4. Persists config snapshot to persistent_state 5. Emits CONFIG_UPDATED event with old and new values 6. Sends notification to operator

Parameters:

Name Type Description Default
updates dict[str, Any]

Dict of field -> new value to update

required
updated_by str

Who is making the update (for audit trail)

'operator'

Returns:

Type Description
ConfigUpdateResult

ConfigUpdateResult indicating success or failure

set_notification_callback

set_notification_callback(
    callback: NotificationCallback | None,
) -> None

Set the notification callback for operator alerts.

Parameters:

Name Type Description Default
callback NotificationCallback | None

Callback function that takes an OperatorCard

required

get_config_history

get_config_history() -> list[dict[str, Any]]

Get the configuration update history.

Returns:

Type Description
list[dict[str, Any]]

List of config snapshots in chronological order

get_current_config_version

get_current_config_version() -> int

Get the current config version number.

Returns:

Type Description
int

Current config version

get_config

get_config(key: str, default: Any = None) -> Any

Get a configuration value by key, regardless of config type.

Works transparently with: - Plain dict configs (from tests or raw JSON) - DictConfigWrapper objects (from the CLI) - Dataclasses and Pydantic models (attribute access)

This eliminates the per-strategy boilerplate of checking isinstance(self.config, dict) before every config.get() call.

Parameters:

Name Type Description Default
key str

Configuration key to retrieve

required
default Any

Value to return if the key is not found

None

Returns:

Type Description
Any

The configuration value, or default if not found

Example::

# In decide() -- no more boilerplate!
trade_size = self.get_config("trade_size_usd", "100")
rsi_period = self.get_config("rsi_period", 14)

restore_config_from_snapshot

restore_config_from_snapshot(
    version: int,
) -> ConfigUpdateResult

Restore configuration from a previous snapshot.

Parameters:

Name Type Description Default
version int

The config version to restore to

required

Returns:

Type Description
ConfigUpdateResult

ConfigUpdateResult indicating success or failure

RiskGuard

Non-bypassable risk validation that runs before every execution.

almanak.framework.strategies.RiskGuard

RiskGuard(config: RiskGuardConfig | None = None)

Validates configuration changes against risk limits.

RiskGuard ensures that configuration updates cannot bypass critical risk limits defined by the operator or system. It provides human-readable guidance when validation fails.

Initialize RiskGuard with configuration.

Parameters:

Name Type Description Default
config RiskGuardConfig | None

Risk guard configuration (uses defaults if not provided)

None

generate_guidance

generate_guidance(
    field_name: str,
    requested_value: Decimal,
    limit_value: Decimal,
) -> RiskGuardGuidance

Generate human-readable guidance for a failed risk check.

This method creates a RiskGuardGuidance object with: - What the limit is and what value was requested - Clear explanation of what the limit protects against - Actionable suggestion for how to proceed

Parameters:

Name Type Description Default
field_name str

The configuration field that failed validation

required
requested_value Decimal

The value the operator tried to set

required
limit_value Decimal

The maximum/minimum allowed value

required

Returns:

Type Description
RiskGuardGuidance

RiskGuardGuidance with human-readable explanation and suggestion

validate_config_update

validate_config_update(
    current_config: HotReloadableConfig,
    updates: dict[str, Any],
) -> RiskGuardResult

Validate proposed configuration updates against risk limits.

This method checks each proposed update against the risk guard's limits to ensure operators cannot accidentally configure dangerous parameters. When validation fails, it includes human-readable guidance explaining what went wrong and how to fix it.

Parameters:

Name Type Description Default
current_config HotReloadableConfig

Current configuration

required
updates dict[str, Any]

Dict of field -> new value proposed updates

required

Returns:

Type Description
RiskGuardResult

RiskGuardResult indicating if validation passed, with guidance if not

RiskGuardConfig

almanak.framework.strategies.RiskGuardConfig dataclass

RiskGuardConfig(
    max_slippage_limit: Decimal = Decimal("0.1"),
    max_leverage_limit: Decimal = Decimal("10"),
    max_daily_loss_limit_usd: Decimal = Decimal("100000"),
    min_health_factor_floor: Decimal = Decimal("1.05"),
)

Configuration for RiskGuard validation.

Defines the maximum allowed values for risk-sensitive parameters. These limits cannot be exceeded through hot-reload updates.

Attributes:

Name Type Description
max_slippage_limit Decimal

Maximum allowed slippage (e.g., 0.05 = 5%)

max_leverage_limit Decimal

Maximum allowed leverage (e.g., 10x)

max_daily_loss_limit_usd Decimal

Maximum allowed daily loss limit

min_health_factor_floor Decimal

Minimum allowed health factor setting

DecideResult

almanak.framework.strategies.DecideResult

DecideResult = (
    AnyIntent
    | IntentSequence
    | list[AnyIntent | IntentSequence]
    | None
)

IntentSequence

almanak.framework.strategies.IntentSequence dataclass

IntentSequence(
    intents: list[AnyIntent],
    sequence_id: str = (lambda: str(uuid.uuid4()))(),
    created_at: datetime = (lambda: datetime.now(UTC))(),
    description: str | None = None,
)

A sequence of intents that must execute in order (dependent actions).

IntentSequence wraps a list of intents that have dependencies between them and must execute sequentially. This is used when the output of one intent feeds into the input of the next (e.g., swap output -> bridge input).

Intents that are NOT in a sequence can execute in parallel if they are independent (e.g., two swaps on different chains).

Attributes:

Name Type Description
intents list[AnyIntent]

List of intents to execute in order

sequence_id str

Unique identifier for this sequence

created_at datetime

Timestamp when the sequence was created

description str | None

Optional description of the sequence purpose

Example

Create a sequence of dependent actions

sequence = Intent.sequence([ Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"), Intent.bridge(token="ETH", amount="all", from_chain="base", to_chain="arbitrum"), Intent.supply(protocol="aave_v3", token="WETH", amount="all", chain="arbitrum"), ])

Return from decide() - will execute sequentially

return sequence

first property

first: AnyIntent

Get the first intent in the sequence.

last property

last: AnyIntent

Get the last intent in the sequence.

__post_init__

__post_init__() -> None

Validate the sequence.

__len__

__len__() -> int

Return the number of intents in the sequence.

__iter__

__iter__()

Iterate over intents in the sequence.

__getitem__

__getitem__(index: int) -> AnyIntent

Get intent at index.

serialize

serialize() -> dict[str, Any]

Serialize the sequence to a dictionary.

deserialize classmethod

deserialize(data: dict[str, Any]) -> IntentSequence

Deserialize a dictionary to an IntentSequence.

Note: This requires the Intent.deserialize function to be available, which creates a circular dependency. The actual deserialization is done in the Intent class.

ExecutionResult

almanak.framework.strategies.ExecutionResult dataclass

ExecutionResult(
    intent: AnyIntent | None,
    action_bundle: ActionBundle | None = None,
    state_machine_result: StepResult | None = None,
    success: bool = False,
    error: str | None = None,
    execution_time_ms: float = 0.0,
)

Result of strategy execution.

Attributes:

Name Type Description
intent AnyIntent | None

The intent that was executed (or None if HOLD)

action_bundle ActionBundle | None

The compiled action bundle (or None)

state_machine_result StepResult | None

Final state machine step result

success bool

Whether execution was successful

error str | None

Error message if failed

execution_time_ms float

Time taken for execution in milliseconds

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary.

Market Data Types

Data types returned by MarketSnapshot getters and accepted by set_* methods for unit testing.

TokenBalance

almanak.framework.market.TokenBalance dataclass

TokenBalance(
    symbol: str,
    balance: Decimal,
    balance_usd: Decimal,
    address: str = "",
)

Balance information for a single token.

Supports numeric comparisons so strategy authors can write

if market.balance("ETH") > Decimal("1.0"): ... amount = min(trade_size, market.balance("USDC"))

Comparisons delegate to the balance field (native units).

PriceData

almanak.framework.market.PriceData dataclass

PriceData(
    price: Decimal,
    price_24h_ago: Decimal = Decimal("0"),
    change_24h_pct: Decimal = Decimal("0"),
    high_24h: Decimal = Decimal("0"),
    low_24h: Decimal = Decimal("0"),
    timestamp: datetime = (lambda: datetime.now(UTC))(),
    source: str = "",
)

Price data for a token.

Includes source (the named provider that produced the datum) so accounting writers can stamp transaction_ledger.price_inputs_json with the real provider name (VIB-3889).

RSIData

almanak.framework.market.RSIData dataclass

RSIData(
    value: Decimal,
    period: int = 14,
    overbought: Decimal = Decimal("70"),
    oversold: Decimal = Decimal("30"),
)

RSI (Relative Strength Index) data for a token.

Supports numeric operations so strategy authors can write

rsi = market.rsi("ETH") if rsi > 70: ... round(rsi, 2) f"{rsi:.2f}"

MACDData

almanak.framework.market.MACDData dataclass

MACDData(
    macd_line: Decimal,
    signal_line: Decimal,
    histogram: Decimal,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
)

MACD data for a token.

BollingerBandsData

almanak.framework.market.BollingerBandsData dataclass

BollingerBandsData(
    upper_band: Decimal,
    middle_band: Decimal,
    lower_band: Decimal,
    bandwidth: Decimal = Decimal("0"),
    percent_b: Decimal = Decimal("0.5"),
    period: int = 20,
    std_dev: float = 2.0,
)

Bollinger Bands data for a token.

StochasticData

almanak.framework.market.StochasticData dataclass

StochasticData(
    k_value: Decimal,
    d_value: Decimal,
    k_period: int = 14,
    d_period: int = 3,
    overbought: Decimal = Decimal("80"),
    oversold: Decimal = Decimal("20"),
)

Stochastic Oscillator data for a token.

ATRData

almanak.framework.market.ATRData dataclass

ATRData(
    value: Decimal,
    value_percent: Decimal = Decimal("0"),
    period: int = 14,
    volatility_threshold: Decimal = Decimal("5.0"),
)

ATR (Average True Range) data for a token.

MAData

almanak.framework.market.MAData dataclass

MAData(
    value: Decimal,
    ma_type: str = "SMA",
    period: int = 20,
    current_price: Decimal = Decimal("0"),
)

Moving Average data for a token.

ADXData

almanak.framework.market.ADXData dataclass

ADXData(
    adx: Decimal,
    plus_di: Decimal,
    minus_di: Decimal,
    period: int = 14,
    trend_threshold: Decimal = Decimal("25"),
)

ADX (Average Directional Index) data for a token.

OBVData

almanak.framework.market.OBVData dataclass

OBVData(
    obv: Decimal,
    signal_line: Decimal,
    signal_period: int = 21,
)

OBV (On-Balance Volume) data for a token.

CCIData

almanak.framework.market.CCIData dataclass

CCIData(
    value: Decimal,
    period: int = 20,
    upper_level: Decimal = Decimal("100"),
    lower_level: Decimal = Decimal("-100"),
)

CCI (Commodity Channel Index) data for a token.

IchimokuData

almanak.framework.market.IchimokuData dataclass

IchimokuData(
    tenkan_sen: Decimal,
    kijun_sen: Decimal,
    senkou_span_a: Decimal,
    senkou_span_b: Decimal,
    current_price: Decimal = Decimal("0"),
    tenkan_period: int = 9,
    kijun_period: int = 26,
    senkou_b_period: int = 52,
)

Ichimoku Cloud data for a token.

ChainHealthStatus

almanak.framework.strategies.ChainHealthStatus

Bases: Enum

Status of a chain's data health.

Attributes:

Name Type Description
HEALTHY

Chain data is fresh and available

DEGRADED

Chain data is stale but still usable (between threshold and 2x threshold)

UNAVAILABLE

Chain data could not be fetched

STALE

Chain data is too old to be trusted (beyond staleness threshold)

ChainHealth

almanak.framework.strategies.ChainHealth dataclass

ChainHealth(
    chain: str,
    status: ChainHealthStatus,
    last_updated: datetime | None = None,
    staleness_seconds: float | None = None,
    stale_threshold_seconds: float = 30.0,
    error: str | None = None,
)

Health status and staleness information for a single chain.

This dataclass provides detailed information about the health of market data for a specific chain, including when data was last fetched, staleness metrics, and any error information.

Attributes:

Name Type Description
chain str

Chain name (e.g., "arbitrum", "optimism")

status ChainHealthStatus

Current health status of the chain

last_updated datetime | None

When the chain's data was last successfully fetched

staleness_seconds float | None

How old the data is in seconds (None if unavailable)

stale_threshold_seconds float

The threshold used to determine staleness

error str | None

Error message if data fetch failed

is_stale bool

Whether the data is considered stale

is_available bool

Whether the data is available for use

Example

health = ChainHealth( chain="arbitrum", status=ChainHealthStatus.HEALTHY, last_updated=datetime.now(timezone.utc), staleness_seconds=5.2, stale_threshold_seconds=30.0, )

if health.is_stale: logger.warning(f"Chain {health.chain} data is stale")

is_stale property

is_stale: bool

Check if the chain data is stale.

Returns:

Type Description
bool

True if staleness exceeds threshold or data is unavailable

is_available property

is_available: bool

Check if the chain data is available for use.

Returns:

Type Description
bool

True if data is healthy or degraded (but still usable)

is_healthy property

is_healthy: bool

Check if the chain data is fully healthy.

Returns:

Type Description
bool

True if status is HEALTHY

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary representation.

Returns:

Type Description
dict[str, Any]

Dictionary with health information

from_dict classmethod

from_dict(data: dict[str, Any]) -> ChainHealth

Create ChainHealth from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with health information

required

Returns:

Type Description
ChainHealth

ChainHealth instance