Strategies
Strategy base classes and the market snapshot interface.
IntentStrategy
The primary base class for writing strategies. Implement the decide() method to return an Intent.
almanak.framework.strategies.IntentStrategy
    IntentStrategy(
        config: ConfigT,
        chain: str,
        wallet_address: str,
        risk_guard_config: RiskGuardConfig | None = None,
        notification_callback: NotificationCallback | None = None,
        compiler: IntentCompiler | None = None,
        state_machine_config: StateMachineConfig | None = None,
        price_oracle: PriceOracle | None = None,
        rsi_provider: RSIProvider | None = None,
        balance_provider: BalanceProvider | None = None,
        rpc_url: str | None = None,
        wallet_activity_provider: WalletActivityProvider | None = None,
    )
Bases: StrategyBase[ConfigT]
Base class for Intent-based strategies.
IntentStrategy simplifies strategy development by allowing developers to write just a decide() method that returns an Intent. The framework handles:
- Market data access via MarketSnapshot
- Intent compilation to ActionBundle
- State machine generation for execution
- Hot-reloadable configuration
- Error handling and retries
Subclasses must implement the abstract decide() method.
Example
    @almanak_strategy(name="simple_strategy")
    class SimpleStrategy(IntentStrategy):
        def decide(self, market: MarketSnapshot) -> Optional[Intent]:
            if market.rsi("ETH").is_oversold:
                return Intent.swap("USDC", "ETH", amount_usd=Decimal("100"))
            return Intent.hold()
Attributes:
| Name | Type | Description |
|---|---|---|
| compiler | IntentCompiler | IntentCompiler for converting intents to action bundles |
| state_machine_config | | Configuration for state machine execution |
| _current_intent | AnyIntent \| None | Currently executing intent (if any) |
| _current_state_machine | IntentStateMachine \| None | Current state machine (if any) |
Initialize the intent strategy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConfigT | Hot-reloadable configuration | required |
| chain | str | Chain to operate on (e.g., "arbitrum") | required |
| wallet_address | str | Wallet address for transactions | required |
| risk_guard_config | RiskGuardConfig \| None | Risk guard configuration | None |
| notification_callback | NotificationCallback \| None | Callback for operator notifications | None |
| compiler | IntentCompiler \| None | Intent compiler (required for direct run() calls, optional for runner) | None |
| state_machine_config | StateMachineConfig \| None | State machine configuration | None |
| price_oracle | PriceOracle \| None | Function to fetch prices | None |
| rsi_provider | RSIProvider \| None | Function to calculate RSI | None |
| balance_provider | BalanceProvider \| None | Function to fetch balances | None |
| rpc_url | str \| None | RPC URL for on-chain queries (needed for LP close) | None |
| wallet_activity_provider | WalletActivityProvider \| None | Provider for leader wallet activity signals | None |
compiler
property
writable
¶
Get the intent compiler.
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If compiler was not provided and is accessed directly. The StrategyRunner creates its own compiler with real prices, so this is only needed for direct run() calls. |
current_state_machine
property
¶
Get the current state machine.
set_state_manager
¶
Set the state manager for persistence.
Called by the runner to inject the state manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state_manager
|
Any
|
StateManager instance |
required |
strategy_id
|
str
|
Unique ID for this strategy instance |
required |
get_persistent_state
¶
Get strategy state to persist.
Override this method to define what state should be persisted. Default implementation returns empty dict (no state).
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict of state key-value pairs to persist |
load_persistent_state
¶
Load persisted state into the strategy.
Override this method to restore state from persistence. Default implementation does nothing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
state
|
dict[str, Any]
|
Dict of state key-value pairs loaded from storage |
required |
save_state
¶
Save current strategy state to persistence.
Called by runner after each iteration.
flush_pending_saves
async
¶
Wait for any pending save operations to complete.
This should be called before disconnecting from the gateway to ensure all state saves have completed. Handles both successful completion and errors gracefully.
load_state
¶
Load strategy state from persistence.
Called by runner on startup.
Returns:
| Type | Description |
|---|---|
bool
|
True if state was found and loaded, False otherwise |
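The persistence hooks above can be sketched as a save/restore round trip. This is a minimal illustration, not the framework's implementation: `CounterStrategy` is a hypothetical stand-in that does not inherit from the real base class, and its two fields are invented for the example. Only the method names `get_persistent_state` and `load_persistent_state` come from this reference.

```python
from decimal import Decimal
from typing import Any, Optional


class CounterStrategy:
    """Hypothetical stand-in for an IntentStrategy subclass (not the real base class)."""

    def __init__(self) -> None:
        self.trades_executed = 0
        self.last_entry_price: Optional[Decimal] = None

    def get_persistent_state(self) -> dict[str, Any]:
        # Store the Decimal as a string so the dict stays JSON-serializable.
        price = self.last_entry_price
        return {
            "trades_executed": self.trades_executed,
            "last_entry_price": None if price is None else str(price),
        }

    def load_persistent_state(self, state: dict[str, Any]) -> None:
        # Fall back to defaults so an empty or older state dict still loads.
        self.trades_executed = int(state.get("trades_executed", 0))
        raw = state.get("last_entry_price")
        self.last_entry_price = Decimal(raw) if raw is not None else None


# Round trip: what one instance saves, a fresh instance can restore.
before = CounterStrategy()
before.trades_executed = 3
before.last_entry_price = Decimal("3012.50")
saved = before.get_persistent_state()

after = CounterStrategy()
after.load_persistent_state(saved)
```

Keeping the persisted dict to plain strings, numbers, and None is a design choice made here for portability; the reference does not state which value types the state manager accepts.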
decide
abstractmethod
¶
Decide what action to take based on current market conditions.
This is the main method that strategy developers need to implement. It receives a MarketSnapshot with current market data and should return an Intent, IntentSequence, list of intents, or None.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market
|
MarketSnapshot
|
Current market snapshot with prices, balances, RSI, etc. |
required |
Returns:
| Type | Description |
|---|---|
| DecideResult | One of: a single Intent, an IntentSequence, a list of intents or sequences, or None. Returning None is equivalent to returning Intent.hold(). |
Example
    def decide(self, market: MarketSnapshot) -> DecideResult:
        # Single intent
        if market.rsi("ETH").is_oversold:
            return Intent.swap("USDC", "ETH", amount_usd=Decimal("1000"))

        # Sequence of dependent actions (execute in order)
        if should_move_funds:
            return Intent.sequence([
                Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"),
                Intent.supply(protocol="aave_v3", token="WETH", amount=Decimal("0.5"), chain="arbitrum"),
            ])

        # Multiple independent actions (execute in parallel)
        if should_rebalance:
            return [
                Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="arbitrum"),
                Intent.swap("USDC", "ETH", amount=Decimal("500"), chain="optimism"),
            ]

        # No action
        return Intent.hold(reason="RSI in neutral zone")
on_intent_executed
¶
Called after each intent execution completes.
Override this method to react to execution results, e.g., to track position IDs, log swap amounts, or update state based on results.
The framework enriches the result object with data extracted according to the intent type:
- SWAP: result.swap_amounts (SwapAmounts)
- LP_OPEN: result.position_id, result.extracted_data["liquidity"]
- LP_CLOSE: result.lp_close_data (LPCloseData)
- PERP_OPEN: result.extracted_data["entry_price"], result.extracted_data["leverage"]
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
intent
|
Any
|
The intent that was executed |
required |
success
|
bool
|
Whether execution succeeded |
required |
result
|
Any
|
ExecutionResult with enriched data |
required |
set_multi_chain_providers
¶
set_multi_chain_providers(
price_oracle: MultiChainPriceOracle | None = None,
balance_provider: MultiChainBalanceProvider
| None = None,
aave_health_factor_provider: AaveHealthFactorProvider
| None = None,
) -> None
Set multi-chain data providers for cross-chain strategies.
Call this method before running a multi-chain strategy to enable MultiChainMarketSnapshot creation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
price_oracle
|
MultiChainPriceOracle | None
|
Multi-chain price oracle |
None
|
balance_provider
|
MultiChainBalanceProvider | None
|
Multi-chain balance provider |
None
|
aave_health_factor_provider
|
AaveHealthFactorProvider | None
|
Aave health factor provider |
None
|
is_multi_chain
¶
Check if this strategy is multi-chain.
Returns:
| Type | Description |
|---|---|
bool
|
True if SUPPORTED_CHAINS has multiple chains |
get_supported_chains
¶
Get the chains supported by this strategy.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of supported chain names |
create_market_snapshot
¶
Create a market snapshot for the current iteration.
Automatically creates MultiChainMarketSnapshot for multi-chain strategies if multi-chain providers have been set. Otherwise returns single-chain MarketSnapshot.
Override this method to customize how market data is populated.
Returns:
| Type | Description |
|---|---|
MarketSnapshot
|
MarketSnapshot (or MultiChainMarketSnapshot for multi-chain strategies) |
run
¶
Execute one iteration of the strategy.
This method:
1. Creates a MarketSnapshot
2. Calls decide() to get an intent or DecideResult
3. Compiles single intents to an ActionBundle
4. Returns the ActionBundle for execution
Note: For multi-intent results (list or IntentSequence), this method only compiles the first intent. Use run_multi() for full multi-intent execution with proper parallel/sequential handling.
Returns:
| Type | Description |
|---|---|
ActionBundle | None
|
ActionBundle to execute, or None if HOLD intent or no action |
run_multi
¶
Execute one iteration of the strategy, returning the full DecideResult.
Unlike run(), this method returns the full DecideResult from decide() without compiling to ActionBundle. This is useful for multi-chain execution via MultiChainOrchestrator.
Returns:
| Type | Description |
|---|---|
| DecideResult | The raw result from decide() (may be None, a single intent, an IntentSequence, or a list of intents/sequences) |
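Because run_multi() hands back the raw result, the caller has to branch on its shape. The sketch below shows one way to normalize the four shapes into execution groups. `FakeIntent`, `FakeSequence`, and `plan` are hypothetical stand-ins for illustration only; they are not the framework's types or the MultiChainOrchestrator's logic.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class FakeIntent:
    """Hypothetical stand-in for a single intent."""
    name: str


@dataclass(frozen=True)
class FakeSequence:
    """Hypothetical stand-in for IntentSequence: steps run in order."""
    steps: Tuple[FakeIntent, ...]


def plan(result) -> List[List[str]]:
    """Normalize a decide() result into groups.

    Groups are independent of each other; steps inside a group are ordered.
    """
    if result is None:
        return []  # treated like a hold: nothing to execute
    if isinstance(result, FakeIntent):
        return [[result.name]]
    if isinstance(result, FakeSequence):
        return [[step.name for step in result.steps]]
    # Otherwise assume a list: every item becomes its own independent group.
    groups: List[List[str]] = []
    for item in result:
        groups.extend(plan(item))
    return groups


mixed = [
    FakeIntent("swap_on_arbitrum"),
    FakeSequence((FakeIntent("swap_on_base"), FakeIntent("supply_on_arbitrum"))),
]
groups = plan(mixed)
```

Here `groups` holds one single-step group and one ordered two-step group, mirroring the parallel and sequential cases described for decide().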
run_with_state_machine
¶
run_with_state_machine(
receipt_provider: Callable[
[ActionBundle], TransactionReceipt
]
| None = None,
) -> ExecutionResult
Execute strategy with full state machine lifecycle.
This method provides full state machine execution including:
- Intent compilation
- Transaction execution (via receipt_provider)
- Validation
- Retry logic on failure
Note: This method only handles single intents for backward compatibility. For multi-intent execution, use run_multi() with MultiChainOrchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
receipt_provider
|
Callable[[ActionBundle], TransactionReceipt] | None
|
Function that executes an ActionBundle and returns a TransactionReceipt. If not provided, returns after compilation. |
None
|
Returns:
| Type | Description |
|---|---|
ExecutionResult
|
ExecutionResult with full execution details |
get_metadata
¶
Get strategy metadata if available.
Returns:
| Type | Description |
|---|---|
StrategyMetadata | None
|
StrategyMetadata if set via decorator, otherwise None |
to_dict
¶
Serialize strategy state to dictionary.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary representation of strategy state |
on_sadflow_enter
¶
on_sadflow_enter(
error_type: str | None,
attempt: int,
context: SadflowContext,
) -> SadflowAction | None
Hook called when entering sadflow state.
Override this method to customize sadflow behavior for your strategy. This is called once when first entering sadflow, before any retry attempts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error_type
|
str | None
|
Categorized error type (e.g., "INSUFFICIENT_FUNDS", "TIMEOUT", "SLIPPAGE", "REVERT"). May be None for uncategorized errors. |
required |
attempt
|
int
|
Current attempt number (1-indexed). |
required |
context
|
SadflowContext
|
SadflowContext with error details and execution state. |
required |
Returns:
| Type | Description |
|---|---|
| SadflowAction \| None | Action to take. Return None to use the default retry behavior, or return a SadflowAction to customize it. |
Example
    def on_sadflow_enter(self, error_type, attempt, context):
        # Abort immediately on insufficient funds
        if error_type == "INSUFFICIENT_FUNDS":
            return SadflowAction.abort("Not enough funds for transaction")

        # Increase gas for gas errors
        if error_type == "GAS_ERROR" and context.action_bundle:
            modified = self._increase_gas(context.action_bundle)
            return SadflowAction.modify(modified, reason="Increased gas limit")

        # Use default retry for other errors
        return None
supports_teardown
¶
Check if this strategy supports the teardown system.
Override to return True when your strategy implements:
- get_open_positions()
- generate_teardown_intents()
Returns:
| Type | Description |
|---|---|
| bool | True if teardown is supported, False otherwise |
Example
    def supports_teardown(self) -> bool:
        return True  # Enable teardown for this strategy
get_portfolio_snapshot
¶
Get current portfolio value and positions.
This method is called by the StrategyRunner after each iteration to capture portfolio snapshots for:
- Dashboard value display (Total Value, PnL)
- Historical PnL charts
- Position breakdown by type
Default implementation:
1. Calls get_open_positions() for position values (LP, lending, perps)
2. Adds wallet token balances not captured by positions
Override for strategies needing custom value calculation (CEX, prediction).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market
|
MarketSnapshot | None
|
Optional MarketSnapshot. If None, creates one internally. |
None
|
Returns:
| Type | Description |
|---|---|
| PortfolioSnapshot | PortfolioSnapshot with current values and confidence level. If value cannot be computed, returns snapshot with value_confidence=UNAVAILABLE instead of $0. |
Example
    def get_portfolio_snapshot(self, market=None) -> PortfolioSnapshot:
        if market is None:
            market = self.create_market_snapshot()

        # Custom CEX balance fetch
        cex_balance = self._fetch_cex_balance()
        return PortfolioSnapshot(
            timestamp=datetime.now(UTC),
            strategy_id=self.strategy_id,
            total_value_usd=cex_balance,
            available_cash_usd=cex_balance,
            value_confidence=ValueConfidence.ESTIMATED,
            chain=self.chain,
        )
get_open_positions
¶
Get all open positions for this strategy.
Implementations must query on-chain state; for safety, do not rely on cached state. Called during teardown preview and execution to determine which positions need to be closed.
Returns:
| Type | Description |
|---|---|
TeardownPositionSummary
|
TeardownPositionSummary with all current positions |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If teardown is not implemented |
Example
    from datetime import datetime, timezone
    from decimal import Decimal

    from almanak.framework.teardown import TeardownPositionSummary, PositionInfo, PositionType

    def get_open_positions(self) -> TeardownPositionSummary:
        positions = []

        # Query on-chain LP position
        lp_data = self._query_lp_position()
        if lp_data:
            positions.append(PositionInfo(
                position_type=PositionType.LP,
                position_id=lp_data["token_id"],
                chain=self.chain,
                protocol="uniswap_v3",
                value_usd=Decimal(str(lp_data["value_usd"])),
            ))

        return TeardownPositionSummary(
            strategy_id=self.STRATEGY_NAME,
            timestamp=datetime.now(timezone.utc),
            positions=positions,
        )
generate_teardown_intents
¶
Generate intents to close all positions.
Return intents in the correct execution order:
1. PERP - Close perpetuals first (highest liquidation risk)
2. BORROW - Repay borrowed amounts (frees collateral)
3. SUPPLY - Withdraw supplied collateral
4. LP - Close LP positions and collect fees
5. TOKEN - Swap all tokens to target token (USDC)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mode
|
TeardownMode
|
TeardownMode.SOFT (graceful) or TeardownMode.HARD (emergency) |
required |
Returns:
| Type | Description |
|---|---|
list[Intent]
|
List of intents to execute in order |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If teardown is not implemented |
Example
    from decimal import Decimal

    from almanak.framework.teardown import PositionType, TeardownMode

    def generate_teardown_intents(self, mode: TeardownMode) -> list[Intent]:
        intents = []

        # Get current positions
        positions = self.get_open_positions()

        # Close LP positions first
        for pos in positions.positions_by_type(PositionType.LP):
            intents.append(Intent.lp_close(
                position_id=pos.position_id,
                pool=pos.details.get("pool"),
                collect_fees=True,
                protocol="uniswap_v3",
            ))

        # Swap remaining tokens to USDC
        intents.append(Intent.swap(
            from_token="WETH",
            to_token="USDC",
            amount=Decimal("0"),  # All remaining
            swap_all=True,
        ))

        return intents
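The execution order listed for generate_teardown_intents (PERP, BORROW, SUPPLY, LP, TOKEN) can be sketched as a priority sort. Everything below is a stand-in for illustration: `FakePositionType`, `FakePosition`, `TEARDOWN_PRIORITY`, and `teardown_order` are hypothetical names, not part of almanak.framework.teardown.

```python
from dataclasses import dataclass
from decimal import Decimal
from enum import Enum
from typing import List


class FakePositionType(Enum):
    """Hypothetical stand-in using the category names from the ordering above."""
    PERP = "PERP"
    BORROW = "BORROW"
    SUPPLY = "SUPPLY"
    LP = "LP"
    TOKEN = "TOKEN"


# Lower number = closed earlier.
TEARDOWN_PRIORITY = {
    FakePositionType.PERP: 0,
    FakePositionType.BORROW: 1,
    FakePositionType.SUPPLY: 2,
    FakePositionType.LP: 3,
    FakePositionType.TOKEN: 4,
}


@dataclass(frozen=True)
class FakePosition:
    position_type: FakePositionType
    position_id: str
    value_usd: Decimal


def teardown_order(positions: List[FakePosition]) -> List[FakePosition]:
    # sorted() is stable, so positions of the same type keep their input order.
    return sorted(positions, key=lambda p: TEARDOWN_PRIORITY[p.position_type])


positions = [
    FakePosition(FakePositionType.LP, "lp-1", Decimal("500")),
    FakePosition(FakePositionType.TOKEN, "weth", Decimal("120")),
    FakePosition(FakePositionType.PERP, "perp-1", Decimal("900")),
    FakePosition(FakePositionType.SUPPLY, "aave-weth", Decimal("300")),
    FakePosition(FakePositionType.BORROW, "aave-usdc", Decimal("150")),
]
ordered_ids = [p.position_id for p in teardown_order(positions)]
```

A strategy could build its intent list by walking positions in this order and emitting one closing intent per position.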
on_teardown_started
¶
Hook called when teardown starts.
Override to perform any setup before teardown begins. This is called after the cancel window expires.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
mode
|
TeardownMode
|
The teardown mode (SOFT or HARD) |
required |
Example
    def on_teardown_started(self, mode: TeardownMode) -> None:
        logger.info(f"Teardown starting in {mode.value} mode")
        self._pause_monitoring()
on_teardown_completed
¶
Hook called when teardown completes.
Override to perform cleanup after teardown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
success
|
bool
|
Whether all positions were closed successfully |
required |
recovered_usd
|
Decimal
|
Total USD value recovered |
required |
Example
    def on_teardown_completed(self, success: bool, recovered_usd: Decimal) -> None:
        if success:
            logger.info(f"Teardown complete. Recovered ${recovered_usd:,.2f}")
        else:
            logger.error("Teardown failed - manual intervention required")
get_teardown_profile
¶
Get teardown profile metadata for UX display.
Override to provide better information about teardown expectations. This helps the dashboard show more accurate previews.
Returns:
| Type | Description |
|---|---|
TeardownProfile
|
TeardownProfile with strategy-specific metadata |
Example
    from almanak.framework.teardown import TeardownProfile

    def get_teardown_profile(self) -> TeardownProfile:
        return TeardownProfile(
            natural_exit_assets=["WETH", "USDC"],
            original_entry_assets=["USDC"],
            recommended_target="USDC",
            estimated_steps=3,
            chains_involved=[self.chain],
            has_lp_positions=True,
        )
acknowledge_teardown_request
¶
Acknowledge a pending teardown request.
Called when the strategy picks up a teardown request and starts processing it.
Returns:
| Type | Description |
|---|---|
bool
|
True if request was acknowledged, False otherwise |
should_teardown
¶
Check if the strategy should enter teardown mode.
Checks for:
1. Pending teardown request (from CLI, dashboard, config)
2. Auto-protect triggers (health factor, loss limits)
Returns:
| Type | Description |
|---|---|
bool
|
True if teardown should be initiated |
on_sadflow_exit
¶
Hook called when exiting sadflow (on completion or final failure).
Override this method to perform cleanup or logging after sadflow resolution. This is called once when the intent completes (success or failure) after having been in sadflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
success
|
bool
|
Whether the intent eventually succeeded after retries. |
required |
total_attempts
|
int
|
Total number of attempts made (including the final one). |
required |
Example
    def on_sadflow_exit(self, success, total_attempts):
        if success:
            logger.info(f"Recovered after {total_attempts} attempts")
        else:
            logger.error(f"Failed after {total_attempts} attempts")
            self.notify_operator("Intent failed after all retries")
on_retry
¶
Hook called before each retry attempt.
Override this method to customize individual retry behavior. This is called before each retry, after the initial on_sadflow_enter call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
SadflowContext
|
SadflowContext with current error details and state. |
required |
action
|
SadflowAction
|
The default SadflowAction (RETRY with calculated delay). |
required |
Returns:
| Type | Description |
|---|---|
| SadflowAction | The action to take. Return the input action unchanged for default behavior, or return a modified action. |
Example
    def on_retry(self, context, action):
        # After 2 attempts, try with higher gas
        if context.attempt_number > 2 and context.action_bundle:
            modified = self._increase_gas(context.action_bundle)
            return SadflowAction.modify(modified)

        # Abort if we've been retrying too long
        if context.total_duration_seconds > 120:
            return SadflowAction.abort("Retry timeout exceeded")

        # Use default retry
        return action
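The three sadflow hooks are described as firing in a fixed order: on_sadflow_enter once, on_retry before each retry, and on_sadflow_exit once at the end. The toy driver below illustrates that order under stated assumptions. It is not the framework's state machine: `RecordingHooks` and `drive` are hypothetical, the context is a plain dict rather than a SadflowContext, and the action is a plain string rather than a SadflowAction.

```python
from typing import Callable, List


class RecordingHooks:
    """Hypothetical strategy stand-in that records the order in which hooks fire."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def on_sadflow_enter(self, error_type, attempt, context):
        self.calls.append(f"enter(attempt={attempt})")
        return None  # None means: use the default retry behavior

    def on_retry(self, context, action):
        self.calls.append(f"retry(attempt={context['attempt_number']})")
        return action  # returning the action unchanged means: default behavior

    def on_sadflow_exit(self, success, total_attempts):
        self.calls.append(f"exit(success={success}, total={total_attempts})")


def drive(hooks: RecordingHooks, execute: Callable[[int], bool], max_attempts: int = 3) -> bool:
    """Toy driver: call execute(attempt) until it succeeds or attempts run out."""
    attempt = 1
    if execute(attempt):
        return True  # never entered sadflow, so no hooks fire
    hooks.on_sadflow_enter("REVERT", attempt, {"attempt_number": attempt})
    success = False
    while attempt < max_attempts and not success:
        attempt += 1
        hooks.on_retry({"attempt_number": attempt}, "RETRY")
        success = execute(attempt)
    hooks.on_sadflow_exit(success, attempt)
    return success


hooks = RecordingHooks()
ok = drive(hooks, execute=lambda attempt: attempt == 3)  # fails twice, then succeeds
```

After this run, `hooks.calls` lists one enter call, two retry calls, and one exit call, in that order. The return values of the hooks are ignored by this driver to keep the sketch short.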
StrategyBase
Lower-level base class for strategies that need direct action control.
almanak.framework.strategies.StrategyBase
    StrategyBase(
        config: ConfigT,
        risk_guard_config: RiskGuardConfig | None = None,
        notification_callback: NotificationCallback | None = None,
    )
Bases: ABC
Base class for all strategies with hot-reload configuration support.
This class provides:
- Hot-reload configuration updates via update_config()
- RiskGuard validation to prevent dangerous config changes
- Atomic config application with rollback on failure
- CONFIG_UPDATED event emission to timeline
- Operator notification support
Strategies should inherit from this class and implement the abstract run() method.
Attributes:
| Name | Type | Description |
|---|---|---|
| config | | Hot-reloadable configuration |
| risk_guard | | RiskGuard instance for validation |
| persistent_state | dict[str, Any] | Dict containing strategy state including config snapshots |
| notification_callback | NotificationCallback \| None | Optional callback for operator notifications |
Initialize the strategy base.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConfigT | Hot-reloadable configuration | required |
| risk_guard_config | RiskGuardConfig \| None | Optional RiskGuard configuration | None |
| notification_callback | NotificationCallback \| None | Optional callback for sending notifications | None |
run
abstractmethod
¶
Execute one iteration of the strategy.
Must be implemented by subclasses.
Returns:
| Type | Description |
|---|---|
Any
|
ActionBundle or None if no action needed |
update_config
¶
Update configuration with validation and persistence.
This method:
1. Validates new config values against the config's schema
2. Validates changes against RiskGuard (can't bypass risk limits)
3. Applies changes atomically
4. Persists config snapshot to persistent_state
5. Emits CONFIG_UPDATED event with old and new values
6. Sends notification to operator
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
updates
|
dict[str, Any]
|
Dict of field -> new value to update |
required |
updated_by
|
str
|
Who is making the update (for audit trail) |
'operator'
|
Returns:
| Type | Description |
|---|---|
ConfigUpdateResult
|
ConfigUpdateResult indicating success or failure |
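The validate-then-apply flow of update_config() can be sketched with a copy-and-swap pattern, which keeps a rejected update from leaving the live config half-changed. This is an assumption-laden illustration, not the library's code: `FakeConfig`, `FakeStrategy`, the `max_allowed_slippage` limit, and the boolean return value are all hypothetical, and event emission and notifications are left out.

```python
import copy
from dataclasses import dataclass, field, fields
from decimal import Decimal
from typing import Any, Dict, List


@dataclass
class FakeConfig:
    """Hypothetical config with two invented fields."""
    max_slippage: Decimal = Decimal("0.005")
    trade_size_usd: Decimal = Decimal("100")


@dataclass
class FakeStrategy:
    config: FakeConfig = field(default_factory=FakeConfig)
    history: List[Dict[str, Any]] = field(default_factory=list)
    max_allowed_slippage: Decimal = Decimal("0.05")  # assumed risk limit

    def update_config(self, updates: Dict[str, Any]) -> bool:
        # 1. Schema check: only known fields may be updated.
        known = {f.name for f in fields(self.config)}
        if not set(updates) <= known:
            return False
        # 2. Risk check on a copy, so the live config is never half-updated.
        candidate = copy.deepcopy(self.config)
        for key, value in updates.items():
            setattr(candidate, key, value)
        if candidate.max_slippage > self.max_allowed_slippage:
            return False
        # 3. Apply atomically by swapping in the validated copy.
        self.config = candidate
        # 4. Record a versioned snapshot of what changed.
        self.history.append({"version": len(self.history) + 1, "updates": dict(updates)})
        return True


strategy = FakeStrategy()
accepted = strategy.update_config({"trade_size_usd": Decimal("250")})
rejected = strategy.update_config(
    {"max_slippage": Decimal("0.5"), "trade_size_usd": Decimal("999")}
)
```

The second call violates the assumed slippage limit, so neither of its two fields is applied and `trade_size_usd` stays at 250.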
set_notification_callback
¶
Set the notification callback for operator alerts.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
NotificationCallback | None
|
Callback function that takes an OperatorCard |
required |
get_config_history
¶
Get the configuration update history.
Returns:
| Type | Description |
|---|---|
list[dict[str, Any]]
|
List of config snapshots in chronological order |
get_current_config_version
¶
Get the current config version number.
Returns:
| Type | Description |
|---|---|
int
|
Current config version |
restore_config_from_snapshot
¶
Restore configuration from a previous snapshot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
version
|
int
|
The config version to restore to |
required |
Returns:
| Type | Description |
|---|---|
ConfigUpdateResult
|
ConfigUpdateResult indicating success or failure |
MarketSnapshot
Unified interface for accessing market data within decide().
almanak.framework.strategies.MarketSnapshot
    MarketSnapshot(
        chain: str,
        wallet_address: str,
        price_oracle: PriceOracle | None = None,
        rsi_provider: RSIProvider | None = None,
        balance_provider: BalanceProvider | None = None,
        timestamp: datetime | None = None,
        wallet_activity_provider: WalletActivityProvider | None = None,
    )
Helper class providing market data access for strategy decisions.
MarketSnapshot provides a simple interface for strategies to access:
- Token prices
- RSI values
- Wallet balances
- Position information
The snapshot is populated with data at the start of each iteration, allowing strategies to make decisions based on current market conditions.
Example
    def decide(self, market: MarketSnapshot) -> Optional[Intent]:
        # Get ETH price
        eth_price = market.price("ETH")

        # Get RSI
        rsi = market.rsi("ETH", period=14)
        if rsi.is_oversold:
            return Intent.swap("USDC", "ETH", amount_usd=Decimal("1000"))

        # Check balance
        balance = market.balance("USDC")
        if balance.balance_usd < Decimal("100"):
            return Intent.hold(reason="Insufficient balance")

        return Intent.hold()
Initialize market snapshot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chain | str | Chain name (e.g., "arbitrum", "ethereum") | required |
| wallet_address | str | Wallet address for balance queries | required |
| price_oracle | PriceOracle \| None | Function to fetch prices (token, quote) -> price | None |
| rsi_provider | RSIProvider \| None | Function to calculate RSI (token, period) -> RSIData | None |
| balance_provider | BalanceProvider \| None | Function to fetch balances (token) -> TokenBalance | None |
| timestamp | datetime \| None | Snapshot timestamp (defaults to now) | None |
| wallet_activity_provider | WalletActivityProvider \| None | Provider for leader wallet activity signals | None |
price
¶
Get the price of a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "ETH", "WBTC") |
required |
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
Returns:
| Type | Description |
|---|---|
Decimal
|
Token price in quote currency |
Raises:
| Type | Description |
|---|---|
ValueError
|
If price cannot be determined |
price_data
¶
Get full price data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
Returns:
| Type | Description |
|---|---|
PriceData
|
PriceData with current price and historical data |
rsi
¶
Get RSI (Relative Strength Index) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
RSI calculation period (default 14) |
14
|
Returns:
| Type | Description |
|---|---|
RSIData
|
RSIData with current RSI value and signal |
Raises:
| Type | Description |
|---|---|
ValueError
|
If RSI cannot be calculated |
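For readers unfamiliar with the indicator, the sketch below computes RSI with Wilder smoothing from a list of closing prices. It illustrates the standard formula only; how the configured RSIProvider actually computes or sources its values is not stated in this reference, and the function name and ValueError message here are assumptions.

```python
from typing import List


def rsi(closes: List[float], period: int = 14) -> float:
    """Wilder-smoothed RSI of the last close; needs at least period + 1 closes."""
    if len(closes) < period + 1:
        raise ValueError("not enough data to calculate RSI")
    deltas = [b - a for a, b in zip(closes, closes[1:])]
    gains = [max(d, 0.0) for d in deltas]
    losses = [max(-d, 0.0) for d in deltas]
    # Seed with a simple average over the first `period` changes.
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    # Wilder smoothing for the remaining changes.
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
    if avg_loss == 0:
        # No losses in the data; a completely flat series also lands here.
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)


rising = rsi([float(i) for i in range(1, 20)])
falling = rsi([float(i) for i in range(20, 1, -1)])
```

A steadily rising series gives the maximum reading and a steadily falling one gives the minimum, which is the behavior that oversold and overbought flags are typically built on.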
macd
¶
macd(
token: str,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
) -> MACDData
Get MACD (Moving Average Convergence Divergence) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
fast_period
|
int
|
Fast EMA period (default 12) |
12
|
slow_period
|
int
|
Slow EMA period (default 26) |
26
|
signal_period
|
int
|
Signal EMA period (default 9) |
9
|
Returns:
| Type | Description |
|---|---|
MACDData
|
MACDData with MACD line, signal line, and histogram |
Raises:
| Type | Description |
|---|---|
ValueError
|
If MACD data is not available |
Example
    macd = market.macd("WETH")
    if macd.is_bullish_crossover:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
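The three values named in the MACDData description (MACD line, signal line, histogram) follow from the fast, slow, and signal periods as sketched below. This is the textbook calculation with one common seeding convention (the first value seeds each EMA); it is an assumption that the framework's data source uses the same convention, and `ema_series` and `macd` are hypothetical helper names.

```python
from typing import List, Tuple


def ema_series(values: List[float], period: int) -> List[float]:
    """Exponential moving average, seeded with the first value."""
    alpha = 2.0 / (period + 1)
    out = [values[0]]
    for value in values[1:]:
        out.append(alpha * value + (1.0 - alpha) * out[-1])
    return out


def macd(
    closes: List[float], fast: int = 12, slow: int = 26, signal: int = 9
) -> Tuple[float, float, float]:
    """Return (macd_line, signal_line, histogram) for the latest close."""
    if not closes:
        raise ValueError("MACD data is not available")
    fast_ema = ema_series(closes, fast)
    slow_ema = ema_series(closes, slow)
    # MACD line: fast EMA minus slow EMA.
    macd_line = [f - s for f, s in zip(fast_ema, slow_ema)]
    # Signal line: EMA of the MACD line.
    signal_line = ema_series(macd_line, signal)
    # Histogram: distance between the two lines.
    histogram = macd_line[-1] - signal_line[-1]
    return macd_line[-1], signal_line[-1], histogram


line, sig, hist = macd([float(i) for i in range(1, 101)])
```

In a steady uptrend the fast EMA tracks price more closely than the slow one, so the MACD line comes out positive.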
bollinger_bands
¶
Get Bollinger Bands for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
SMA period (default 20) |
20
|
std_dev
|
float
|
Standard deviation multiplier (default 2.0) |
2.0
|
Returns:
| Type | Description |
|---|---|
BollingerBandsData
|
BollingerBandsData with upper, middle, lower bands and position metrics |
Raises:
| Type | Description |
|---|---|
ValueError
|
If Bollinger Bands data is not available |
Example
    bb = market.bollinger_bands("WETH")
    if bb.is_oversold:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
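The period and std_dev parameters map onto the bands as in the sketch below: the middle band is an SMA over the window, and the outer bands sit std_dev standard deviations away. The use of the population standard deviation and the %B fallback for a flat window are assumptions of this sketch, and `bollinger` is a hypothetical helper, not the framework's calculation.

```python
from statistics import fmean, pstdev
from typing import List, Tuple


def bollinger(
    closes: List[float], period: int = 20, std_dev: float = 2.0
) -> Tuple[float, float, float, float]:
    """Return (upper, middle, lower, percent_b) for the latest close."""
    if len(closes) < period:
        raise ValueError("Bollinger Bands data is not available")
    window = closes[-period:]
    middle = fmean(window)  # simple moving average of the window
    width = std_dev * pstdev(window)  # population standard deviation (assumed)
    upper, lower = middle + width, middle - width
    if upper == lower:
        percent_b = 0.5  # flat window: treat price as sitting mid-band
    else:
        # %B: 0 at the lower band, 1 at the upper band.
        percent_b = (closes[-1] - lower) / (upper - lower)
    return upper, middle, lower, percent_b


upper, middle, lower, percent_b = bollinger(
    [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0], period=8, std_dev=2.0
)
```

For this window the mean is 5 and the standard deviation is 2, so the bands land at 9 and 1, and the last close of 9 sits exactly on the upper band.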
stochastic
¶
Get Stochastic Oscillator for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
k_period
|
int
|
%K period (default 14) |
14
|
d_period
|
int
|
%D period (default 3) |
3
|
Returns:
| Type | Description |
|---|---|
StochasticData
|
StochasticData with %K and %D values |
Raises:
| Type | Description |
|---|---|
ValueError
|
If Stochastic data is not available |
Example
    stoch = market.stochastic("WETH")
    if stoch.is_oversold and stoch.k_value > stoch.d_value:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
atr
¶
Get ATR (Average True Range) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
ATR period (default 14) |
14
|
Returns:
| Type | Description |
|---|---|
ATRData
|
ATRData with ATR value and volatility assessment |
Raises:
| Type | Description |
|---|---|
ValueError
|
If ATR data is not available |
Example
    atr = market.atr("WETH")
    if atr.is_low_volatility:
        # Safe to trade
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
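As background for the period parameter, the sketch below computes ATR from high, low, and close series using Wilder smoothing of the true range. It shows the standard definition only; the data source and smoothing the framework uses are not stated in this reference, and `true_ranges` and `atr` are hypothetical helper names.

```python
from typing import List


def true_ranges(highs: List[float], lows: List[float], closes: List[float]) -> List[float]:
    """True range per bar, starting from the second bar (needs a previous close)."""
    ranges = []
    for i in range(1, len(closes)):
        ranges.append(
            max(
                highs[i] - lows[i],
                abs(highs[i] - closes[i - 1]),
                abs(lows[i] - closes[i - 1]),
            )
        )
    return ranges


def atr(highs: List[float], lows: List[float], closes: List[float], period: int = 14) -> float:
    """Wilder-smoothed average true range for the latest bar."""
    ranges = true_ranges(highs, lows, closes)
    if len(ranges) < period:
        raise ValueError("ATR data is not available")
    # Seed with a simple average, then apply Wilder smoothing.
    value = sum(ranges[:period]) / period
    for tr in ranges[period:]:
        value = (value * (period - 1) + tr) / period
    return value


value = atr(
    highs=[10.0, 11.0, 12.0, 13.0],
    lows=[9.0, 10.0, 11.0, 12.0],
    closes=[9.5, 10.5, 11.5, 12.5],
    period=3,
)
```

Each bar here gaps up by 1 with a bar range of 1, so the true range is 1.5 on every bar (high minus previous close) and the ATR is 1.5.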
sma
¶
Get Simple Moving Average for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
SMA period (default 20) |
20
|
Returns:
| Type | Description |
|---|---|
MAData
|
MAData with SMA value |
Raises:
| Type | Description |
|---|---|
ValueError
|
If SMA data is not available |
Example
    sma = market.sma("WETH", period=50)
    if sma.is_price_above:
        print("Bullish - price above 50 SMA")
ema
¶
Get Exponential Moving Average for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
EMA period (default 12) |
12
|
Returns:
| Type | Description |
|---|---|
MAData
|
MAData with EMA value |
Raises:
| Type | Description |
|---|---|
ValueError
|
If EMA data is not available |
Example
    ema_12 = market.ema("WETH", period=12)
    ema_26 = market.ema("WETH", period=26)
    if ema_12.value > ema_26.value:
        print("12 EMA above 26 EMA - bullish")
adx
¶
Get ADX (Average Directional Index) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
ADX period (default 14) |
14
|
Returns:
| Type | Description |
|---|---|
ADXData
|
ADXData with ADX, +DI, and -DI values |
Raises:
| Type | Description |
|---|---|
ValueError
|
If ADX data is not available |
Example
    adx = market.adx("WETH")
    if adx.is_uptrend:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
obv
¶
Get OBV (On-Balance Volume) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
Returns:
| Type | Description |
|---|---|
OBVData
|
OBVData with OBV and signal line values |
Raises:
| Type | Description |
|---|---|
ValueError
|
If OBV data is not available |
Example
    obv = market.obv("WETH")
    if obv.is_bullish:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
cci
¶
Get CCI (Commodity Channel Index) for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
period
|
int
|
CCI period (default 20) |
20
|
Returns:
| Type | Description |
|---|---|
CCIData
|
CCIData with CCI value and overbought/oversold status |
Raises:
| Type | Description |
|---|---|
ValueError
|
If CCI data is not available |
Example
    cci = market.cci("WETH")
    if cci.is_oversold:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
ichimoku
¶
Get Ichimoku Cloud data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
Returns:
| Type | Description |
|---|---|
IchimokuData
|
IchimokuData with all Ichimoku components |
Raises:
| Type | Description |
|---|---|
ValueError
|
If Ichimoku data is not available |
Example
    ich = market.ichimoku("WETH")
    if ich.is_bullish_crossover and ich.is_above_cloud:
        return Intent.swap("USDC", "WETH", amount_usd=Decimal("100"))
balance
¶
Get wallet balance for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
Returns:
| Type | Description |
|---|---|
| TokenBalance | TokenBalance with current balance |
Raises:
| Type | Description |
|---|---|
| ValueError | If balance cannot be determined |
balance_usd
¶
Get wallet balance in USD terms.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
Returns:
| Type | Description |
|---|---|
| Decimal | Balance in USD |
set_price
¶
Pre-populate price for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| price_value | Decimal | Price value in USD | required |
set_balance
¶
Pre-populate balance for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| balance_data | TokenBalance | Balance data | required |
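One use for pre-populated values is deterministic unit testing of strategy logic without live data. The sketch below illustrates the idea with `ToySnapshot`, a hypothetical stand-in that is not the framework's `MarketSnapshot`: it stores balances as plain `Decimal` amounts (the real `set_balance` takes a `TokenBalance`) and assumes the USD balance is simply amount times price.

```python
from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class ToySnapshot:
    """Hypothetical stand-in for MarketSnapshot; NOT the framework class."""

    _prices: dict[str, Decimal] = field(default_factory=dict)
    _balances: dict[str, Decimal] = field(default_factory=dict)

    def set_price(self, token: str, price_value: Decimal) -> None:
        self._prices[token] = price_value

    def set_balance(self, token: str, amount: Decimal) -> None:
        self._balances[token] = amount

    def price(self, token: str) -> Decimal:
        if token not in self._prices:
            raise ValueError(f"no price available for {token}")
        return self._prices[token]

    def balance_usd(self, token: str) -> Decimal:
        if token not in self._balances:
            raise ValueError(f"no balance available for {token}")
        # Assumption: USD value is token amount multiplied by USD price.
        return self._balances[token] * self.price(token)


snapshot = ToySnapshot()
snapshot.set_price("WETH", Decimal("3000"))
snapshot.set_balance("WETH", Decimal("2"))
usd_value = snapshot.balance_usd("WETH")
```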
set_rsi
¶
Pre-populate RSI for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| rsi_data | RSIData | RSI data | required |
set_macd
¶
Pre-populate MACD data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| macd_data | MACDData | MACDData instance | required |
Example

    market.set_macd("WETH", MACDData(
        macd_line=Decimal("0.5"),
        signal_line=Decimal("0.3"),
        histogram=Decimal("0.2"),
    ))
set_bollinger_bands
¶
Pre-populate Bollinger Bands data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| bb_data | BollingerBandsData | BollingerBandsData instance | required |
Example

    market.set_bollinger_bands("WETH", BollingerBandsData(
        upper_band=Decimal("3100"),
        middle_band=Decimal("3000"),
        lower_band=Decimal("2900"),
        percent_b=Decimal("0.5"),
    ))
set_stochastic
¶
Pre-populate Stochastic data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| stoch_data | StochasticData | StochasticData instance | required |
Example

    market.set_stochastic("WETH", StochasticData(
        k_value=Decimal("25"),
        d_value=Decimal("30"),
    ))
set_atr
¶
Pre-populate ATR data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| atr_data | ATRData | ATRData instance | required |
Example

    market.set_atr("WETH", ATRData(
        value=Decimal("50"),
        value_percent=Decimal("2.5"),
    ))
set_ma
¶
Pre-populate Moving Average data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| ma_data | MAData | MAData instance | required |
| ma_type | str | Type of MA ("SMA" or "EMA") | 'SMA' |
| period | int | MA period | 20 |
Example

    market.set_ma("WETH", MAData(
        value=Decimal("3000"),
        ma_type="SMA",
        period=20,
        current_price=Decimal("3050"),
    ), ma_type="SMA", period=20)
set_adx
¶
Pre-populate ADX data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| adx_data | ADXData | ADXData instance | required |
Example

    market.set_adx("WETH", ADXData(
        adx=Decimal("30"),
        plus_di=Decimal("25"),
        minus_di=Decimal("15"),
    ))
set_obv
¶
Pre-populate OBV data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| obv_data | OBVData | OBVData instance | required |
Example

    market.set_obv("WETH", OBVData(
        obv=Decimal("1000000"),
        signal_line=Decimal("950000"),
    ))
set_cci
¶
Pre-populate CCI data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| cci_data | CCIData | CCIData instance | required |
Example

    market.set_cci("WETH", CCIData(
        value=Decimal("-120"),
    ))
set_ichimoku
¶
Pre-populate Ichimoku data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| ichimoku_data | IchimokuData | IchimokuData instance | required |
Example

    market.set_ichimoku("WETH", IchimokuData(
        tenkan_sen=Decimal("3050"),
        kijun_sen=Decimal("3000"),
        senkou_span_a=Decimal("3025"),
        senkou_span_b=Decimal("2950"),
        current_price=Decimal("3100"),
    ))
wallet_activity
¶
wallet_activity(
leader_address: str | None = None,
action_types: list[str] | None = None,
min_usd_value: Decimal | None = None,
protocols: list[str] | None = None,
) -> list
Get leader wallet activity signals for copy trading.
Returns filtered signals from the WalletActivityProvider. If no provider is configured, returns an empty list (graceful degradation).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| leader_address | str \| None | Filter by specific leader wallet address | None |
| action_types | list[str] \| None | Filter by action types (e.g., ["SWAP"]) | None |
| min_usd_value | Decimal \| None | Minimum USD value filter | None |
| protocols | list[str] \| None | Filter by protocol names (e.g., ["uniswap_v3"]) | None |
Returns:
| Type | Description |
|---|---|
| list | List of CopySignal objects matching the filters |
get_price_oracle_dict
¶
Get all prices as a dict suitable for IntentCompiler.
Combines pre-populated prices and cached prices from oracle calls. This is used to pass real prices to the IntentCompiler for accurate slippage calculations.
Returns:
| Type | Description |
|---|---|
| dict[str, Decimal] | Dict mapping token symbols to USD prices |
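To illustrate why a symbol-to-price dict matters for slippage, the sketch below merges two price sources and derives a minimum acceptable output for a swap. Both helpers are hypothetical, and the precedence rule (pre-populated prices win on conflict) is an assumption; the reference only says the two sources are combined.

```python
from decimal import Decimal


def merge_prices(
    pre_populated: dict[str, Decimal],
    oracle_cache: dict[str, Decimal],
) -> dict[str, Decimal]:
    merged = dict(oracle_cache)
    merged.update(pre_populated)  # assumption: pre-populated values take precedence
    return merged


def min_amount_out(
    amount_in: Decimal,
    token_in: str,
    token_out: str,
    prices: dict[str, Decimal],
    max_slippage: Decimal,
) -> Decimal:
    """Expected output at oracle prices, reduced by the slippage tolerance."""
    expected = amount_in * prices[token_in] / prices[token_out]
    return expected * (Decimal("1") - max_slippage)


prices = merge_prices(
    {"WETH": Decimal("3000")},
    {"USDC": Decimal("1"), "WETH": Decimal("2990")},
)
floor = min_amount_out(Decimal("3000"), "USDC", "WETH", prices, Decimal("0.005"))
```

With accurate prices the floor tracks the market; with placeholder prices it could be far too loose or too strict.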
RiskGuard¶
Non-bypassable risk validation that runs before every execution.
almanak.framework.strategies.RiskGuard
¶
Validates configuration changes against risk limits.
RiskGuard ensures that configuration updates cannot bypass critical risk limits defined by the operator or system. It provides human-readable guidance when validation fails.
Initialize RiskGuard with configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | RiskGuardConfig \| None | Risk guard configuration (uses defaults if not provided) | None |
generate_guidance
¶
generate_guidance(
field_name: str,
requested_value: Decimal,
limit_value: Decimal,
) -> RiskGuardGuidance
Generate human-readable guidance for a failed risk check.
This method creates a RiskGuardGuidance object with:
- What the limit is and what value was requested
- Clear explanation of what the limit protects against
- Actionable suggestion for how to proceed
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| field_name | str | The configuration field that failed validation | required |
| requested_value | Decimal | The value the operator tried to set | required |
| limit_value | Decimal | The maximum/minimum allowed value | required |
Returns:
| Type | Description |
|---|---|
| RiskGuardGuidance | RiskGuardGuidance with human-readable explanation and suggestion |
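The three parts of the guidance (what was requested versus the limit, what the limit protects against, and what to do next) might be assembled along these lines. `ToyGuidance`, its field names, and the wording of the messages are all assumptions made for illustration; the real `RiskGuardGuidance` structure is not shown in this reference.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class ToyGuidance:
    """Hypothetical stand-in for RiskGuardGuidance; field names are assumptions."""

    summary: str
    explanation: str
    suggestion: str


# Hypothetical per-field explanations; unknown fields fall back to a generic one.
EXPLANATIONS = {
    "max_slippage": "High slippage tolerance lets trades fill at much worse prices than quoted.",
}


def build_guidance(field_name: str, requested_value: Decimal, limit_value: Decimal) -> ToyGuidance:
    return ToyGuidance(
        summary=f"{field_name}: requested {requested_value}, limit is {limit_value}",
        explanation=EXPLANATIONS.get(field_name, "This limit caps risk exposure for the strategy."),
        suggestion=f"Choose a value within the limit of {limit_value}, or ask the operator to review the limit.",
    )


guidance = build_guidance("max_slippage", Decimal("0.2"), Decimal("0.1"))
```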
validate_config_update
¶
validate_config_update(
current_config: HotReloadableConfig,
updates: dict[str, Any],
) -> RiskGuardResult
Validate proposed configuration updates against risk limits.
This method checks each proposed update against the risk guard's limits to ensure operators cannot accidentally configure dangerous parameters. When validation fails, it includes human-readable guidance explaining what went wrong and how to fix it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| current_config | HotReloadableConfig | Current configuration | required |
| updates | dict[str, Any] | Proposed updates, as a dict mapping field name to new value | required |
Returns:
| Type | Description |
|---|---|
| RiskGuardResult | RiskGuardResult indicating if validation passed, with guidance if not |
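A minimal sketch of checking proposed updates against ceilings and floors is shown below. `ToyLimits` copies the default values from the `RiskGuardConfig` signature on this page, but the update field names in `CHECKS` (such as `max_slippage`) and the rule that unknown fields pass through unchecked are assumptions, not documented behavior.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any


@dataclass(frozen=True)
class ToyLimits:
    """Stand-in carrying the documented RiskGuardConfig defaults; not the real class."""

    max_slippage_limit: Decimal = Decimal("0.1")
    max_leverage_limit: Decimal = Decimal("10")
    max_daily_loss_limit_usd: Decimal = Decimal("100000")
    min_health_factor_floor: Decimal = Decimal("1.05")


# Hypothetical mapping: config field -> (limit attribute, "max" ceiling or "min" floor).
CHECKS = {
    "max_slippage": ("max_slippage_limit", "max"),
    "max_leverage": ("max_leverage_limit", "max"),
    "daily_loss_limit_usd": ("max_daily_loss_limit_usd", "max"),
    "min_health_factor": ("min_health_factor_floor", "min"),
}


def validate_updates(updates: dict[str, Any], limits: ToyLimits) -> list[str]:
    """Return one message per violated limit; an empty list means the update passes."""
    violations = []
    for name, raw in updates.items():
        if name not in CHECKS:
            continue  # assumption: fields without a risk limit are not checked
        attr, direction = CHECKS[name]
        limit = getattr(limits, attr)
        value = Decimal(str(raw))
        if direction == "max" and value > limit:
            violations.append(f"{name}={value} exceeds maximum {limit}")
        elif direction == "min" and value < limit:
            violations.append(f"{name}={value} is below minimum {limit}")
    return violations


problems = validate_updates(
    {"max_slippage": Decimal("0.2"), "min_health_factor": Decimal("1.5"), "trade_size": 5},
    ToyLimits(),
)
```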
RiskGuardConfig¶
almanak.framework.strategies.RiskGuardConfig
dataclass
¶
RiskGuardConfig(
max_slippage_limit: Decimal = Decimal("0.1"),
max_leverage_limit: Decimal = Decimal("10"),
max_daily_loss_limit_usd: Decimal = Decimal("100000"),
min_health_factor_floor: Decimal = Decimal("1.05"),
)
Configuration for RiskGuard validation.
Defines the maximum allowed values for risk-sensitive parameters. These limits cannot be exceeded through hot-reload updates.
Attributes:
| Name | Type | Description |
|---|---|---|
| max_slippage_limit | Decimal | Maximum allowed slippage (e.g., 0.05 = 5%) |
| max_leverage_limit | Decimal | Maximum allowed leverage (e.g., 10x) |
| max_daily_loss_limit_usd | Decimal | Maximum allowed daily loss limit |
| min_health_factor_floor | Decimal | Minimum allowed health factor setting |
DecideResult¶
almanak.framework.strategies.DecideResult
module-attribute
¶
IntentSequence¶
almanak.framework.strategies.IntentSequence
dataclass
¶
IntentSequence(
intents: list[AnyIntent],
sequence_id: str = (lambda: str(uuid.uuid4()))(),
created_at: datetime = (lambda: datetime.now(UTC))(),
description: str | None = None,
)
A sequence of intents that must execute in order (dependent actions).
IntentSequence wraps a list of intents that have dependencies between them and must execute sequentially. This is used when the output of one intent feeds into the input of the next (e.g., swap output -> bridge input).
Intents that are NOT in a sequence can execute in parallel if they are independent (e.g., two swaps on different chains).
Attributes:
| Name | Type | Description |
|---|---|---|
| intents | list[AnyIntent] | List of intents to execute in order |
| sequence_id | str | Unique identifier for this sequence |
| created_at | datetime | Timestamp when the sequence was created |
| description | str \| None | Optional description of the sequence purpose |
Example

    # Create a sequence of dependent actions
    sequence = Intent.sequence([
        Intent.swap("USDC", "ETH", amount=Decimal("1000"), chain="base"),
        Intent.bridge(token="ETH", amount="all", from_chain="base", to_chain="arbitrum"),
        Intent.supply(protocol="aave_v3", token="WETH", amount="all", chain="arbitrum"),
    ])

    # Return from decide() - will execute sequentially
    return sequence
deserialize
classmethod
¶
Deserialize a dictionary to an IntentSequence.
Note: This requires the Intent.deserialize function to be available, which creates a circular dependency. The actual deserialization is done in the Intent class.
ExecutionResult¶
almanak.framework.strategies.ExecutionResult
dataclass
¶
ExecutionResult(
intent: AnyIntent | None,
action_bundle: ActionBundle | None = None,
state_machine_result: StepResult | None = None,
success: bool = False,
error: str | None = None,
execution_time_ms: float = 0.0,
)
Result of strategy execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| intent | AnyIntent \| None | The intent that was executed (or None if HOLD) |
| action_bundle | ActionBundle \| None | The compiled action bundle (or None) |
| state_machine_result | StepResult \| None | Final state machine step result |
| success | bool | Whether execution was successful |
| error | str \| None | Error message if failed |
| execution_time_ms | float | Time taken for execution in milliseconds |