Strategies¶
Strategy base classes and the market snapshot interface.
Implementing Teardown¶
Every strategy must implement three teardown methods so operators can safely close positions. Without them, close-requests are silently ignored.
Required Methods¶
| Method | Purpose |
|---|---|
| supports_teardown() -> bool | Return True to enable teardown |
| get_open_positions() | Return a TeardownPositionSummary listing all open positions |
| generate_teardown_intents(mode, market) | Return an ordered list of Intent objects that unwind positions |
Execution Order¶
If your strategy holds multiple position types, teardown intents must follow this order:
- PERP -- close perpetual positions first (highest risk)
- BORROW -- repay borrows to free collateral
- SUPPLY -- withdraw supplied collateral
- LP -- close liquidity positions
- TOKEN -- swap remaining tokens to stable
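The ordering above can be sketched as a stable sort over a priority table. This is a minimal illustration only: PositionKind, OpenPosition, and order_for_teardown are hypothetical stand-ins defined here, not framework classes.

```python
from dataclasses import dataclass
from enum import Enum


class PositionKind(Enum):
    """Hypothetical stand-in for the framework's position types."""
    PERP = "PERP"
    BORROW = "BORROW"
    SUPPLY = "SUPPLY"
    LP = "LP"
    TOKEN = "TOKEN"


# Unwind priority taken from the list above: lower number closes first.
TEARDOWN_PRIORITY = {
    PositionKind.PERP: 0,
    PositionKind.BORROW: 1,
    PositionKind.SUPPLY: 2,
    PositionKind.LP: 3,
    PositionKind.TOKEN: 4,
}


@dataclass(frozen=True)
class OpenPosition:
    """Hypothetical record of one open position."""
    kind: PositionKind
    position_id: str


def order_for_teardown(positions: list) -> list:
    # sorted() is stable, so positions of the same kind keep their input order.
    return sorted(positions, key=lambda p: TEARDOWN_PRIORITY[p.kind])


positions = [
    OpenPosition(PositionKind.TOKEN, "weth_balance"),
    OpenPosition(PositionKind.SUPPLY, "aave_weth"),
    OpenPosition(PositionKind.PERP, "eth_long"),
    OpenPosition(PositionKind.BORROW, "aave_usdc_debt"),
]
ordered = order_for_teardown(positions)
print([p.kind.value for p in ordered])
```

In a real strategy the same priority would be applied to the intents returned by generate_teardown_intents(), one intent per position.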
Example: Swap Strategy Teardown¶
from decimal import Decimal
from almanak import IntentStrategy, Intent

class MyStrategy(IntentStrategy):
    def supports_teardown(self) -> bool:
        return True

    def get_open_positions(self) -> "TeardownPositionSummary":
        from datetime import UTC, datetime
        from almanak.framework.teardown import (
            PositionInfo, PositionType, TeardownPositionSummary,
        )
        return TeardownPositionSummary(
            deployment_id=getattr(self, "deployment_id", "my_strategy"),
            timestamp=datetime.now(UTC),
            positions=[
                PositionInfo(
                    position_type=PositionType.TOKEN,
                    position_id="my_strategy_eth",
                    chain=self.chain,
                    protocol="uniswap_v3",
                    value_usd=Decimal("1000"),  # query on-chain balance, not cache
                    details={"asset": "WETH"},
                )
            ],
        )

    def generate_teardown_intents(self, mode: "TeardownMode", market=None) -> list[Intent]:
        from almanak.framework.teardown import TeardownMode
        # HARD teardown tolerates more slippage to guarantee the exit.
        max_slippage = Decimal("0.03") if mode == TeardownMode.HARD else Decimal("0.005")
        return [
            Intent.swap(
                from_token="WETH", to_token="USDC",
                amount="all", max_slippage=max_slippage, protocol="uniswap_v3",
            )
        ]
Always query on-chain state
get_open_positions() must query live on-chain balances, not cached values. Stale data can cause teardown to skip positions or attempt to close positions that no longer exist.
Testing with force_action¶
Real strategies gate their decide() output behind signals — RSI thresholds, MACD crosses, balance checks, cooldowns. That makes them hard to test on a forked block where those signals may not fire. The convention is a force_action config field that bypasses signal gates and lets you drive a known intent through the production code path.
from dataclasses import dataclass
from decimal import Decimal
from almanak import IntentStrategy, Intent, MarketSnapshot

@dataclass
class MyStrategyConfig:
    # Production tuning
    rsi_oversold: int = 30
    rsi_overbought: int = 70
    trade_size_usd: Decimal = Decimal("100")
    # Testing affordance: empty in production
    force_action: str = ""

class MyStrategy(IntentStrategy):
    def __init__(self, config, ...):
        super().__init__(...)
        self.force_action = str(self.get_config("force_action", "") or "").lower()
        # ... other config fields ...

    def decide(self, market: MarketSnapshot) -> Intent:
        # Test affordance: short-circuit BEFORE any signal logic.
        if self.force_action:
            return self._forced_intent(market)
        # Production decide() begins here.
        rsi = market.indicators.rsi(self.base_token, ...)
        if rsi.value < self.rsi_oversold:
            return Intent.swap(...)
        return Intent.hold(reason="No signal")

    def _forced_intent(self, market: MarketSnapshot) -> Intent:
        """Map free-form force_action strings to concrete Intents.

        Values are strategy-specific. Pick names that match what each branch
        DOES (verbs: "buy", "sell", "open", "close", "supply"), not internal
        state machine values.
        """
        if self.force_action == "buy":
            return Intent.swap(
                from_token=self.quote_token,
                to_token=self.base_token,
                amount_usd=self.trade_size_usd,
                protocol=self.protocol,
                chain=self.chain,
            )
        if self.force_action == "sell":
            return Intent.swap(
                from_token=self.base_token,
                to_token=self.quote_token,
                amount="all",
                protocol=self.protocol,
                chain=self.chain,
            )
        raise ValueError(f"Unknown force_action: {self.force_action!r}")
Driving force_action from the CLI¶
The almanak strat test command mutates force_action between iterations and runs each through the production code path on a managed Anvil fork:
# Single forced action
almanak strat test --actions buy --teardown --json
# Multiple actions in sequence (one iteration each)
almanak strat test --actions open,collect_fees --teardown --json
# Teardown only (no force_actions)
almanak strat test --teardown --json
The CLI emits structured JSON: every step's status, failure_logs, and a summary.all_passed bool. See CLI Reference: strat test for full flag docs.
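A CI job could consume that JSON roughly as follows. This is a sketch under stated assumptions: only status, failure_logs, and summary.all_passed are named above, so the top-level "steps" key, the "passed"/"failed" status values, and failure_logs being a list of strings are all guesses made for illustration. Check the CLI reference for the real schema.

```python
import json


def summarize_report(raw: str) -> tuple:
    """Return (all_passed, failure log lines) from a strat test JSON report."""
    report = json.loads(raw)
    # summary.all_passed is the documented pass/fail flag.
    all_passed = bool(report.get("summary", {}).get("all_passed", False))
    failures = []
    # ASSUMPTION: steps live under a top-level "steps" list and each step's
    # failure_logs is a list of strings. Neither is confirmed by the docs.
    for step in report.get("steps", []):
        failures.extend(step.get("failure_logs") or [])
    return all_passed, failures


# Hand-written sample in the assumed shape, not real CLI output.
sample = json.dumps({
    "steps": [
        {"status": "passed", "failure_logs": []},
        {"status": "failed", "failure_logs": ["swap reverted"]},
    ],
    "summary": {"all_passed": False},
})
all_passed, failures = summarize_report(sample)
print(all_passed, failures)
```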
Authoring rules¶
| Rule | Why |
|---|---|
| Values are free-form strings, strategy-specific. Pick names that describe the action verb ("buy", "open", "supply", "close"), not state-machine internals ("phase_2", "step_3a"). | Anyone reading your code (or grepping _forced_intent to find supported values) should be able to map name → intent shape without context. |
| One value per non-HOLD intent type. If your strategy emits both Intent.lp_open and Intent.swap, expose both as force_action values (e.g. "open" and "buy"). | Each path needs to be drivable independently. Missing branches mean missing test coverage. |
| Each branch must produce a non-HOLD Intent. If a forced branch returns Intent.hold(...), the test passes silently without exercising the path you wanted. | The whole point of force_action is to bypass gates; returning HOLD defeats it. |
| Order matters; document prerequisites. force_action="close" requires an open position; force_action="collect_fees" requires an LP NFT. | The CLI runs values in --actions order against one Anvil instance, so prereqs must be driven first. |
| Skip values that overlap with generate_teardown_intents(). If teardown emits an unwind swap, don't drive the same intent via --actions, because the position will be closed before teardown runs. | --teardown is the canonical unwind path; double-driving it leaves teardown nothing to do. |
| Empty default in config (force_action: str = ""). Production deploys must run signal-gated logic; force_action is testing-only. | A non-empty default would bypass signals in production and execute on every iteration. |
Example: LP strategy with three force_action values¶
def _forced_intent(self, market: MarketSnapshot) -> Intent:
    if self.force_action == "open":
        # Opens an LP position.
        return Intent.lp_open(
            pool=self.pool,
            amount0=self.amount0,
            amount1=self.amount1,
            range_lower=self._compute_lower(market),
            range_upper=self._compute_upper(market),
            protocol=self.protocol,
            chain=self.chain,
        )
    if self.force_action == "collect_fees":
        # Collects fees on an existing position. Drive AFTER "open".
        # NFT-based protocols (Uniswap V3 family, Uniswap V4, Aerodrome
        # Slipstream) require the position's tokenId via protocol_params.
        return Intent.collect_fees(
            pool=self.pool,
            protocol=self.protocol,
            chain=self.chain,
            protocol_params={
                "position_id": self.force_position_id or self._position_id,
            },
        )
    if self.force_action == "close":
        # Closes the position. SKIP this in --actions; teardown emits the same intent.
        return Intent.lp_close(
            pool=self.pool,
            position_id=self.force_position_id or self._position_id,
            protocol=self.protocol,
            chain=self.chain,
        )
    raise ValueError(f"Unknown force_action: {self.force_action!r}")
CLI to test the lifecycle without double-closing:
almanak strat test --actions open,collect_fees --teardown --json
# `close` is skipped from --actions because teardown emits Intent.lp_close
Companion override fields¶
Strategies that track on-chain identifiers (NFT IDs, vault shares) often need a paired override so a forced action can target an existing position rather than the strategy's cached state:
@dataclass
class MyLPConfig:
    force_action: str = ""
    force_position_id: str | None = None  # override the cached self._position_id

# In _forced_intent:
position_id = self.force_position_id or self._position_id
Only add fields like this when a forced branch genuinely needs them. A swap-only strategy doesn't track position IDs, so it shouldn't carry force_position_id.
Common mistakes¶
- Returning Intent.hold() from a forced branch: the test passes vacuously, so you haven't tested anything.
- Using state-machine values as force_action names ("phase_b", "step_3"): opaque to anyone trying to figure out what each value does.
- Forgetting to short-circuit at the top of decide(): if signal gates run before the force_action check, the gate's HOLD return value wins and the forced intent never fires.
- Reusing values that teardown emits: running --actions close --teardown unwinds the position twice (or once and then has nothing to test on teardown).
- Setting force_action in the production config: bypasses every safety gate in deployment. Always default to "".
State Persistence¶
Strategies that track internal state across iterations (position IDs, phase tracking, trade counters) must implement two hooks so that state survives restarts.
The framework persists runner-level metadata automatically (iteration counts, consecutive errors, execution progress for multi-step intents). But strategy-specific state is opt-in -- without these hooks, instance variables are lost when the process stops.
Required Hooks¶
| Method | Called | Purpose |
|---|---|---|
| get_persistent_state() -> dict | After each iteration | Return a dict of state to save |
| load_persistent_state(state: dict) | On startup / resume | Restore state from the saved dict |
Example: LP Strategy with Position Tracking¶
from decimal import Decimal
from typing import Any
from almanak import IntentStrategy, Intent, MarketSnapshot

class MyLPStrategy(IntentStrategy):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._position_id: int | None = None
        self._has_position: bool = False
        self._total_fees_collected: Decimal = Decimal("0")

    def decide(self, market: MarketSnapshot) -> Intent | None:
        if self._has_position:
            # Uniswap V3 / V4 / Aerodrome Slipstream identify the position by
            # NFT tokenId; pass it through protocol_params, not as a
            # top-level argument.
            return Intent.collect_fees(
                pool="WETH/USDC/3000",
                protocol="uniswap_v3",
                protocol_params={"position_id": self._position_id},
            )
        return Intent.lp_open(
            pool="WETH/USDC/3000",
            amount0=Decimal("0.5"),
            amount1=Decimal("1000"),
            range_lower=Decimal("1800"),
            range_upper=Decimal("2200"),
            protocol="uniswap_v3",
        )

    def on_intent_executed(self, intent, success: bool, result):
        """Update state after execution -- persisted via get_persistent_state()."""
        if success and result.position_id:
            self._position_id = result.position_id
            self._has_position = True

    # -- State persistence hooks --
    def get_persistent_state(self) -> dict[str, Any]:
        return {
            "position_id": self._position_id,
            "has_position": self._has_position,
            "total_fees_collected": str(self._total_fees_collected),
        }

    def load_persistent_state(self, state: dict[str, Any]) -> None:
        self._position_id = state.get("position_id")
        self._has_position = state.get("has_position", False)
        self._total_fees_collected = Decimal(state.get("total_fees_collected", "0"))
What the Framework Persists Automatically¶
These are saved by the runner without any strategy-author code:
| State | Persisted to | Survives restart |
|---|---|---|
| Iteration count, success count, error count | strategy_state table | Yes |
| Multi-step execution progress (which step completed, serialized intents) | strategy_state.execution_progress | Yes -- runner resumes from the failed step |
| Operator pause flag | strategy_state.is_paused | Yes |
| Portfolio snapshots (total value, positions) | portfolio_snapshots table | Yes |
| Portfolio PnL baseline (initial value, deposits, gas) | portfolio_metrics table | Yes |
| Timeline events (execution audit trail) | timeline_events table | Yes |
Guidelines¶
- Use defensive .get() with defaults in load_persistent_state() so older saved state doesn't crash when you add new fields.
- Store Decimal as strings (str(amount)) and parse back (Decimal(state["amount"])) for safe JSON round-tripping.
- on_intent_executed() is the natural place to update state after a trade (e.g., storing a position ID). get_persistent_state() then picks it up for the next save.
- All values must be JSON-serializable. The state dict is stored as a JSON blob in the database.
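The round-trip guidelines can be checked outside the framework with the standard library alone. The sketch below uses hypothetical helper names (dump_state, restore_state) and a hypothetical trade_count field standing in for "a field added in a later version"; it shows why Decimal is stored as a string and why .get() defaults keep old blobs loadable.

```python
import json
from decimal import Decimal
from typing import Optional


def dump_state(position_id: Optional[int], fees: Decimal) -> str:
    # Decimal is not JSON-serializable, so it is stored as a string.
    return json.dumps({
        "position_id": position_id,
        "total_fees_collected": str(fees),
    })


def restore_state(blob: str) -> tuple:
    state = json.loads(blob)
    position_id = state.get("position_id")
    fees = Decimal(state.get("total_fees_collected", "0"))
    # Hypothetical field added in a later version: older blobs lack it,
    # so a default keeps them loadable.
    trade_count = state.get("trade_count", 0)
    return position_id, fees, trade_count


blob = dump_state(12345, Decimal("0.1"))
print(restore_state(blob))

# A blob written before any of these fields existed still loads.
print(restore_state("{}"))

# Passing a raw Decimal to json.dumps fails, which is why str() is used above.
try:
    json.dumps({"fees": Decimal("0.1")})
except TypeError as exc:
    print("not serializable:", exc)
```

Parsing from the string form also keeps the exact decimal value, which a float round-trip would not guarantee.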
Without these hooks, strategy state is lost on restart
If you store state in instance variables but don't implement the persistence hooks, a restart means your strategy has no memory of open positions, completed trades, or internal phase. This is especially dangerous for LP and lending strategies where losing a position ID means the strategy cannot close its own positions.
IntentStrategy¶
The primary base class for writing strategies. Implement the decide() method to return an Intent.
almanak.framework.strategies.IntentStrategy¶
IntentStrategy(
config: ConfigT,
chain: str,
wallet_address: str,
risk_guard_config: RiskGuardConfig | None = None,
notification_callback: NotificationCallback
| None = None,
compiler: IntentCompiler | None = None,
state_machine_config: StateMachineConfig | None = None,
price_oracle: PriceOracle | None = None,
rsi_provider: RSIProvider | None = None,
balance_provider: BalanceProvider | None = None,
rpc_url: str | None = None,
wallet_activity_provider: WalletActivityProvider
| None = None,
chains: list[str] | None = None,
chain_wallets: dict[str, str] | None = None,
)
Bases: StrategyBase[ConfigT]
Base class for Intent-based strategies.
IntentStrategy simplifies strategy development by allowing developers to write just a decide() method that returns an Intent. The framework handles:
- Market data access via MarketSnapshot
- Intent compilation to ActionBundle
- State machine generation for execution
- Hot-reloadable configuration
- Error handling and retries
Subclasses must implement the abstract decide() method.
Example
@almanak_strategy(name="simple_strategy")
class SimpleStrategy(IntentStrategy):
    def decide(self, market: MarketSnapshot) -> Optional[Intent]:
        if market.rsi("ETH").is_oversold:
            return Intent.swap("USDC", "ETH", amount_usd=Decimal("100"))
        return Intent.hold()
Attributes:
| Name | Type | Description |
|---|---|---|
| compiler | IntentCompiler | IntentCompiler for converting intents to action bundles |
| state_machine_config | | Configuration for state machine execution |
| _current_intent | AnyIntent \| None | Currently executing intent (if any) |
| _current_state_machine | IntentStateMachine \| None | Current state machine (if any) |
Initialize the intent strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConfigT | Hot-reloadable configuration | required |
| chain | str | Chain to operate on (e.g., "arbitrum") | required |
| wallet_address | str | Wallet address for transactions | required |
| risk_guard_config | RiskGuardConfig \| None | Risk guard configuration | None |
| notification_callback | NotificationCallback \| None | Callback for operator notifications | None |
| compiler | IntentCompiler \| None | Intent compiler (required for direct run() calls, optional for runner) | None |
| state_machine_config | StateMachineConfig \| None | State machine configuration | None |
| price_oracle | PriceOracle \| None | Function to fetch prices | None |
| rsi_provider | RSIProvider \| None | Function to calculate RSI (token, period[, timeframe=]) -> RSIData | None |
| balance_provider | BalanceProvider \| None | Function to fetch balances | None |
| rpc_url | str \| None | RPC URL for on-chain queries (needed for LP close) | None |
| wallet_activity_provider | WalletActivityProvider \| None | Provider for leader wallet activity signals | None |
| chains | list[str] \| None | List of all chains this strategy operates on (multi-chain) | None |
| chain_wallets | dict[str, str] \| None | Per-chain wallet addresses from wallet registry | None |
compiler (property, writable)¶
Get the intent compiler.
Raises:
| Type | Description |
|---|---|
| RuntimeError | If compiler was not provided and is accessed directly. The StrategyRunner creates its own compiler with real prices, so this is only needed for direct run() calls. |
current_state_machine (property)¶
Get the current state machine.
lp_position_tracker (property)¶
Read-only accessor to the framework's LP position tracker.
Exposed for tests and tooling. Strategies should not need to touch this directly — the runner manages it transparently.
quote_asset (property)¶
The resolved performance quote asset (decorator default or boot override).
Definition-only: exposed as metadata for the hosted platform. The SDK does not change valuation/accounting/CLI behaviour based on it, and it is frozen after boot (not part of the hot-reloadable config surface).
get_wallet_for_chain¶
Get the wallet address for a specific chain.
If a wallet registry provided per-chain wallets, returns the chain-specific wallet. Otherwise falls back to the default wallet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chain | str | Chain name (e.g., "arbitrum", "base") | required |

Returns:
| Type | Description |
|---|---|
| str | Wallet address for the specified chain |
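The fallback described for get_wallet_for_chain amounts to a dictionary lookup with a default. A minimal sketch, assuming the registry is a plain chain-to-address dict; resolve_wallet is a hypothetical free function, not the method's actual implementation:

```python
from typing import Optional


def resolve_wallet(
    chain: str,
    default_wallet: str,
    chain_wallets: Optional[dict] = None,
) -> str:
    # Prefer the registry's chain-specific wallet when one exists.
    if chain_wallets and chain in chain_wallets:
        return chain_wallets[chain]
    # Otherwise fall back to the strategy's default wallet.
    return default_wallet


registry = {"base": "0xBaseWallet"}  # placeholder addresses for illustration
print(resolve_wallet("base", "0xDefault", registry))
print(resolve_wallet("arbitrum", "0xDefault", registry))
```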
validate_config¶
Validate the strategy's configuration.
Lifecycle hook invoked automatically from __init__ AFTER the config has been loaded (via super().__init__) and BEFORE any other setup that depends on config (chain wiring, providers, state machine, etc.).
Subclasses override this method to enforce preconditions on their configuration: required fields, value ranges, cross-field invariants, or any other invariant that must hold before the strategy is usable.
On failure, raise ConfigValidationError with a clear message and the offending field when applicable.
This hook exists so tooling like the Portfolio Manager's strat check preflight can catch misconfigurations at construction time rather than at the first decide() call in production.
The default implementation is a no-op, so existing strategies require no changes.
Raises:
| Type | Description |
|---|---|
| ConfigValidationError | If the configuration is invalid. The error's |
Example
from decimal import Decimal
from almanak.framework.strategies.exceptions import ConfigValidationError

class MyStrategy(IntentStrategy):
    def validate_config(self) -> None:
        # NOTE: configs loaded from JSON / env come back as strings.
        # Always coerce numerics through Decimal(str(...)) so that
        # comparisons are numeric, not lexicographic (e.g. "9" >= "10"
        # is True as strings but False as numbers).
        size = Decimal(str(self.get_config("trade_size_usd", "0")))
        if size <= 0:
            raise ConfigValidationError(
                "trade_size_usd must be > 0",
                field="trade_size_usd",
            )
        oversold = Decimal(str(self.get_config("rsi_oversold", "30")))
        overbought = Decimal(str(self.get_config("rsi_overbought", "70")))
        if oversold >= overbought:
            raise ConfigValidationError(
                "rsi_oversold must be < rsi_overbought",
                field="rsi_oversold",
            )
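The lexicographic pitfall mentioned in the comment is easy to reproduce with plain Python. The values below are illustrative, picked so the string and numeric comparisons disagree:

```python
from decimal import Decimal

# Values as they might arrive from JSON or environment variables.
raw_oversold, raw_overbought = "9", "10"

# String comparison is character by character: "9" sorts after "1".
string_check = raw_oversold >= raw_overbought

# Coercing through Decimal(str(...)) compares the numbers themselves.
numeric_check = Decimal(str(raw_oversold)) >= Decimal(str(raw_overbought))

print(string_check, numeric_check)  # True False
```

With the string comparison, a valid config (9 < 10) would be rejected as if oversold were above overbought.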
set_state_manager¶
Set the state manager for persistence.
Called by the runner to inject the state manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state_manager | Any | StateManager instance | required |
| deployment_id | str | Unique ID for this strategy instance | required |
get_persistent_state¶
Get strategy state to persist.
Override this method to define what state should be persisted. Default implementation returns empty dict (no state).
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dict of state key-value pairs to persist |
load_persistent_state¶
Load persisted state into the strategy.
Override this method to restore state from persistence. Default implementation does nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| state | dict[str, Any] | Dict of state key-value pairs loaded from storage | required |
reconcile_resumed_state¶
Reconcile resumed in-memory side-state against live on-chain truth.
Optional post-resume guardrail hook (VIB-5155 / ALM-2719). After the runner restores persisted state via load_persistent_state it calls this hook ONCE, before the first decide(), with a live market snapshot. A strategy that caches a position-side flag (e.g. "holding base token") can use this to re-derive that flag from market.balance(...) so a stale / desynced flag cannot HOLD-lock a valid risk-off exit.
The default implementation is a no-op; strategies with no cached side-state need not override it.
Contract
- This is a guardrail, not a control-flow gate. The runner runs it warn-only and inside a try/except: the return value never gates whether the strategy runs, and exceptions are swallowed. State mutations the hook makes (e.g. correcting a cached flag) are strategy-owned and DO affect the subsequent decide() / teardown; that is the intended reconciliation behavior.
- Live balance is truth; the persisted flag is only a hint.

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| market | MarketSnapshot | A fresh market snapshot for the current cycle. | required |
Returns:
| Type | Description |
|---|---|
| bool \| None | … corrected (the runner logs a WARNING + emits a … event); … side-state. |
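A reconciliation hook for a cached "holding base token" flag might look like the sketch below. Everything here is illustrative: FakeMarket is a hypothetical stand-in whose balance() returns a plain Decimal (the real MarketSnapshot return type may differ), the dust threshold is an arbitrary choice, and returning True on correction is an assumption because the return-value description above is incomplete.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Optional


@dataclass
class FakeMarket:
    """Hypothetical stand-in for MarketSnapshot, exposing balances only."""
    balances: dict = field(default_factory=dict)

    def balance(self, token: str) -> Decimal:
        return self.balances.get(token, Decimal("0"))


class SideStateSketch:
    """Holds a cached flag that may be stale after a resume."""

    def __init__(self, holding_base: bool, base_token: str = "WETH") -> None:
        self.holding_base = holding_base
        self.base_token = base_token
        self.dust = Decimal("0.0001")  # arbitrary threshold for "holds nothing"

    def reconcile_resumed_state(self, market: FakeMarket) -> Optional[bool]:
        # Live balance is truth; the persisted flag is only a hint.
        live_holding = market.balance(self.base_token) > self.dust
        if live_holding == self.holding_base:
            return False  # assumed meaning: nothing needed correcting
        self.holding_base = live_holding
        return True  # assumed meaning: the cached flag was corrected


# Persisted state says "not holding", but the wallet actually holds WETH.
strategy = SideStateSketch(holding_base=False)
corrected = strategy.reconcile_resumed_state(FakeMarket({"WETH": Decimal("1.5")}))
print(corrected, strategy.holding_base)
```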
save_state¶
Save current strategy state to persistence.
Called by runner after each iteration.
flush_pending_saves (async)¶
Wait for any pending save operations to complete.
This should be called before disconnecting from the gateway to ensure all state saves have completed. Handles both successful completion and errors gracefully.
load_state¶
Load strategy state from persistence.
Called by runner on startup.
Returns:
| Type | Description |
|---|---|
| bool | True if state was found and loaded, False otherwise |
load_state_async (async)¶
Async variant of load_state() -- preferred when already in an event loop.
Called by the CLI runner inside its async setup so that state is always restored correctly, regardless of whether a loop is already running.
decide (abstractmethod)¶
Decide what action to take based on current market conditions.
This is the main method that strategy developers need to implement. It receives a MarketSnapshot with current market data and should return an Intent, IntentSequence, list of intents, or None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| market | MarketSnapshot | Current market snapshot with prices, balances, RSI, etc. | required |

Returns:
| Type | Description |
|---|---|
| DecideResult | One of: an Intent, an IntentSequence, a list of intents, or None. Returning None is equivalent to returning Intent.hold(). |
Example
def decide(self, market: MarketSnapshot) -> DecideResult:
    # Single intent
    if market.rsi("ETH").is_oversold:
        return Intent.swap("USDC", "ETH", amount_usd=Decimal("1000"))

    # Sequence of dependent actions (execute in order)
    if should_move_funds:
        return Intent.sequence([
            Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"),
            Intent.supply(protocol="aave_v3", token="WETH", amount=Decimal("0.5"), chain="arbitrum"),
        ])

    # Multiple independent actions (execute in parallel)
    if should_rebalance:
        return [
            Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="arbitrum"),
            Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="optimism"),
        ]

    # No action
    return Intent.hold(reason="RSI in neutral zone")
on_intent_executed¶
Called after each intent execution completes.
Override this method to react to execution results, e.g., to track position IDs, log swap amounts, or update state based on results.
The result object is enriched by the framework with extracted data that "just appears" based on intent type:
- SWAP: result.swap_amounts (SwapAmounts)
- LP_OPEN: result.position_id, result.extracted_data["liquidity"]
- LP_CLOSE: result.lp_close_data (LPCloseData)
- PERP_OPEN: result.extracted_data["entry_price"], result.extracted_data["leverage"]

Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| intent | Any | The intent that was executed | required |
| success | bool | Whether execution succeeded | required |
| result | Any | ExecutionResult with enriched data | required |
valuate¶
Calculate the total portfolio value in USD for vault settlement.
Called by the framework during vault settlement to determine the current value of the strategy's holdings. The returned value is converted to underlying token units and proposed as the new totalAssets for the vault.
The default implementation sums balance_usd for all known token balances in the market snapshot. Override this method for custom valuation logic (e.g., including LP positions, pending rewards, or off-chain assets).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| market | MarketSnapshot | Current market snapshot with prices and balances | required |

Returns:
| Type | Description |
|---|---|
| Decimal | Total portfolio value in USD as a Decimal |
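The arithmetic behind a custom valuation can be sketched independently of the framework. TokenBalance and total_value_usd below are hypothetical stand-ins: the first mimics a balance entry carrying balance_usd, and the second adds extra position values (for example an LP position priced elsewhere) on top of the token sum.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable


@dataclass(frozen=True)
class TokenBalance:
    """Hypothetical stand-in for one token balance in a market snapshot."""
    symbol: str
    balance_usd: Decimal


def total_value_usd(
    balances: Iterable[TokenBalance],
    extra_position_values: Iterable[Decimal] = (),
) -> Decimal:
    # Start from Decimal("0") so an empty portfolio yields a Decimal, not int 0.
    token_total = sum((b.balance_usd for b in balances), Decimal("0"))
    extras_total = sum(extra_position_values, Decimal("0"))
    return token_total + extras_total


balances = [
    TokenBalance("WETH", Decimal("1000.50")),
    TokenBalance("USDC", Decimal("250.25")),
]
print(total_value_usd(balances))
print(total_value_usd(balances, [Decimal("300")]))
```

Keeping every term a Decimal avoids float rounding in a number that is proposed on-chain as totalAssets.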
on_vault_settled¶
Called after a vault settlement cycle completes.
Override this method to react to settlement results, e.g., to log deposit/redemption amounts or update internal state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| settlement | SettlementResult | SettlementResult with deposit/redemption data | required |
set_multi_chain_providers¶
set_multi_chain_providers(
price_oracle: MultiChainPriceOracle | None = None,
balance_provider: MultiChainBalanceProvider
| None = None,
aave_health_factor_provider: AaveHealthFactorProvider
| None = None,
) -> None
Set multi-chain data providers for cross-chain strategies.
Call this method before running a multi-chain strategy to enable MultiChainMarketSnapshot creation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| price_oracle | MultiChainPriceOracle \| None | Multi-chain price oracle | None |
| balance_provider | MultiChainBalanceProvider \| None | Multi-chain balance provider | None |
| aave_health_factor_provider | AaveHealthFactorProvider \| None | Aave health factor provider | None |
is_multi_chain¶
Check if this strategy is running in multi-chain mode.
Returns True only when SUPPORTED_CHAINS is explicitly set (manually or by the CLI multi-chain path) AND has >1 chain. Does NOT use decorator metadata because that is portability info, not a runtime signal. The CLI's is_multi_chain_strategy() makes the runtime decision based on config.chains.
Returns:
| Type | Description |
|---|---|
| bool | True if SUPPORTED_CHAINS has multiple chains |
get_supported_chains¶
Get the chains supported by this strategy.
Returns SUPPORTED_CHAINS if explicitly set, otherwise falls back to STRATEGY_METADATA.supported_chains (decorator portability metadata), then to [self._chain].
Returns:
| Type | Description |
|---|---|
| list[str] | List of supported chain names |
begin_market_snapshot_iteration¶
Open a new per-iteration MarketSnapshot scope (VIB-4843 FR-5001).
The runner calls this once at the top of each run_iteration with a
token unique to the iteration (the cycle_id). Subsequent
create_market_snapshot() calls within the same iteration — pre-warm,
decide(), and post-decide portfolio valuation — return the SAME
instance so the pre-warmed _price_cache is reused instead of thrown
away. Passing a new token (or None) invalidates the memo.
Idempotent: re-stamping the current token is a no-op so the runner can call it defensively without dropping a warm snapshot mid-iteration.
create_market_snapshot¶
Return the per-iteration market snapshot, building one if needed.
VIB-4843 FR-5001: memoizes the snapshot per iteration so pre-warm → decide() → portfolio valuation all share ONE instance (and its _price_cache). Reuse is keyed by the iteration token the runner stamps via begin_market_snapshot_iteration.
Two invalidation regimes, never both:
- Iteration-token stamped (the runner's per-iteration cycle_id): the token IS the lifetime. The memo survives for the WHOLE iteration (pre-warm → decide() → portfolio valuation) regardless of wall-clock, because begin_market_snapshot_iteration invalidates it the moment the next iteration stamps a new token. Applying a TTL here would drop the warm _price_cache mid-iteration on a slow decide() and defeat the dedup (the Codex VIB-4843 finding).
- No token stamped (direct callers / tests): a short TTL bounds reuse so prices are never served stale across loops with no iteration scope.

Builds via _build_market_snapshot; see its docstring for the builder-contract details. Override _build_market_snapshot (not this method) to customize how market data is populated, so the memo still applies.
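The two invalidation regimes can be illustrated with a small standalone memo. This is a sketch of the idea only: SnapshotMemo, its method names, and the one-second TTL are hypothetical, and the framework's actual implementation may differ in detail.

```python
import time
from typing import Any, Callable, Optional


class SnapshotMemo:
    """Hypothetical memo: token-scoped when stamped, TTL-scoped otherwise."""

    def __init__(
        self,
        build: Callable[[], Any],
        ttl_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._build = build
        self._ttl = ttl_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._snapshot: Any = None
        self._has_snapshot = False
        self._built_at = 0.0

    def begin_iteration(self, token: Optional[str]) -> None:
        # Re-stamping the current token is a no-op, so a warm snapshot survives.
        if token is not None and token == self._token:
            return
        # A new token (or None) invalidates the memo.
        self._token = token
        self._has_snapshot = False

    def get(self) -> Any:
        if self._has_snapshot:
            if self._token is not None:
                # Token regime: reuse for the whole iteration, no TTL applied.
                return self._snapshot
            if self._clock() - self._built_at < self._ttl:
                # No-token regime: reuse only within the short TTL.
                return self._snapshot
        self._snapshot = self._build()
        self._built_at = self._clock()
        self._has_snapshot = True
        return self._snapshot


memo = SnapshotMemo(build=object)
memo.begin_iteration("cycle-1")
first = memo.get()
print(memo.get() is first)   # same token, same instance
memo.begin_iteration("cycle-2")
print(memo.get() is first)   # new token, fresh instance
```

Injecting the clock keeps the TTL branch testable without sleeping.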
run¶
Execute one iteration of the strategy.
This method:
1. Creates a MarketSnapshot
2. Calls decide() to get an intent or DecideResult
3. Compiles single intents to an ActionBundle
4. Returns the ActionBundle for execution

Note: For multi-intent results (list or IntentSequence), this method only compiles the first intent. Use run_multi() for full multi-intent execution with proper parallel/sequential handling.
Returns:
| Type | Description |
|---|---|
| ActionBundle \| None | ActionBundle to execute, or None if HOLD intent or no action |
run_multi¶
Execute one iteration of the strategy, returning the full DecideResult.
Unlike run(), this method returns the full DecideResult from decide() without compiling to ActionBundle. This is useful for multi-chain execution via MultiChainOrchestrator.
Returns:
| Name | Type | Description |
|---|---|---|
| DecideResult | DecideResult | The raw result from decide() (may be None, single intent, IntentSequence, or list of intents/sequences) |
run_with_state_machine¶
run_with_state_machine(
receipt_provider: Callable[
[ActionBundle], TransactionReceipt
]
| None = None,
) -> ExecutionResult
Execute strategy with full state machine lifecycle.
This method provides full state machine execution including:
- Intent compilation
- Transaction execution (via receipt_provider)
- Validation
- Retry logic on failure

Note: This method only handles single intents for backward compatibility. For multi-intent execution, use run_multi() with MultiChainOrchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| receipt_provider | Callable[[ActionBundle], TransactionReceipt] \| None | Function that executes an ActionBundle and returns a TransactionReceipt. If not provided, returns after compilation. | None |

Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult with full execution details |
apply_quote_asset_override¶
Apply a per-deployment config.json quote_asset override at boot.
Called once by the runner/CLI after construction. None keeps the
decorator default. Frozen thereafter — there is no hot-reload path that
mutates it, by design.
get_metadata¶
Get strategy metadata if available.
Returns:
| Type | Description |
|---|---|
| StrategyMetadata \| None | StrategyMetadata if set via decorator, otherwise None |
to_dict¶
Serialize strategy state to dictionary.
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Dictionary representation of strategy state |
on_sadflow_enter¶
on_sadflow_enter(
error_type: str | None,
attempt: int,
context: SadflowContext,
) -> SadflowAction | None
Hook called when entering sadflow state.
Override this method to customize sadflow behavior for your strategy. This is called once when first entering sadflow, before any retry attempts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| error_type | str \| None | Categorized error type (e.g., "INSUFFICIENT_FUNDS", "TIMEOUT", "SLIPPAGE", "REVERT"). May be None for uncategorized errors. | required |
| attempt | int | Current attempt number (1-indexed). | required |
| context | SadflowContext | SadflowContext with error details and execution state. | required |

Returns:
| Type | Description |
|---|---|
| SadflowAction \| None | Action to take. Return None to use the default retry behavior, or return a SadflowAction to customize it. |
Example
def on_sadflow_enter(self, error_type, attempt, context):
    # Abort immediately on insufficient funds
    if error_type == "INSUFFICIENT_FUNDS":
        return SadflowAction.abort("Not enough funds for transaction")

    # Increase gas for gas errors
    if error_type == "GAS_ERROR" and context.action_bundle:
        modified = self._increase_gas(context.action_bundle)
        return SadflowAction.modify(modified, reason="Increased gas limit")

    # Use default retry for other errors
    return None
pause
async
¶
Pause the strategy during teardown.
Called by TeardownManager before executing teardown intents. Default is a no-op; override if your strategy needs to stop background tasks or cancel pending orders before teardown.
get_portfolio_snapshot
¶
Get current portfolio value and positions.
This method is called by the StrategyRunner after each iteration to capture portfolio snapshots for:
- Dashboard value display (Total Value, PnL)
- Historical PnL charts
- Position breakdown by type
Default implementation:
1. Calls get_open_positions() for position values (LP, lending, perps)
2. Adds wallet token balances not captured by positions
Override for strategies needing custom value calculation (CEX, prediction).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market
|
MarketSnapshot | None
|
Optional MarketSnapshot. If None, creates one internally. |
None
|
Returns:
| Type | Description |
|---|---|
PortfolioSnapshot
|
PortfolioSnapshot with current values and confidence level. If the value cannot be computed, returns a snapshot with value_confidence=UNAVAILABLE instead of $0. |
Example
def get_portfolio_snapshot(self, market=None) -> PortfolioSnapshot:
    if market is None:
        market = self.create_market_snapshot()
    # Custom CEX balance fetch
    cex_balance = self._fetch_cex_balance()
    # VIB-3614: total_value_usd is positions-only; idle CEX cash is
    # uninvested buying power → available_cash_usd. NAV is reconstructed
    # downstream as total_value_usd + available_cash_usd, so putting
    # cex_balance in BOTH would double-count it (cf. VIB-5271). Model a
    # CEX holding as a PositionType.CEX position if it should count as
    # deployed value instead of cash.
    return PortfolioSnapshot(
        timestamp=datetime.now(UTC),
        deployment_id=self.deployment_id,
        total_value_usd=Decimal("0"),
        available_cash_usd=cex_balance,
        value_confidence=ValueConfidence.ESTIMATED,
        chain=self.chain,
    )
get_open_positions
abstractmethod
¶
Get all open positions for this strategy.
MUST query on-chain state - do not use cached state for safety. Called during teardown preview and execution to determine what positions need to be closed.
For strategies with no positions, use StatelessStrategy as your base class, or return TeardownPositionSummary.empty(self.deployment_id).
Returns:
| Type | Description |
|---|---|
TeardownPositionSummary
|
TeardownPositionSummary with all current positions |
Example
from datetime import datetime, timezone
from decimal import Decimal
from almanak.framework.teardown import TeardownPositionSummary, PositionInfo, PositionType
def get_open_positions(self) -> TeardownPositionSummary:
    positions = []
    # Query on-chain LP position (live state, not a cached value)
    lp_data = self._query_lp_position()
    if lp_data:
        positions.append(PositionInfo(
            position_type=PositionType.LP,
            position_id=lp_data["token_id"],
            chain=self.chain,
            protocol="uniswap_v3",
            value_usd=Decimal(str(lp_data["value_usd"])),
        ))
    return TeardownPositionSummary(
        deployment_id=self.deployment_id,
        timestamp=datetime.now(timezone.utc),
        positions=positions,
    )
generate_teardown_intents
abstractmethod
¶
generate_teardown_intents(
mode: TeardownMode, market: MarketSnapshot | None = None
) -> list[Intent]
Generate intents to close all positions.
Return intents in the correct execution order:
1. PERP - Close perpetuals first (highest liquidation risk)
2. BORROW - Repay borrowed amounts (frees collateral)
3. SUPPLY - Withdraw supplied collateral
4. LP - Close LP positions and collect fees
5. TOKEN - Swap all tokens to target token (USDC)
For strategies with no positions, use StatelessStrategy as your base class, or return an empty list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mode
|
TeardownMode
|
TeardownMode.SOFT (graceful) or TeardownMode.HARD (emergency) |
required |
market
|
MarketSnapshot | None
|
Optional market snapshot with real prices. When called from the runner, this is the same snapshot used for normal decide() iterations. May be None for backward compatibility or when called outside the runner. |
None
|
Returns:
| Type | Description |
|---|---|
list[Intent]
|
List of intents to execute in order |
Example
from decimal import Decimal
from almanak import Intent
from almanak.framework.teardown import PositionType, TeardownMode
def generate_teardown_intents(self, mode: TeardownMode, market=None) -> list[Intent]:
    intents = []
    # Get current positions
    positions = self.get_open_positions()
    # Use market data if available for smarter teardown
    # (eth_price is fetched for illustration; this example does not use it further)
    if market:
        eth_price = market.price("ETH")
    # Close LP positions first
    for pos in positions.positions_by_type(PositionType.LP):
        intents.append(Intent.lp_close(
            position_id=pos.position_id,
            pool=pos.details.get("pool"),
            collect_fees=True,
            protocol="uniswap_v3",
        ))
    # Swap remaining tokens to USDC
    intents.append(Intent.swap(
        from_token="WETH",
        to_token="USDC",
        amount=Decimal("0"),  # All remaining
        swap_all=True,
    ))
    return intents
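When a strategy holds several position types, the documented order can be applied mechanically instead of by hand. The following is a minimal, framework-free sketch: PositionKind, TEARDOWN_PRIORITY, and order_for_teardown are hypothetical names that are not part of the almanak API, and plain tuples stand in for PositionInfo objects.

```python
from enum import Enum


class PositionKind(Enum):
    """Hypothetical stand-in for the framework's PositionType."""
    PERP = "PERP"
    BORROW = "BORROW"
    SUPPLY = "SUPPLY"
    LP = "LP"
    TOKEN = "TOKEN"


# Lower number = closed earlier, mirroring the documented execution order.
TEARDOWN_PRIORITY = {
    PositionKind.PERP: 0,
    PositionKind.BORROW: 1,
    PositionKind.SUPPLY: 2,
    PositionKind.LP: 3,
    PositionKind.TOKEN: 4,
}


def order_for_teardown(positions):
    """Return (kind, position_id) pairs sorted into teardown order.

    sorted() is stable, so positions of the same kind keep their input order.
    """
    return sorted(positions, key=lambda p: TEARDOWN_PRIORITY[p[0]])


# Illustrative position ids, deliberately listed out of order.
positions = [
    (PositionKind.TOKEN, "weth_balance"),
    (PositionKind.LP, "lp_123"),
    (PositionKind.BORROW, "usdc_debt"),
    (PositionKind.PERP, "eth_perp"),
    (PositionKind.SUPPLY, "weth_collateral"),
]
ordered = order_for_teardown(positions)
print([kind.value for kind, _ in ordered])
```

Sorting once and then emitting one intent per position keeps the ordering rule in a single place, so adding a new position type only requires a new priority entry.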
on_teardown_started
¶
Hook called when teardown starts.
Override to perform any setup before teardown begins. This is called after the cancel window expires.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mode
|
TeardownMode
|
The teardown mode (SOFT or HARD) |
required |
Example
def on_teardown_started(self, mode: TeardownMode) -> None:
    logger.info(f"Teardown starting in {mode.value} mode")
    self._pause_monitoring()
on_teardown_completed
¶
Hook called when teardown completes.
Override to perform cleanup after teardown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
success
|
bool
|
Whether all positions were closed successfully |
required |
recovered_usd
|
Decimal
|
Total USD value recovered |
required |
Example
def on_teardown_completed(self, success: bool, recovered_usd: Decimal) -> None:
    if success:
        logger.info(f"Teardown complete. Recovered ${recovered_usd:,.2f}")
    else:
        logger.error("Teardown failed - manual intervention required")
get_teardown_profile
¶
Get teardown profile metadata for UX display.
Override to provide better information about teardown expectations. This helps the dashboard show more accurate previews.
Returns:
| Type | Description |
|---|---|
TeardownProfile
|
TeardownProfile with strategy-specific metadata |
Example
from almanak.framework.teardown import TeardownProfile
def get_teardown_profile(self) -> TeardownProfile:
    return TeardownProfile(
        natural_exit_assets=["WETH", "USDC"],
        original_entry_assets=["USDC"],
        recommended_target="USDC",
        estimated_steps=3,
        chains_involved=[self.chain],
        has_lp_positions=True,
    )
acknowledge_teardown_request
¶
Acknowledge a pending teardown request.
Called when the strategy picks up a teardown request and starts processing it. Local mode writes the strategy SQLite DB; hosted mode routes through the gateway.
Returns:
| Type | Description |
|---|---|
bool
|
True if request was acknowledged, False otherwise |
should_teardown
¶
Check if the strategy should enter teardown mode.
Checks for:
1. Pending teardown request (from CLI, dashboard, config)
2. Auto-protect triggers (health factor, loss limits)
Returns:
| Type | Description |
|---|---|
bool
|
True if teardown should be initiated |
on_sadflow_exit
¶
Hook called when exiting sadflow (on completion or final failure).
Override this method to perform cleanup or logging after sadflow resolution. This is called once when the intent completes (success or failure) after having been in sadflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
success
|
bool
|
Whether the intent eventually succeeded after retries. |
required |
total_attempts
|
int
|
Total number of attempts made (including the final one). |
required |
Example
def on_sadflow_exit(self, success, total_attempts):
    if success:
        logger.info(f"Recovered after {total_attempts} attempts")
    else:
        logger.error(f"Failed after {total_attempts} attempts")
        self.notify_operator("Intent failed after all retries")
on_retry
¶
Hook called before each retry attempt.
Override this method to customize individual retry behavior. This is called before each retry, after the initial on_sadflow_enter call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
SadflowContext
|
SadflowContext with current error details and state. |
required |
action
|
SadflowAction
|
The default SadflowAction (RETRY with calculated delay). |
required |
Returns:
| Type | Description |
|---|---|
SadflowAction
|
The action to take. Return the input action unchanged for default behavior, or return a modified action. |
Example
def on_retry(self, context, action):
    # After 2 attempts, try with higher gas
    if context.attempt_number > 2 and context.action_bundle:
        modified = self._increase_gas(context.action_bundle)
        return SadflowAction.modify(modified)
    # Abort if we've been retrying too long
    if context.total_duration_seconds > 120:
        return SadflowAction.abort("Retry timeout exceeded")
    # Use default retry
    return action
StrategyBase¶
Lower-level base class for strategies that need direct action control.
almanak.framework.strategies.StrategyBase
¶
StrategyBase(
config: ConfigT,
risk_guard_config: RiskGuardConfig | None = None,
notification_callback: NotificationCallback
| None = None,
)
Bases: ABC
Base class for all strategies with hot-reload configuration support.
This class provides:
- Hot-reload configuration updates via update_config()
- RiskGuard validation to prevent dangerous config changes
- Atomic config application with rollback on failure
- CONFIG_UPDATED event emission to timeline
- Operator notification support
Strategies should inherit from this class and implement the abstract run() method.
Attributes:
| Name | Type | Description |
|---|---|---|
config |
Hot-reloadable configuration |
|
risk_guard |
RiskGuard instance for validation |
|
persistent_state |
dict[str, Any]
|
Dict containing strategy state including config snapshots |
notification_callback |
NotificationCallback | None
|
Optional callback for operator notifications |
Initialize the strategy base.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
ConfigT
|
Hot-reloadable configuration |
required |
risk_guard_config
|
RiskGuardConfig | None
|
Optional RiskGuard configuration |
None
|
notification_callback
|
NotificationCallback | None
|
Optional callback for sending notifications |
None
|
allocation_usd
property
¶
Strategy's declared starting allocation in USD (VIB-3882).
The runner's portfolio-baseline path uses this property to set portfolio_metrics.initial_value_usd. When None, the runner falls back to the legacy "first observed wallet total" heuristic, which is preserved for strategies that haven't migrated to the new contract.
The default implementation reads config.total_value_usd for backwards compatibility (most demo / accounting strategies declare their allocation there). Strategies that source their allocation from elsewhere (a separate allocation.json, an env var, a hosted control plane) should override this property.
Why a property and not a class field: the contract is on the strategy, not on the JSON file. A strategy author who wants capital isolation from a shared test-wallet can declare it independently of how their config is shaped.
run
abstractmethod
¶
Execute one iteration of the strategy.
Must be implemented by subclasses.
Returns:
| Type | Description |
|---|---|
Any
|
ActionBundle or None if no action needed |
update_config
¶
Update configuration with validation and persistence.
This method:
1. Validates new config values against the config's schema
2. Validates changes against RiskGuard (can't bypass risk limits)
3. Applies changes atomically
3a. Runs the strategy-level validate_config() hook (if defined on the subclass); on failure, rolls back to the previous values so live invariants cannot be bypassed via hot-reload or snapshot restore
4. Persists config snapshot to persistent_state
5. Emits CONFIG_UPDATED event with old and new values
6. Sends notification to operator
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
updates
|
dict[str, Any]
|
Dict of field -> new value to update |
required |
updated_by
|
str
|
Who is making the update (for audit trail) |
'operator'
|
Returns:
| Type | Description |
|---|---|
ConfigUpdateResult
|
ConfigUpdateResult indicating success or failure |
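The "apply atomically, roll back on failure" steps above can be illustrated without the framework. This is a minimal sketch under stated assumptions: DemoConfig, its validate() method (standing in for a strategy-level validate_config() hook), and apply_updates are hypothetical names, and the real update_config() additionally runs RiskGuard checks, persists a snapshot, emits CONFIG_UPDATED, and notifies the operator.

```python
from dataclasses import dataclass, fields
from decimal import Decimal


@dataclass
class DemoConfig:
    """Hypothetical config; field names are illustrative only."""
    max_slippage: Decimal = Decimal("0.005")
    trade_size_usd: Decimal = Decimal("100")

    def validate(self) -> None:
        # Stand-in for a strategy-level validate_config() hook.
        if self.trade_size_usd <= 0:
            raise ValueError("trade_size_usd must be positive")


def apply_updates(config: DemoConfig, updates: dict) -> bool:
    """Apply updates atomically: either every field changes, or none do."""
    known = {f.name for f in fields(config)}
    if set(updates) - known:
        return False  # reject unknown fields before touching anything

    # Capture the old values so a failed validation can be undone.
    previous = {name: getattr(config, name) for name in updates}
    for name, value in updates.items():
        setattr(config, name, value)
    try:
        config.validate()
    except ValueError:
        for name, value in previous.items():
            setattr(config, name, value)
        return False
    return True


cfg = DemoConfig()
ok = apply_updates(cfg, {"max_slippage": Decimal("0.01")})
bad = apply_updates(
    cfg,
    {"max_slippage": Decimal("0.02"), "trade_size_usd": Decimal("-5")},
)
print(ok, bad, cfg.max_slippage, cfg.trade_size_usd)
```

The second call fails validation on trade_size_usd, so the accompanying max_slippage change is rolled back as well.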
set_notification_callback
¶
Set the notification callback for operator alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
NotificationCallback | None
|
Callback function that takes an OperatorCard |
required |
get_config_history
¶
Get the configuration update history.
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of config snapshots in chronological order |
get_current_config_version
¶
Get the current config version number.
Returns:
| Type | Description |
|---|---|
int
|
Current config version |
get_config
¶
Get a configuration value by key, regardless of config type.
Works transparently with:
- Plain dict configs (from tests or raw JSON)
- DictConfigWrapper objects (from the CLI)
- Dataclasses and Pydantic models (attribute access)
This eliminates the per-strategy boilerplate of checking
isinstance(self.config, dict) before every config.get() call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Configuration key to retrieve |
required |
default
|
Any
|
Value to return if the key is not found |
None
|
Returns:
| Type | Description |
|---|---|
Any
|
The configuration value, or default if the key is not found |
Example
# In decide() -- no more boilerplate!
trade_size = self.get_config("trade_size_usd", "100")
rsi_period = self.get_config("rsi_period", 14)
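To make the "regardless of config type" behavior concrete, here is a minimal sketch of one way such a lookup could work. It is not the framework's implementation: lookup and DemoConfig are hypothetical names, and the sketch covers only plain dicts and attribute-style objects (how DictConfigWrapper is handled internally is not shown in this reference).

```python
from dataclasses import dataclass
from typing import Any

_MISSING = object()  # sentinel so a stored None is not mistaken for "absent"


def lookup(config: Any, key: str, default: Any = None) -> Any:
    """Read `key` from a dict config or an attribute-style config."""
    if isinstance(config, dict):
        return config.get(key, default)
    value = getattr(config, key, _MISSING)
    return default if value is _MISSING else value


@dataclass
class DemoConfig:
    """Hypothetical dataclass config."""
    rsi_period: int = 21


from_dict = lookup({"trade_size_usd": "250"}, "trade_size_usd", "100")
from_attr = lookup(DemoConfig(), "rsi_period", 14)
fallback = lookup(DemoConfig(), "trade_size_usd", "100")
print(from_dict, from_attr, fallback)
```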
restore_config_from_snapshot
¶
Restore configuration from a previous snapshot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
version
|
int
|
The config version to restore to |
required |
Returns:
| Type | Description |
|---|---|
ConfigUpdateResult
|
ConfigUpdateResult indicating success or failure |
RiskGuard¶
Non-bypassable risk validation that runs before every execution.
almanak.framework.strategies.RiskGuard
¶
Validates configuration changes against risk limits.
RiskGuard ensures that configuration updates cannot bypass critical risk limits defined by the operator or system. It provides human-readable guidance when validation fails.
Initialize RiskGuard with configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
RiskGuardConfig | None
|
Risk guard configuration (uses defaults if not provided) |
None
|
generate_guidance
¶
generate_guidance(
field_name: str,
requested_value: Decimal,
limit_value: Decimal,
) -> RiskGuardGuidance
Generate human-readable guidance for a failed risk check.
This method creates a RiskGuardGuidance object with:
- What the limit is and what value was requested
- Clear explanation of what the limit protects against
- Actionable suggestion for how to proceed
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
field_name
|
str
|
The configuration field that failed validation |
required |
requested_value
|
Decimal
|
The value the operator tried to set |
required |
limit_value
|
Decimal
|
The maximum/minimum allowed value |
required |
Returns:
| Type | Description |
|---|---|
RiskGuardGuidance
|
RiskGuardGuidance with human-readable explanation and suggestion |
validate_config_update
¶
validate_config_update(
current_config: HotReloadableConfig,
updates: dict[str, Any],
) -> RiskGuardResult
Validate proposed configuration updates against risk limits.
This method checks each proposed update against the risk guard's limits to ensure operators cannot accidentally configure dangerous parameters. When validation fails, it includes human-readable guidance explaining what went wrong and how to fix it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
current_config
|
HotReloadableConfig
|
Current configuration |
required |
updates
|
dict[str, Any]
|
Dict of field -> new value proposed updates |
required |
Returns:
| Type | Description |
|---|---|
RiskGuardResult
|
RiskGuardResult indicating if validation passed, with guidance if not |
RiskGuardConfig¶
almanak.framework.strategies.RiskGuardConfig
dataclass
¶
RiskGuardConfig(
max_slippage_limit: Decimal = Decimal("0.1"),
max_leverage_limit: Decimal = Decimal("10"),
max_daily_loss_limit_usd: Decimal = Decimal("100000"),
min_health_factor_floor: Decimal = Decimal("1.05"),
)
Configuration for RiskGuard validation.
Defines the maximum allowed values for risk-sensitive parameters. These limits cannot be exceeded through hot-reload updates.
Attributes:
| Name | Type | Description |
|---|---|---|
max_slippage_limit |
Decimal
|
Maximum allowed slippage (e.g., 0.05 = 5%) |
max_leverage_limit |
Decimal
|
Maximum allowed leverage (e.g., 10x) |
max_daily_loss_limit_usd |
Decimal
|
Maximum allowed daily loss limit |
min_health_factor_floor |
Decimal
|
Minimum allowed health factor setting |
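As an illustration of how limits like these bound a proposed update, the sketch below checks a dict of updates against the documented default values. Everything except those defaults is an assumption: DemoRiskLimits, check_update, and the config field names (max_slippage, max_leverage, daily_loss_limit_usd, min_health_factor) are hypothetical, and the real RiskGuard.validate_config_update returns a RiskGuardResult rather than a list of strings.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class DemoRiskLimits:
    """Stand-in carrying the default limits documented for RiskGuardConfig."""
    max_slippage_limit: Decimal = Decimal("0.1")
    max_leverage_limit: Decimal = Decimal("10")
    max_daily_loss_limit_usd: Decimal = Decimal("100000")
    min_health_factor_floor: Decimal = Decimal("1.05")


def check_update(limits: DemoRiskLimits, updates: dict) -> list:
    """Return human-readable violations for a proposed update (empty if OK)."""
    # Hypothetical config field -> (limit, True if the limit is an upper bound)
    rules = {
        "max_slippage": (limits.max_slippage_limit, True),
        "max_leverage": (limits.max_leverage_limit, True),
        "daily_loss_limit_usd": (limits.max_daily_loss_limit_usd, True),
        "min_health_factor": (limits.min_health_factor_floor, False),
    }
    violations = []
    for field, value in updates.items():
        if field not in rules:
            continue  # not a risk-sensitive field in this sketch
        limit, is_upper = rules[field]
        if is_upper and value > limit:
            violations.append(f"{field}={value} exceeds limit {limit}")
        elif not is_upper and value < limit:
            violations.append(f"{field}={value} is below floor {limit}")
    return violations


limits = DemoRiskLimits()
accepted = check_update(limits, {"max_slippage": Decimal("0.05")})
rejected = check_update(
    limits,
    {"max_leverage": Decimal("20"), "min_health_factor": Decimal("1.01")},
)
print(accepted)
print(rejected)
```

Note that three of the limits are upper bounds while min_health_factor_floor is a lower bound, which is why the sketch tracks the direction of each rule.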
DecideResult¶
almanak.framework.strategies.DecideResult
¶
IntentSequence¶
almanak.framework.strategies.IntentSequence
dataclass
¶
IntentSequence(
intents: list[AnyIntent],
sequence_id: str = (lambda: str(uuid.uuid4()))(),
created_at: datetime = (lambda: datetime.now(UTC))(),
description: str | None = None,
)
A sequence of intents that must execute in order (dependent actions).
IntentSequence wraps a list of intents that have dependencies between them and must execute sequentially. This is used when the output of one intent feeds into the input of the next (e.g., swap output -> bridge input).
Intents that are NOT in a sequence can execute in parallel if they are independent (e.g., two swaps on different chains).
Attributes:
| Name | Type | Description |
|---|---|---|
intents |
list[AnyIntent]
|
List of intents to execute in order |
sequence_id |
str
|
Unique identifier for this sequence |
created_at |
datetime
|
Timestamp when the sequence was created |
description |
str | None
|
Optional description of the sequence purpose |
Example
# Create a sequence of dependent actions
sequence = Intent.sequence([
    Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"),
    Intent.bridge(token="ETH", amount="all", from_chain="base", to_chain="arbitrum"),
    Intent.supply(protocol="aave_v3", token="WETH", amount="all", chain="arbitrum"),
])
# Return from decide() - will execute sequentially
return sequence
deserialize
classmethod
¶
Deserialize a dictionary to an IntentSequence.
Note: This requires the Intent.deserialize function to be available, which creates a circular dependency. The actual deserialization is done in the Intent class.
ExecutionResult¶
almanak.framework.strategies.ExecutionResult
dataclass
¶
ExecutionResult(
intent: AnyIntent | None,
action_bundle: ActionBundle | None = None,
state_machine_result: StepResult | None = None,
success: bool = False,
error: str | None = None,
execution_time_ms: float = 0.0,
)
Result of strategy execution.
Attributes:
| Name | Type | Description |
|---|---|---|
intent |
AnyIntent | None
|
The intent that was executed (or None if HOLD) |
action_bundle |
ActionBundle | None
|
The compiled action bundle (or None) |
state_machine_result |
StepResult | None
|
Final state machine step result |
success |
bool
|
Whether execution was successful |
error |
str | None
|
Error message if failed |
execution_time_ms |
float
|
Time taken for execution in milliseconds |
Market Data Types¶
Data types returned by MarketSnapshot getters and accepted by set_* methods for unit testing.
TokenBalance¶
almanak.framework.market.TokenBalance
dataclass
¶
Balance information for a single token.
Comparisons delegate to the balance field (native units).
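One way comparisons can delegate to a single field is sketched below. This is an assumption-laden illustration, not the TokenBalance source: DemoBalance is a hypothetical class, and whether the real type also accepts raw numbers on the other side of a comparison is not stated in this reference.

```python
from dataclasses import dataclass
from decimal import Decimal
from functools import total_ordering


@total_ordering
@dataclass(eq=False)  # eq=False so the hand-written __eq__ below is used
class DemoBalance:
    """Hypothetical stand-in; only `balance` takes part in comparisons."""
    symbol: str
    balance: Decimal

    @staticmethod
    def _amount(other):
        # Accept another DemoBalance or a plain number; anything else is unsupported.
        if isinstance(other, DemoBalance):
            return other.balance
        if isinstance(other, (int, Decimal)):
            return Decimal(other)
        return None

    def __eq__(self, other):
        amount = self._amount(other)
        return NotImplemented if amount is None else self.balance == amount

    def __lt__(self, other):
        amount = self._amount(other)
        return NotImplemented if amount is None else self.balance < amount


weth = DemoBalance("WETH", Decimal("1.5"))
has_enough = weth > Decimal("1")
at_least_two = weth >= 2
same_amount = weth == DemoBalance("USDC", Decimal("1.5"))
print(has_enough, at_least_two, same_amount)
```

A consequence of this design is that two balances of different tokens compare equal when their amounts match, so comparisons are only meaningful within the same token.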
PriceData¶
almanak.framework.market.PriceData
dataclass
¶
PriceData(
price: Decimal,
price_24h_ago: Decimal = Decimal("0"),
change_24h_pct: Decimal = Decimal("0"),
high_24h: Decimal = Decimal("0"),
low_24h: Decimal = Decimal("0"),
timestamp: datetime = (lambda: datetime.now(UTC))(),
source: str = "",
)
Price data for a token.
Includes source (the named provider that produced the datum) so
accounting writers can stamp transaction_ledger.price_inputs_json with
the real provider name (VIB-3889).
RSIData¶
almanak.framework.market.RSIData
dataclass
¶
RSIData(
value: Decimal,
period: int = 14,
overbought: Decimal = Decimal("70"),
oversold: Decimal = Decimal("30"),
)
RSI (Relative Strength Index) data for a token.
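The overbought and oversold fields are thresholds that a strategy can compare against value. A minimal sketch, assuming a stand-in dataclass with the documented fields and defaults; DemoRSI, rsi_signal, and the returned labels are hypothetical, and treating the boundaries as inclusive is an arbitrary choice made here.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class DemoRSI:
    """Stand-in mirroring the documented RSIData fields and defaults."""
    value: Decimal
    period: int = 14
    overbought: Decimal = Decimal("70")
    oversold: Decimal = Decimal("30")


def rsi_signal(rsi: DemoRSI) -> str:
    """Classify an RSI reading; the labels are illustrative, not framework values."""
    if rsi.value >= rsi.overbought:
        return "overbought"
    if rsi.value <= rsi.oversold:
        return "oversold"
    return "neutral"


high = rsi_signal(DemoRSI(value=Decimal("82.4")))
low = rsi_signal(DemoRSI(value=Decimal("25")))
mid = rsi_signal(DemoRSI(value=Decimal("50")))
print(high, low, mid)
```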
MACDData¶
almanak.framework.market.MACDData
dataclass
¶
MACDData(
macd_line: Decimal,
signal_line: Decimal,
histogram: Decimal,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
)
MACD data for a token.
BollingerBandsData¶
almanak.framework.market.BollingerBandsData
dataclass
¶
BollingerBandsData(
upper_band: Decimal,
middle_band: Decimal,
lower_band: Decimal,
bandwidth: Decimal = Decimal("0"),
percent_b: Decimal = Decimal("0.5"),
period: int = 20,
std_dev: float = 2.0,
)
Bollinger Bands data for a token.
StochasticData¶
almanak.framework.market.StochasticData
dataclass
¶
StochasticData(
k_value: Decimal,
d_value: Decimal,
k_period: int = 14,
d_period: int = 3,
overbought: Decimal = Decimal("80"),
oversold: Decimal = Decimal("20"),
)
Stochastic Oscillator data for a token.
ATRData¶
almanak.framework.market.ATRData
dataclass
¶
ATRData(
value: Decimal,
value_percent: Decimal = Decimal("0"),
period: int = 14,
volatility_threshold: Decimal = Decimal("5.0"),
)
ATR (Average True Range) data for a token.
MAData¶
almanak.framework.market.MAData
dataclass
¶
MAData(
value: Decimal,
ma_type: str = "SMA",
period: int = 20,
current_price: Decimal = Decimal("0"),
)
Moving Average data for a token.
ADXData¶
almanak.framework.market.ADXData
dataclass
¶
ADXData(
adx: Decimal,
plus_di: Decimal,
minus_di: Decimal,
period: int = 14,
trend_threshold: Decimal = Decimal("25"),
)
ADX (Average Directional Index) data for a token.
OBVData¶
almanak.framework.market.OBVData
dataclass
¶
OBV (On-Balance Volume) data for a token.
CCIData¶
almanak.framework.market.CCIData
dataclass
¶
CCIData(
value: Decimal,
period: int = 20,
upper_level: Decimal = Decimal("100"),
lower_level: Decimal = Decimal("-100"),
)
CCI (Commodity Channel Index) data for a token.
IchimokuData¶
almanak.framework.market.IchimokuData
dataclass
¶
IchimokuData(
tenkan_sen: Decimal,
kijun_sen: Decimal,
senkou_span_a: Decimal,
senkou_span_b: Decimal,
current_price: Decimal = Decimal("0"),
tenkan_period: int = 9,
kijun_period: int = 26,
senkou_b_period: int = 52,
)
Ichimoku Cloud data for a token.
ChainHealthStatus¶
almanak.framework.strategies.ChainHealthStatus
¶
Bases: Enum
Status of a chain's data health.
Attributes:
| Name | Type | Description |
|---|---|---|
HEALTHY |
Chain data is fresh and available |
|
DEGRADED |
Chain data is stale but still usable (between threshold and 2x threshold) |
|
UNAVAILABLE |
Chain data could not be fetched |
|
STALE |
Chain data is too old to be trusted (beyond staleness threshold) |
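The four statuses can be read as bands over the age of the data. The sketch below shows one possible mapping and should be treated as an interpretation rather than the framework's logic: DemoStatus and classify are hypothetical names, and the assumption that STALE begins past twice the threshold is inferred from the DEGRADED description.

```python
from enum import Enum
from typing import Optional


class DemoStatus(Enum):
    """Hypothetical stand-in for ChainHealthStatus."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    STALE = "stale"
    UNAVAILABLE = "unavailable"


def classify(staleness_seconds: Optional[float], threshold: float = 30.0) -> DemoStatus:
    """Map data age to a status band (assumed boundaries, see lead-in)."""
    if staleness_seconds is None:
        return DemoStatus.UNAVAILABLE  # data could not be fetched
    if staleness_seconds <= threshold:
        return DemoStatus.HEALTHY
    if staleness_seconds <= 2 * threshold:
        return DemoStatus.DEGRADED  # stale but still usable
    return DemoStatus.STALE


statuses = [classify(age) for age in (5.2, 45.0, 61.0, None)]
print([s.name for s in statuses])
```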
ChainHealth¶
almanak.framework.strategies.ChainHealth
dataclass
¶
ChainHealth(
chain: str,
status: ChainHealthStatus,
last_updated: datetime | None = None,
staleness_seconds: float | None = None,
stale_threshold_seconds: float = 30.0,
error: str | None = None,
)
Health status and staleness information for a single chain.
This dataclass provides detailed information about the health of market data for a specific chain, including when data was last fetched, staleness metrics, and any error information.
Attributes:
| Name | Type | Description |
|---|---|---|
chain |
str
|
Chain name (e.g., "arbitrum", "optimism") |
status |
ChainHealthStatus
|
Current health status of the chain |
last_updated |
datetime | None
|
When the chain's data was last successfully fetched |
staleness_seconds |
float | None
|
How old the data is in seconds (None if unavailable) |
stale_threshold_seconds |
float
|
The threshold used to determine staleness |
error |
str | None
|
Error message if data fetch failed |
is_stale |
bool
|
Whether the data is considered stale |
is_available |
bool
|
Whether the data is available for use |
Example
health = ChainHealth(
    chain="arbitrum",
    status=ChainHealthStatus.HEALTHY,
    last_updated=datetime.now(timezone.utc),
    staleness_seconds=5.2,
    stale_threshold_seconds=30.0,
)
if health.is_stale:
    logger.warning(f"Chain {health.chain} data is stale")
is_stale
property
¶
Check if the chain data is stale.
Returns:
| Type | Description |
|---|---|
bool
|
True if staleness exceeds threshold or data is unavailable |
is_available
property
¶
Check if the chain data is available for use.
Returns:
| Type | Description |
|---|---|
bool
|
True if data is healthy or degraded (but still usable) |
is_healthy
property
¶
Check if the chain data is fully healthy.
Returns:
| Type | Description |
|---|---|
bool
|
True if status is HEALTHY |
to_dict
¶
Convert to dictionary representation.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with health information |
from_dict
classmethod
¶
Create ChainHealth from dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
dict[str, Any]
|
Dictionary with health information |
required |
Returns:
| Type | Description |
|---|---|
ChainHealth
|
ChainHealth instance |