Data Layer

Market data providers, price oracles, balance providers, and the unified MarketSnapshot interface.

MarketSnapshot

The primary data interface passed to decide(). Provides lazy access to prices, balances, indicators, and more.

almanak.framework.data.MarketSnapshot

MarketSnapshot(
    chain: str,
    wallet_address: str,
    price_oracle: PriceOracle | None = None,
    balance_provider: BalanceProvider | None = None,
    rsi_calculator: RSICalculator | None = None,
    ohlcv_module: Optional[OHLCVModule] = None,
    gas_oracle: Optional[GasOracle] = None,
    pool_reader: Optional[UniswapV3PoolReader] = None,
    rate_monitor: Optional[RateMonitor] = None,
    funding_rate_provider: Optional[
        FundingRateProvider
    ] = None,
    multi_dex_service: Optional[
        MultiDexPriceService
    ] = None,
    il_calculator: Optional[ILCalculator] = None,
    prediction_provider: Optional[
        PredictionMarketDataProvider
    ] = None,
    stablecoin_config: StablecoinConfig | None = None,
    freshness_config: FreshnessConfig | None = None,
    timestamp: datetime | None = None,
    pool_reader_registry: Optional[
        PoolReaderRegistry
    ] = None,
    price_aggregator: Optional[PriceAggregator] = None,
    data_router: Optional[DataRouter] = None,
    ohlcv_router: Optional[OHLCVRouter] = None,
    pool_history_reader: Optional[PoolHistoryReader] = None,
    rate_history_reader: Optional[RateHistoryReader] = None,
    liquidity_depth_reader: Optional[
        LiquidityDepthReader
    ] = None,
    slippage_estimator: Optional[SlippageEstimator] = None,
    volatility_calculator: Optional[
        RealizedVolatilityCalculator
    ] = None,
    risk_calculator: Optional[
        PortfolioRiskCalculator
    ] = None,
    pool_analytics_reader: Optional[
        PoolAnalyticsReader
    ] = None,
    yield_aggregator: Optional[YieldAggregator] = None,
    wallet_activity_provider: Optional[
        WalletActivityProvider
    ] = None,
)

Unified market data interface for strategy decision-making.

MarketSnapshot provides a clean, synchronous interface for strategies to access market data. It wraps underlying async data providers and handles event loop management internally.
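A minimal sketch of the general pattern, assuming a synchronous facade that drives an async provider with asyncio.run. FakeAsyncOracle and SyncFacade are hypothetical names used only for illustration; they are not almanak classes, and the real event loop handling inside MarketSnapshot may differ.

```python
import asyncio
from decimal import Decimal


class FakeAsyncOracle:
    """Hypothetical async provider standing in for a real price oracle."""

    async def get_price(self, token: str) -> Decimal:
        await asyncio.sleep(0)  # stands in for an awaitable network call
        return {"WETH": Decimal("2500.50")}[token]


class SyncFacade:
    """Hypothetical synchronous wrapper (not the MarketSnapshot implementation)."""

    def __init__(self, oracle: FakeAsyncOracle) -> None:
        self._oracle = oracle

    def price(self, token: str) -> Decimal:
        # asyncio.run creates an event loop, runs the coroutine to completion,
        # and closes the loop. It cannot be used inside an already running loop.
        return asyncio.run(self._oracle.get_price(token))


facade = SyncFacade(FakeAsyncOracle())
```

The caller never sees a coroutine: facade.price("WETH") blocks until the async call has finished and returns a plain Decimal.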

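Unless a method's documentation says otherwise, methods raise clear exceptions on failure; there are no silent defaults or fallback values. The documented exceptions to this rule are wallet_activity(), which returns an empty list when no provider is configured, and the batch methods prices() and balances(), which return partial results. Strategies should catch the documented exception types explicitly.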

Attributes:

Name Type Description
chain str

Blockchain network (e.g., "arbitrum", "ethereum")

wallet_address str

Address of the wallet for balance queries

timestamp datetime

When the snapshot was created

Example

# Create a snapshot with price, balance, and RSI providers
snapshot = MarketSnapshot(
    chain="arbitrum",
    wallet_address="0x...",
    price_oracle=price_aggregator,
    balance_provider=web3_balance_provider,
    rsi_calculator=rsi_calculator,
)

# Get aggregated price
eth_price = snapshot.price("WETH")  # Decimal("2500.50")

# Get wallet balance
usdc_balance = snapshot.balance("USDC")  # Decimal("1000.00")

# Get RSI indicator
rsi = snapshot.rsi("WETH", period=14)  # 45.5

# Get balance in USD terms
usdc_value = snapshot.balance_usd("USDC")  # Decimal("1000.00")

# Get total portfolio value
total = snapshot.total_portfolio_usd(["WETH", "USDC", "ARB"])

Initialize the MarketSnapshot.

Parameters:

Name Type Description Default
chain str

Blockchain network name (e.g., "arbitrum", "ethereum")

required
wallet_address str

Wallet address for balance queries

required
price_oracle PriceOracle | None

PriceOracle implementation for price data

None
balance_provider BalanceProvider | None

BalanceProvider implementation for balance data

None
rsi_calculator RSICalculator | None

RSICalculator implementation for RSI indicator

None
ohlcv_module Optional[OHLCVModule]

OHLCVModule for historical candlestick data

None
gas_oracle Optional[GasOracle]

GasOracle implementation for gas price data

None
pool_reader Optional[UniswapV3PoolReader]

UniswapV3PoolReader for DEX pool data

None
rate_monitor Optional[RateMonitor]

RateMonitor for lending protocol rates

None
funding_rate_provider Optional[FundingRateProvider]

FundingRateProvider for perpetual funding rates

None
multi_dex_service Optional[MultiDexPriceService]

MultiDexPriceService for cross-DEX price comparison

None
il_calculator Optional[ILCalculator]

ILCalculator for impermanent loss calculations

None
prediction_provider Optional[PredictionMarketDataProvider]

PredictionMarketDataProvider for prediction market data

None
stablecoin_config StablecoinConfig | None

Configuration for stablecoin pricing behavior. Default is StablecoinConfig(mode='market') which uses actual market prices.

None
freshness_config FreshnessConfig | None

Configuration for data freshness thresholds. Default is FreshnessConfig(price_warn_sec=30, price_error_sec=300).

None
timestamp datetime | None

Optional snapshot timestamp (defaults to now)

None
pool_reader_registry Optional[PoolReaderRegistry]

PoolReaderRegistry for on-chain pool price reads

None
price_aggregator Optional[PriceAggregator]

PriceAggregator for TWAP/LWAP aggregation

None
data_router Optional[DataRouter]

DataRouter for provider selection and failover

None
ohlcv_router Optional[OHLCVRouter]

OHLCVRouter for multi-provider OHLCV with CEX/DEX awareness

None
pool_history_reader Optional[PoolHistoryReader]

PoolHistoryReader for historical pool state data

None
rate_history_reader Optional[RateHistoryReader]

RateHistoryReader for historical lending/funding rate data

None
liquidity_depth_reader Optional[LiquidityDepthReader]

LiquidityDepthReader for tick-level liquidity reads

None
slippage_estimator Optional[SlippageEstimator]

SlippageEstimator for swap slippage estimation

None
volatility_calculator Optional[RealizedVolatilityCalculator]

RealizedVolatilityCalculator for vol metrics

None
risk_calculator Optional[PortfolioRiskCalculator]

PortfolioRiskCalculator for portfolio risk metrics

None
pool_analytics_reader Optional[PoolAnalyticsReader]

PoolAnalyticsReader for pool TVL, volume, fee APR

None
yield_aggregator Optional[YieldAggregator]

YieldAggregator for cross-protocol yield comparison

None
wallet_activity_provider Optional[WalletActivityProvider]

WalletActivityProvider for leader wallet activity signals (copy trading)

None

chain property

chain: str

Get the blockchain network name.

wallet_address property

wallet_address: str

Get the wallet address.

timestamp property

timestamp: datetime

Get the snapshot creation timestamp.

wallet_activity

wallet_activity(
    leader_address: str | None = None,
    action_types: list[str] | None = None,
    min_usd_value: Decimal | None = None,
    protocols: list[str] | None = None,
) -> list

Get leader wallet activity signals for copy trading.

Returns filtered signals from the WalletActivityProvider. If no provider is configured, returns an empty list (graceful degradation).

Parameters:

Name Type Description Default
leader_address str | None

Filter by specific leader wallet address

None
action_types list[str] | None

Filter by action types (e.g., ["SWAP"])

None
min_usd_value Decimal | None

Minimum USD value filter

None
protocols list[str] | None

Filter by protocol names (e.g., ["uniswap_v3"])

None

Returns:

Type Description
list

List of CopySignal objects matching the filters

price

price(token: str, quote: str = 'USD') -> Decimal

Get the aggregated price for a token.

Fetches the price from the configured PriceOracle, which may aggregate prices from multiple sources.

For stablecoins, the behavior depends on the configured StablecoinConfig:
- mode='market' (default): Returns the actual market price
- mode='pegged': Returns Decimal('1.00') for configured stablecoins
- mode='hybrid': Returns Decimal('1.00') if the market price is within tolerance of the peg, otherwise the market price
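The three stablecoin modes can be sketched as a small pure function. This is an illustration only: resolve_stablecoin_price and the 0.5% tolerance are assumptions made for the example, not part of StablecoinConfig, whose actual fields and tolerance are not shown on this page.

```python
from decimal import Decimal

PEG = Decimal("1.00")


def resolve_stablecoin_price(
    market_price: Decimal,
    mode: str = "market",
    tolerance: Decimal = Decimal("0.005"),  # assumed 0.5% band, illustration only
) -> Decimal:
    """Return the price a stablecoin would be reported at under each mode."""
    if mode == "market":
        return market_price
    if mode == "pegged":
        return PEG
    if mode == "hybrid":
        # Snap to the peg only while the market price stays inside the band.
        return PEG if abs(market_price - PEG) <= tolerance else market_price
    raise ValueError(f"unknown stablecoin mode: {mode!r}")
```

Under the hybrid mode a mild deviation is reported as the peg, while a real depeg passes through as the market price, which is the case a strategy most needs to see.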

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH", "USDC")

required
quote str

Quote currency (default "USD")

'USD'

Returns:

Type Description
Decimal

Price as a Decimal for precision

Raises:

Type Description
PriceUnavailableError

If price cannot be determined

ValueError

If no price oracle is configured

Example

eth_price = snapshot.price("WETH")
# Returns: Decimal("2500.50")

# With mode='pegged'
usdc_price = snapshot.price("USDC")
# Returns: Decimal("1.00")

balance

balance(token: str) -> Decimal

Get the wallet balance for a token.

Queries the balance from the configured BalanceProvider.

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "USDC") or "ETH" for native

required

Returns:

Type Description
Decimal

Balance as a Decimal in human-readable units (not wei)

Raises:

Type Description
BalanceUnavailableError

If balance cannot be determined

ValueError

If no balance provider is configured

Example

usdc_balance = snapshot.balance("USDC")
# Returns: Decimal("1000.50")

rsi

rsi(
    token: str, period: int = 14, timeframe: str = "4h"
) -> float

Get the RSI (Relative Strength Index) for a token.

Calculates RSI using the configured RSI calculator with historical price data.
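For reference, a minimal sketch of the RSI calculation itself, assuming Wilder smoothing over closing prices ordered oldest first. The function below is a standalone illustration; whether the configured RSICalculator uses exactly this smoothing or seeding is an assumption.

```python
def rsi(closes: list[float], period: int = 14) -> float:
    """Wilder-smoothed RSI over closing prices (oldest first)."""
    if period <= 0 or len(closes) < period + 1:
        raise ValueError("need at least period + 1 closes")
    changes = [b - a for a, b in zip(closes, closes[1:])]
    gains = [max(c, 0.0) for c in changes]
    losses = [max(-c, 0.0) for c in changes]
    # Seed with a simple average of the first `period` changes,
    # then apply Wilder smoothing to the remaining ones.
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    for gain, loss in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + gain) / period
        avg_loss = (avg_loss * (period - 1) + loss) / period
    if avg_loss == 0:
        # No losses in the window: 100 if there were gains, 50 for a flat series.
        return 100.0 if avg_gain > 0 else 50.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)
```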

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
period int

RSI calculation period (default 14)

14
timeframe str

OHLCV candle timeframe (default "4h"). Supported: "1m", "5m", "15m", "1h", "4h", "1d". Note: "1m", "5m", and "15m" may return 30-minute candles (CoinGecko limitation).

'4h'

Returns:

Type Description
float

RSI value from 0 to 100 as a float

Raises:

Type Description
RSIUnavailableError

If RSI cannot be calculated

ValueError

If no RSI calculator is configured

Example

# Default 4-hour candles
rsi = snapshot.rsi("WETH", period=14)

# 1-hour candles for shorter-term analysis
rsi_1h = snapshot.rsi("WETH", period=14, timeframe="1h")

# Daily candles for longer-term analysis
rsi_1d = snapshot.rsi("WETH", period=14, timeframe="1d")

# Multi-timeframe analysis
if snapshot.rsi("WETH", timeframe="1h") < 30 and snapshot.rsi("WETH", timeframe="1d") < 50:
    # Short-term oversold, long-term not overbought
    return SwapIntent(...)

sma

sma(
    token: str, period: int = 20, timeframe: str = "1h"
) -> float

Get the Simple Moving Average (SMA) for a token.

SMA is the unweighted mean of the last N closing prices.
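As a reference for the definition above, a minimal standalone sketch (not the library's implementation) that averages the last N closes:

```python
def sma(closes: list[float], period: int = 20) -> float:
    """Unweighted mean of the last `period` closing prices (oldest first)."""
    if period <= 0 or len(closes) < period:
        raise ValueError("need at least `period` closes")
    window = closes[-period:]
    return sum(window) / period
```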

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
period int

Number of periods for the average (default 20)

20
timeframe str

OHLCV candle timeframe (default "1h")

'1h'

Returns:

Type Description
float

SMA value as float

Example

sma_20 = snapshot.sma("WETH", period=20, timeframe="1h")
sma_200 = snapshot.sma("WETH", period=200, timeframe="1d")

# Trading logic
current_price = float(snapshot.price("WETH"))  # price() returns Decimal, sma() returns float
if current_price > sma_20:
    print("Price above 20-period SMA - bullish trend")

ema

ema(
    token: str,
    period: int = 12,
    timeframe: str = "1h",
    smoothing: float = 2.0,
) -> float

Get the Exponential Moving Average (EMA) for a token.

EMA gives more weight to recent prices using exponential decay.
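A minimal sketch of how the smoothing parameter typically enters the calculation, assuming the common convention alpha = smoothing / (period + 1) and an SMA seed over the first period closes. Both conventions are assumptions here; the library's exact seeding is not documented on this page.

```python
def ema(closes: list[float], period: int = 12, smoothing: float = 2.0) -> float:
    """Exponential moving average of closing prices (oldest first)."""
    if period <= 0 or len(closes) < period:
        raise ValueError("need at least `period` closes")
    alpha = smoothing / (period + 1)  # weight given to the newest close
    value = sum(closes[:period]) / period  # seed with the SMA of the first window
    for close in closes[period:]:
        # Each step decays the previous value and mixes in the new close.
        value = alpha * close + (1.0 - alpha) * value
    return value
```

With the default smoothing of 2.0 and period 12, alpha is 2/13, so the newest close contributes about 15% of the updated value.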

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
period int

Number of periods (default 12)

12
timeframe str

OHLCV candle timeframe (default "1h")

'1h'
smoothing float

Smoothing factor (default 2.0)

2.0

Returns:

Type Description
float

EMA value as float

Example

ema_12 = snapshot.ema("WETH", period=12, timeframe="1h")
ema_26 = snapshot.ema("WETH", period=26, timeframe="1h")

# Bullish alignment check: fast EMA above slow EMA
if ema_12 > ema_26:
    print("Fast EMA above slow EMA - bullish signal")

bollinger_bands

bollinger_bands(
    token: str,
    period: int = 20,
    std_dev: float = 2.0,
    timeframe: str = "1h",
) -> BollingerBandsResult

Get Bollinger Bands for a token.

Bollinger Bands consist of a middle band (SMA) with upper and lower bands at a specified number of standard deviations away.
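A minimal sketch of the band arithmetic, returning a plain dict keyed like the result fields. It assumes a population standard deviation and a bandwidth defined as (upper - lower) / middle; both are assumptions for illustration and may not match how BollingerBandsResult is computed.

```python
from statistics import fmean, pstdev


def bollinger(closes: list[float], period: int = 20, std_dev: float = 2.0) -> dict[str, float]:
    """Bollinger Bands over the last `period` closes (oldest first)."""
    if period <= 0 or len(closes) < period:
        raise ValueError("need at least `period` closes")
    window = closes[-period:]
    middle = fmean(window)          # middle band is the SMA
    sigma = pstdev(window)          # population standard deviation (assumed)
    upper = middle + std_dev * sigma
    lower = middle - std_dev * sigma
    width = upper - lower
    return {
        "upper_band": upper,
        "middle_band": middle,
        "lower_band": lower,
        "bandwidth": width / middle if middle else 0.0,  # assumed definition
        # 0 at the lower band, 1 at the upper band; 0.5 when the bands collapse.
        "percent_b": (closes[-1] - lower) / width if width else 0.5,
    }
```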

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
period int

SMA period (default 20)

20
std_dev float

Standard deviation multiplier (default 2.0)

2.0
timeframe str

OHLCV candle timeframe (default "1h")

'1h'

Returns:

Type Description
BollingerBandsResult

BollingerBandsResult with:
- upper_band: Upper band value
- middle_band: Middle band (SMA) value
- lower_band: Lower band value
- bandwidth: Band width as percentage
- percent_b: Price position (0=lower, 1=upper)

Example

bb = snapshot.bollinger_bands("WETH", period=20, std_dev=2.0, timeframe="1h")

if bb.percent_b < 0:
    print("Price below lower band - oversold!")
elif bb.percent_b > 1:
    print("Price above upper band - overbought!")

if bb.bandwidth < 0.05:
    print("Low volatility - squeeze detected")

macd

macd(
    token: str,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
    timeframe: str = "1h",
) -> MACDResult

Get MACD (Moving Average Convergence Divergence) for a token.

MACD is a trend-following momentum indicator showing the relationship between two exponential moving averages.
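A minimal sketch of the three MACD components computed from closing prices. The EMA here is seeded with the first value and uses alpha = 2 / (period + 1); these are illustrative assumptions and the library's seeding may differ, so early values in particular can deviate.

```python
def ema_series(values: list[float], period: int) -> list[float]:
    """Running EMA of a series, seeded with its first value."""
    alpha = 2.0 / (period + 1)
    out = [values[0]]
    for value in values[1:]:
        out.append(alpha * value + (1.0 - alpha) * out[-1])
    return out


def macd(
    closes: list[float], fast: int = 12, slow: int = 26, signal: int = 9
) -> tuple[float, float, float]:
    """Return (macd_line, signal_line, histogram) for the latest close."""
    if len(closes) < slow + signal:
        raise ValueError("need at least slow + signal closes")
    fast_ema = ema_series(closes, fast)
    slow_ema = ema_series(closes, slow)
    macd_line = [f - s for f, s in zip(fast_ema, slow_ema)]  # fast EMA - slow EMA
    signal_line = ema_series(macd_line, signal)              # EMA of the MACD line
    return macd_line[-1], signal_line[-1], macd_line[-1] - signal_line[-1]
```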

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
fast_period int

Fast EMA period (default 12)

12
slow_period int

Slow EMA period (default 26)

26
signal_period int

Signal line EMA period (default 9)

9
timeframe str

OHLCV candle timeframe (default "1h")

'1h'

Returns:

Type Description
MACDResult

MACDResult with:
- macd_line: MACD line (fast EMA - slow EMA)
- signal_line: Signal line (EMA of MACD line)
- histogram: MACD histogram (macd_line - signal_line)

Example

macd = snapshot.macd("WETH", timeframe="4h")

if macd.histogram > 0:
    print("MACD above signal - bullish momentum")
elif macd.histogram < 0:
    print("MACD below signal - bearish momentum")

stochastic

stochastic(
    token: str,
    k_period: int = 14,
    d_period: int = 3,
    timeframe: str = "1h",
) -> StochasticResult

Get Stochastic Oscillator for a token.

The Stochastic Oscillator is a momentum indicator comparing a token's closing price to its price range over a given period.
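A minimal sketch of the %K and %D arithmetic, assuming %K = 100 * (close - lowest low) / (highest high - lowest low) over k_period candles and %D as the simple average of the last d_period %K values. The flat-range fallback of 50 is an assumption for the example.

```python
def stochastic(
    highs: list[float],
    lows: list[float],
    closes: list[float],
    k_period: int = 14,
    d_period: int = 3,
) -> tuple[float, float]:
    """Return (%K, %D) for the latest candle; inputs are ordered oldest first."""
    n = len(closes)
    if not (len(highs) == len(lows) == n) or n < k_period + d_period - 1:
        raise ValueError("need equal-length inputs with k_period + d_period - 1 candles")
    k_values = []
    # Compute %K for each of the last d_period candles.
    for end in range(n - d_period + 1, n + 1):
        highest = max(highs[end - k_period:end])
        lowest = min(lows[end - k_period:end])
        span = highest - lowest
        k = 100.0 * (closes[end - 1] - lowest) / span if span else 50.0
        k_values.append(k)
    return k_values[-1], sum(k_values) / d_period
```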

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
k_period int

Lookback period for %K (default 14)

14
d_period int

SMA period for %D (default 3)

3
timeframe str

OHLCV candle timeframe (default "1h")

'1h'

Returns:

Type Description
StochasticResult

StochasticResult with:
- k_value: %K (fast stochastic, 0-100 scale)
- d_value: %D (slow stochastic, 0-100 scale)

Example

stoch = snapshot.stochastic("WETH", k_period=14, d_period=3)

if stoch.k_value < 20:
    print("Oversold territory")
elif stoch.k_value > 80:
    print("Overbought territory")

# %K relative to %D (a single reading shows position, not the moment of crossing)
if stoch.k_value > stoch.d_value:
    print("Bullish - %K above %D")

atr

atr(
    token: str, period: int = 14, timeframe: str = "1h"
) -> float

Get Average True Range (ATR) for a token.

ATR is a volatility indicator showing how much an asset moves on average.
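A minimal sketch of the underlying calculation: the true range of each candle is the largest of high - low, |high - previous close|, and |low - previous close|, and ATR averages it. This version uses a simple average of the last period true ranges; Wilder smoothing is also common, and which one the library uses is not stated on this page.

```python
def atr(
    highs: list[float], lows: list[float], closes: list[float], period: int = 14
) -> float:
    """Simple-average ATR over the last `period` candles (oldest first)."""
    n = len(closes)
    if not (len(highs) == len(lows) == n) or period <= 0 or n < period + 1:
        raise ValueError("need equal-length inputs with at least period + 1 candles")
    true_ranges = []
    for i in range(1, n):
        prev_close = closes[i - 1]
        true_ranges.append(
            max(
                highs[i] - lows[i],
                abs(highs[i] - prev_close),
                abs(lows[i] - prev_close),
            )
        )
    return sum(true_ranges[-period:]) / period
```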

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "ETH")

required
period int

ATR period (default 14)

14
timeframe str

OHLCV candle timeframe (default "1h")

'1h'

Returns:

Type Description
float

ATR value (in the same units as the token price)

Example

atr = snapshot.atr("WETH", period=14, timeframe="4h")
current_price = float(snapshot.price("WETH"))

# Stop-loss placement
stop_loss = current_price - (2 * atr)
print(f"Stop loss at ${stop_loss:.2f} (2 ATR below)")

# Position sizing with 1% risk
risk_amount = 10000 * 0.01  # $100
position_size = risk_amount / atr
print(f"Position size: {position_size:.4f} units")

ohlcv

ohlcv(
    token: str | Instrument,
    timeframe: str = "1h",
    limit: int = 100,
    quote: str = "USD",
    gap_strategy: GapStrategy = "nan",
    *,
    pool_address: str | None = None,
) -> pd.DataFrame

Get OHLCV (candlestick) data for a token.

Fetches historical candlestick data from the configured OHLCV providers. When an OHLCVRouter is configured, automatically classifies instruments as CEX-primary or DeFi-primary and routes to the appropriate provider.

Accepts plain token symbols (e.g. "WETH"), pair strings (e.g. "WETH/USDC"), or Instrument objects.

Parameters:

Name Type Description Default
token str | Instrument

Token symbol, "BASE/QUOTE" string, or Instrument.

required
timeframe str

Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). Default "1h".

'1h'
limit int

Maximum number of candles to return. Default 100.

100
quote str

Quote currency (default "USD")

'USD'
gap_strategy GapStrategy

How to handle gaps in data:
- 'nan': Fill gaps with NaN values (default)
- 'ffill': Forward-fill gaps with last known values
- 'drop': Remove gaps (returns only continuous data)

'nan'
pool_address str | None

Explicit pool address for DEX providers (optional).

None

Returns:

Type Description
DataFrame

pandas DataFrame with columns:
- timestamp: datetime
- open: float64
- high: float64
- low: float64
- close: float64
- volume: float64 (may contain NaN if unavailable)

DataFrame

DataFrame.attrs includes metadata:
- base: Token symbol
- quote: Quote currency
- timeframe: Candle timeframe
- source: Provider source name
- chain: Chain identifier
- fetched_at: When the data was fetched
- confidence: Data confidence (0.0-1.0)

DataFrame

Returns empty DataFrame with correct schema if no data available.

Raises:

Type Description
OHLCVUnavailableError

If OHLCV data cannot be retrieved

ValueError

If no OHLCV module/router is configured or invalid timeframe

Example

# Get 1-hour candles for WETH
df = snapshot.ohlcv("WETH", timeframe="1h", limit=100)
print(df.columns)  # timestamp, open, high, low, close, volume

# Use Instrument for explicit routing
from almanak.framework.data.models import Instrument
inst = Instrument(base="WETH", quote="USDC", chain="arbitrum")
df = snapshot.ohlcv(inst, timeframe="1h")

# Use with pandas-ta for indicators
import pandas_ta as ta
df['rsi'] = ta.rsi(df['close'], length=14)
df['macd'] = ta.macd(df['close'])['MACD_12_26_9']

# Handle gaps with forward-fill
df = snapshot.ohlcv("WETH", gap_strategy="ffill")

gas_price

gas_price(chain: str | None = None) -> GasPrice

Get current gas price for a chain.

Fetches the current gas price data from the configured GasOracle. Results are cached for 12 seconds (roughly one Ethereum mainnet block; other chains have different block times) to avoid excessive RPC calls.
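The 12-second caching behavior can be sketched as a small time-to-live cache keyed by chain. TTLCache is a hypothetical helper written for this example, not the cache used by the library; the injectable clock exists only to make the expiry easy to test.

```python
import time
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class TTLCache(Generic[T]):
    """Hypothetical per-key cache that refetches after `ttl_seconds`."""

    def __init__(
        self,
        fetch: Callable[[str], T],
        ttl_seconds: float = 12.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, T]] = {}

    def get(self, key: str) -> T:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # still fresh: no provider call
        value = self._fetch(key)  # expired or missing: call the provider
        self._entries[key] = (now, value)
        return value
```

Two calls for the same chain within the TTL trigger a single fetch; a call at or after the TTL boundary fetches again.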

Parameters:

Name Type Description Default
chain str | None

Chain identifier (e.g., "ethereum", "arbitrum", "optimism"). If not specified, uses the strategy's primary chain from the MarketSnapshot.

None

Returns:

Type Description
GasPrice

GasPrice dataclass with:
- chain: Chain identifier
- base_fee_gwei: Network base fee in gwei
- priority_fee_gwei: Priority/tip fee in gwei
- max_fee_gwei: Maximum fee (base + priority) in gwei
- l1_base_fee_gwei: L1 base fee for L2 chains (optional)
- l1_data_cost_gwei: L1 data cost for L2 chains (optional)
- estimated_cost_usd: Estimated cost in USD for 21000 gas
- timestamp: When the gas price was observed

Raises:

Type Description
GasUnavailableError

If gas price cannot be retrieved

ValueError

If no gas oracle is configured

Example

# Get gas for strategy's primary chain
gas = snapshot.gas_price()
print(f"Base fee: {gas.base_fee_gwei} gwei")
print(f"Estimated cost: ${gas.estimated_cost_usd}")

# Get gas for specific chain
arb_gas = snapshot.gas_price("arbitrum")
if arb_gas.is_l2:
    print(f"L1 data cost: {arb_gas.l1_data_cost_gwei} gwei")

pool_price

pool_price(
    pool_address: str, chain: str | None = None
) -> DataEnvelope[PoolPrice]

Get the live price from an on-chain DEX pool.

Reads slot0() from the pool contract and decodes sqrtPriceX96 into a human-readable price using token decimals.
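The decoding step can be sketched as follows. In Uniswap V3, sqrtPriceX96 is the square root of the raw token1/token0 ratio scaled by 2^96, so squaring and adjusting for decimals gives the human-readable price of token0 in token1. The function name is hypothetical and this is an illustration, not the library's reader.

```python
from decimal import Decimal, localcontext

Q96 = Decimal(2**96)  # built from an int so the value is exact


def sqrt_price_x96_to_price(sqrt_price_x96: int, decimals0: int, decimals1: int) -> Decimal:
    """Price of one token0 expressed in token1, in human-readable units."""
    with localcontext() as ctx:
        ctx.prec = 60  # sqrtPriceX96 can be up to 160 bits; keep ample precision
        raw_ratio = (Decimal(sqrt_price_x96) / Q96) ** 2  # raw token1 per raw token0
        # Shift by the difference in token decimals to get human-readable units.
        return raw_ratio * Decimal(10) ** (decimals0 - decimals1)
```

Invert the result (1 / price) to quote token1 in token0, for example to express a USDC/WETH pool as USDC per WETH.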

Returns a DataEnvelope[PoolPrice] with EXECUTION_GRADE classification (fail-closed: raises on any error, no off-chain fallback).

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str | None

Chain name (e.g. "arbitrum", "base"). Defaults to the snapshot's primary chain.

None

Returns:

Type Description
DataEnvelope[PoolPrice]

DataEnvelope[PoolPrice] with provenance metadata.

Raises:

Type Description
PoolPriceUnavailableError

If pool price cannot be retrieved.

ValueError

If no pool reader registry is configured.

pool_price_by_pair

pool_price_by_pair(
    token_a: str,
    token_b: str,
    chain: str | None = None,
    protocol: str | None = None,
    fee_tier: int = 3000,
) -> DataEnvelope[PoolPrice]

Get the live pool price for a token pair.

Resolves the pool address for the given pair and reads the price. This is a convenience method that wraps pool address resolution and price reading.

Parameters:

Name Type Description Default
token_a str

Token A symbol or address.

required
token_b str

Token B symbol or address.

required
chain str | None

Chain name. Defaults to the snapshot's primary chain.

None
protocol str | None

Protocol name (e.g. "uniswap_v3"). If None, tries all registered protocols.

None
fee_tier int

Fee tier in hundredths of a basis point, following the Uniswap V3 convention (default 3000 = 0.3%).

3000

Returns:

Type Description
DataEnvelope[PoolPrice]

DataEnvelope[PoolPrice] with provenance metadata.

Raises:

Type Description
PoolPriceUnavailableError

If pool cannot be found or price cannot be read.

ValueError

If no pool reader registry is configured.

twap

twap(
    token_pair: str | Instrument,
    chain: str | None = None,
    window_seconds: int = 300,
    pool_address: str | None = None,
    protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]

Get the time-weighted average price (TWAP) for a token pair.

Uses the Uniswap V3 oracle's observe() function to compute the TWAP over the specified time window.
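A minimal sketch of the arithmetic behind an observe()-based TWAP: the pool returns cumulative tick values at two points in time, their difference divided by the window gives the mean tick, and 1.0001 raised to that tick gives the (geometric mean) price. The function name and float arithmetic are choices made for this example; fetching the cumulatives from the chain is out of scope here.

```python
def twap_from_tick_cumulatives(
    tick_cumulative_start: int,
    tick_cumulative_end: int,
    window_seconds: int,
    decimals0: int,
    decimals1: int,
) -> float:
    """TWAP of token0 in token1 (human-readable) from two tick cumulatives."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    delta = tick_cumulative_end - tick_cumulative_start
    # Floor division rounds toward negative infinity, matching the rounding
    # used by Uniswap's OracleLibrary for negative tick deltas.
    mean_tick = delta // window_seconds
    raw_price = 1.0001 ** mean_tick  # raw token1 per raw token0
    return raw_price * 10 ** (decimals0 - decimals1)
```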

Classification: EXECUTION_GRADE (fail-closed, no off-chain fallback).

Parameters:

Name Type Description Default
token_pair str | Instrument

Token pair as "BASE/QUOTE" string (e.g. "WETH/USDC") or an Instrument instance.

required
chain str | None

Chain name. Defaults to the snapshot's primary chain.

None
window_seconds int

Time window in seconds (default 300 = 5 min).

300
pool_address str | None

Explicit pool address. If None, resolves from pair.

None
protocol str

Protocol to use (default "uniswap_v3").

'uniswap_v3'

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with TWAP price and provenance.

Raises:

Type Description
PoolPriceUnavailableError

If TWAP cannot be calculated.

ValueError

If no price aggregator is configured.

lwap

lwap(
    token_pair: str | Instrument,
    chain: str | None = None,
    fee_tiers: list[int] | None = None,
    protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]

Get the liquidity-weighted average price (LWAP) for a token pair.

Reads live prices from all known pools for the pair, filters by minimum liquidity, and computes a liquidity-weighted average.
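The filter-then-weight step can be sketched as below. PoolQuote and the min_liquidity parameter are hypothetical names for this example; the library's actual threshold and data types are not shown on this page.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class PoolQuote:
    """Hypothetical per-pool reading: live price and in-range liquidity."""

    price: Decimal
    liquidity: Decimal


def lwap(quotes: list[PoolQuote], min_liquidity: Decimal = Decimal(0)) -> Decimal:
    """Liquidity-weighted average price over pools that pass the filter."""
    eligible = [q for q in quotes if q.liquidity > 0 and q.liquidity >= min_liquidity]
    if not eligible:
        # Fail closed: no usable pool means no price, not a guessed one.
        raise ValueError("no pool meets the minimum liquidity")
    total_liquidity = sum((q.liquidity for q in eligible), Decimal(0))
    weighted_sum = sum((q.price * q.liquidity for q in eligible), Decimal(0))
    return weighted_sum / total_liquidity
```

Deep pools dominate the result, so a thin pool with a stale or manipulated price has little influence, and none at all once it falls below the minimum.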

Classification: EXECUTION_GRADE (fail-closed, no off-chain fallback).

Parameters:

Name Type Description Default
token_pair str | Instrument

Token pair as "BASE/QUOTE" string (e.g. "WETH/USDC") or an Instrument instance.

required
chain str | None

Chain name. Defaults to the snapshot's primary chain.

None
fee_tiers list[int] | None

Fee tiers to search (default: [100, 500, 3000, 10000]).

None
protocols list[str] | None

Protocols to search (default: all registered for chain).

None

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with LWAP price and provenance.

Raises:

Type Description
PoolPriceUnavailableError

If LWAP cannot be calculated.

ValueError

If no price aggregator is configured.

pool_history

pool_history(
    pool_address: str,
    chain: str | None = None,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
    resolution: str = "1h",
) -> DataEnvelope[list[PoolSnapshot]]

Get historical pool state snapshots for backtesting and analytics.

Fetches TVL, volume, fee revenue, and reserve data from The Graph, DeFi Llama, or GeckoTerminal with graceful fallback between providers. Results are cached in VersionedDataCache for deterministic replay.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str | None

Chain name (e.g. "arbitrum", "ethereum"). Defaults to strategy chain.

None
start_date datetime | None

Start of the history window (UTC). Defaults to 90 days ago.

None
end_date datetime | None

End of the history window (UTC). Defaults to now.

None
resolution str

Data resolution: "1h", "4h", or "1d". Default "1h".

'1h'

Returns:

Type Description
DataEnvelope[list[PoolSnapshot]]

DataEnvelope[list[PoolSnapshot]] with INFORMATIONAL classification.

Raises:

Type Description
PoolHistoryUnavailableError

If historical data cannot be retrieved.

ValueError

If no pool history reader is configured.

liquidity_depth

liquidity_depth(
    pool_address: str, chain: str | None = None
) -> DataEnvelope[LiquidityDepth]

Get tick-level liquidity depth for a concentrated-liquidity pool.

Reads the tick bitmap and individual tick liquidity values from the pool contract to build a picture of liquidity distribution around the current price. Essential for slippage estimation and position sizing.

Classification: EXECUTION_GRADE (fails closed, no off-chain fallback).

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str | None

Chain name (e.g. "arbitrum", "ethereum"). Defaults to strategy chain.

None

Returns:

Type Description
DataEnvelope[LiquidityDepth]

DataEnvelope[LiquidityDepth] with tick-level liquidity data.

Raises:

Type Description
LiquidityDepthUnavailableError

If liquidity data cannot be read.

ValueError

If no liquidity depth reader is configured.

estimate_slippage

estimate_slippage(
    token_in: str,
    token_out: str,
    amount: Decimal,
    chain: str | None = None,
    protocol: str | None = None,
) -> DataEnvelope[SlippageEstimate]

Estimate price impact and slippage for a potential swap.

Simulates the swap through tick ranges using actual on-chain liquidity data to compute the expected execution price and slippage. Logs a warning if estimated slippage exceeds the configured threshold (default 1%).

Classification: EXECUTION_GRADE (fails closed, no off-chain fallback).

Parameters:

Name Type Description Default
token_in str

Input token symbol or address.

required
token_out str

Output token symbol or address.

required
amount Decimal

Amount of token_in to swap (human-readable units).

required
chain str | None

Chain name. Defaults to strategy chain.

None
protocol str | None

Protocol name (e.g. "uniswap_v3"). Auto-detected if None.

None

Returns:

Type Description
DataEnvelope[SlippageEstimate]

DataEnvelope[SlippageEstimate] with price impact data.

Raises:

Type Description
SlippageEstimateUnavailableError

If slippage cannot be estimated.

ValueError

If no slippage estimator is configured.

pool_reserves

pool_reserves(
    pool_address: str, chain: str | None = None
) -> PoolReserves

Get DEX pool reserves and state.

Fetches the current state of a DEX liquidity pool from the blockchain. Auto-detects the pool type (Uniswap V2 vs V3) by checking the contract interface.

Results are cached for 12 seconds (roughly one Ethereum mainnet block; other chains have different block times) to avoid excessive RPC calls.

Parameters:

Name Type Description Default
pool_address str

Pool contract address

required
chain str | None

Chain identifier (e.g., "ethereum", "arbitrum", "optimism"). If not specified, uses the strategy's primary chain from the MarketSnapshot.

None

Returns:

Type Description
PoolReserves

PoolReserves dataclass with:
- pool_address: Pool contract address
- dex: DEX type ('uniswap_v2', 'uniswap_v3', 'sushiswap')
- token0: First token in the pair (ChainToken)
- token1: Second token in the pair (ChainToken)
- reserve0: Reserve of token0 (human-readable Decimal)
- reserve1: Reserve of token1 (human-readable Decimal)
- fee_tier: Pool fee in basis points
- sqrt_price_x96: V3 sqrt price (None for V2)
- tick: V3 current tick (None for V2)
- liquidity: V3 in-range liquidity (None for V2)
- tvl_usd: Total value locked in USD
- last_updated: When the data was fetched

Raises:

Type Description
PoolReservesUnavailableError

If pool data cannot be retrieved

ValueError

If no pool reader is configured

Example

# Get pool reserves for USDC/WETH pool
pool = snapshot.pool_reserves(
    "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"
)
print(f"Reserve0: {pool.reserve0} {pool.token0.symbol}")
print(f"Reserve1: {pool.reserve1} {pool.token1.symbol}")
print(f"TVL: ${pool.tvl_usd}")

# Check if V3 pool
if pool.is_v3:
    print(f"Current tick: {pool.tick}")

prices

prices(
    tokens: list[str], quote: str = "USD"
) -> dict[str, Decimal]

Get prices for multiple tokens in a single batch call.

Fetches prices for all specified tokens in parallel using asyncio.gather. Returns partial results if some tokens fail (errors are logged).
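The partial-result behavior can be sketched with asyncio.gather and return_exceptions=True, which collects failures as values instead of aborting the whole batch. fetch_price and its price table are hypothetical stand-ins for a real provider call.

```python
import asyncio
import logging
from decimal import Decimal

logger = logging.getLogger(__name__)


async def fetch_price(token: str) -> Decimal:
    """Hypothetical provider call; fails for tokens it does not know."""
    table = {"WETH": Decimal("2500.50"), "USDC": Decimal("1.00")}
    await asyncio.sleep(0)  # stands in for network latency
    if token not in table:
        raise LookupError(f"no price for {token}")
    return table[token]


async def fetch_prices(tokens: list[str]) -> dict[str, Decimal]:
    """Fetch all prices concurrently and keep only the successful ones."""
    results = await asyncio.gather(
        *(fetch_price(token) for token in tokens), return_exceptions=True
    )
    prices: dict[str, Decimal] = {}
    for token, result in zip(tokens, results):
        if isinstance(result, BaseException):
            logger.warning("price fetch failed for %s: %s", token, result)
            continue  # omit the failed token instead of raising
        prices[token] = result
    return prices
```

Because failed tokens are simply absent, callers should check for a key before indexing rather than assume every requested token is present.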

Parameters:

Name Type Description Default
tokens list[str]

List of token symbols (e.g., ["WETH", "USDC", "ARB"])

required
quote str

Quote currency (default "USD")

'USD'

Returns:

Type Description
dict[str, Decimal]

Dictionary mapping token symbols to their prices as Decimal.

dict[str, Decimal]

Only includes tokens that were successfully fetched.

Raises:

Type Description
ValueError

If no price oracle is configured

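Example

prices = snapshot.prices(["WETH", "USDC", "ARB"])
# Returns a dict keyed by token symbol, with one Decimal price per token.

# If ARB fails to fetch, the dict contains only the "WETH" and "USDC" entries
# (the error is logged).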

balances

balances(tokens: list[str]) -> dict[str, Decimal]

Get balances for multiple tokens in a single batch call.

Fetches balances for all specified tokens in parallel using asyncio.gather. Returns partial results if some tokens fail (errors are logged).

Parameters:

Name Type Description Default
tokens list[str]

List of token symbols (e.g., ["WETH", "USDC", "ARB"])

required

Returns:

Type Description
dict[str, Decimal]

Dictionary mapping token symbols to their balances as Decimal.

dict[str, Decimal]

Only includes tokens that were successfully fetched.

Raises:

Type Description
ValueError

If no balance provider is configured

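Example

balances = snapshot.balances(["WETH", "USDC", "ARB"])
# Returns a dict keyed by token symbol, with one Decimal balance per token.

# If ARB fails to fetch, the dict contains only the "WETH" and "USDC" entries
# (the error is logged).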

health

health() -> HealthReport

Get a health report for all registered data providers.

Aggregates health metrics from all configured providers including price oracle, balance provider, OHLCV module, gas oracle, and pool reader.

The health report includes:
- Individual source health (success rate, latency, errors)
- Cache statistics (hits, misses, hit rate)
- Overall system status (healthy, degraded, unhealthy)

Returns:

Type Description
HealthReport

HealthReport dataclass with:
- timestamp: When the report was generated
- sources: Dictionary mapping source names to SourceHealth
- cache_stats: CacheStats with cache performance metrics
- overall_status: "healthy", "degraded", or "unhealthy"

Example

report = snapshot.health()
print(f"Overall status: {report.overall_status}")
print(f"Sources: {list(report.sources.keys())}")

for name, health in report.sources.items():
    print(f"  {name}: {health.success_rate:.1%} success rate")

if report.failing_sources:
    print(f"Warning: failing sources: {report.failing_sources}")

balance_usd

balance_usd(token: str) -> Decimal

Get the wallet balance value in USD terms.

Calculates the USD value by multiplying the token balance by its current price.

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "USDC")

required

Returns:

Type Description
Decimal

Balance value in USD as a Decimal

Raises:

Type Description
PriceUnavailableError

If price cannot be determined

BalanceUnavailableError

If balance cannot be determined

Example

eth_value = snapshot.balance_usd("WETH")
# If balance is 2 WETH at $2500, returns: Decimal("5000.00")

total_portfolio_usd

total_portfolio_usd(tokens: list[str]) -> Decimal

Get the total portfolio value in USD across multiple tokens.

Sums the USD value of all specified token balances.
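A minimal sketch of the summation, assuming balances and prices are already available as Decimal maps. The standalone function below is an illustration, not the method's implementation; it raises on a missing price instead of treating it as zero, in line with the no-silent-defaults behavior described for this class.

```python
from decimal import Decimal


def total_portfolio_usd(
    balances: dict[str, Decimal], prices: dict[str, Decimal]
) -> Decimal:
    """Sum balance * price over all tokens, failing on any missing price."""
    total = Decimal("0")
    for token, amount in balances.items():
        if token not in prices:
            # Fail loudly rather than undercount the portfolio.
            raise KeyError(f"missing price for {token}")
        total += amount * prices[token]
    return total
```

Keeping every operand as Decimal avoids the rounding drift that float multiplication would introduce into dollar totals.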

Parameters:

Name Type Description Default
tokens list[str]

List of token symbols to include

required

Returns:

Type Description
Decimal

Total portfolio value in USD as a Decimal

Raises:

Type Description
PriceUnavailableError

If any price cannot be determined

BalanceUnavailableError

If any balance cannot be determined

Example

total = snapshot.total_portfolio_usd(["WETH", "USDC", "ARB"])
# Returns: Decimal("10500.00")
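
The arithmetic behind balance_usd and total_portfolio_usd can be sketched without the framework. The balances and prices dictionaries below are hypothetical stand-ins for what the balance provider and price oracle would return; only the multiply-and-sum logic mirrors the descriptions above.

```python
from decimal import Decimal

# Hypothetical stand-ins for provider data (not part of the framework API).
balances = {"WETH": Decimal("2"), "USDC": Decimal("5000"), "ARB": Decimal("500")}
prices = {"WETH": Decimal("2500"), "USDC": Decimal("1"), "ARB": Decimal("1")}


def balance_usd(token: str) -> Decimal:
    """USD value of one token: balance multiplied by current price."""
    return balances[token] * prices[token]


def total_portfolio_usd(tokens: list[str]) -> Decimal:
    """Sum of the USD values of the listed tokens."""
    return sum((balance_usd(t) for t in tokens), Decimal("0"))


total = total_portfolio_usd(["WETH", "USDC", "ARB"])
print(total)  # 10500
```

Keeping every operand a Decimal avoids the binary rounding that float arithmetic would introduce into monetary sums.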

lending_rate

lending_rate(
    protocol: str, token: str, side: str = "supply"
) -> LendingRate

Get the lending rate for a specific protocol and token.

Fetches the current supply or borrow APY from the specified lending protocol. Rates are cached for efficiency (typically 12s = ~1 block).

Parameters:

Name Type Description Default
protocol str

Protocol identifier (aave_v3, morpho_blue, compound_v3)

required
token str

Token symbol (e.g., "USDC", "WETH")

required
side str

Rate side - "supply" or "borrow" (default "supply")

'supply'

Returns:

Type Description
LendingRate

LendingRate dataclass with:

- protocol: Protocol identifier
- token: Token symbol
- side: Rate side
- apy_ray: APY in ray units (1e27 scale)
- apy_percent: APY as percentage (e.g., 5.25 for 5.25%)
- utilization_percent: Pool utilization percentage
- timestamp: When rate was fetched
- chain: Blockchain network
- market_id: Market identifier (optional)

Raises:

Type Description
LendingRateUnavailableError

If rate cannot be retrieved

ValueError

If no rate monitor is configured

Example

# Get Aave USDC supply rate
rate = snapshot.lending_rate("aave_v3", "USDC", "supply")
print(f"Aave USDC Supply APY: {rate.apy_percent:.2f}%")

# Get Morpho WETH borrow rate
rate = snapshot.lending_rate("morpho_blue", "WETH", "borrow")
print(f"Morpho WETH Borrow APY: {rate.apy_percent:.2f}%")

best_lending_rate

best_lending_rate(
    token: str,
    side: str = "supply",
    protocols: list[str] | None = None,
) -> BestRateResult

Get the best lending rate for a token across protocols.

Compares rates from all available lending protocols and returns the optimal one. For supply rates, returns highest APY. For borrow rates, returns lowest APY.

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "USDC", "WETH")

required
side str

Rate side - "supply" or "borrow" (default "supply")

'supply'
protocols list[str] | None

Protocols to compare (default: all available on chain)

None

Returns:

Type Description
BestRateResult

BestRateResult dataclass with:

- token: Token symbol
- side: Rate side
- best_rate: The best LendingRate found (or None if all failed)
- all_rates: List of all rates from different protocols
- timestamp: When comparison was made

Raises:

Type Description
ValueError

If no rate monitor is configured

Example

# Find best USDC supply rate
result = snapshot.best_lending_rate("USDC", "supply")
if result.best_rate:
    print(f"Best rate: {result.best_rate.protocol} at {result.best_rate.apy_percent:.2f}%")

    # Compare all protocols
    for rate in result.all_rates:
        print(f"  {rate.protocol}: {rate.apy_percent:.2f}%")
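
The selection rule (highest APY for supply, lowest APY for borrow) can be sketched as follows. The Rate class and pick_best function are hypothetical stand-ins written for illustration; they are not the framework's LendingRate or its internal comparison code.

```python
from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Rate:
    """Hypothetical stand-in for LendingRate with only the fields used here."""

    protocol: str
    apy_percent: Decimal


def pick_best(rates: list[Rate], side: str = "supply") -> Rate | None:
    """Highest APY for suppliers, lowest APY for borrowers; None if no rates."""
    if side not in ("supply", "borrow"):
        raise ValueError(f"unknown side: {side}")
    if not rates:
        return None
    chooser = max if side == "supply" else min
    return chooser(rates, key=lambda r: r.apy_percent)


rates = [
    Rate("aave_v3", Decimal("4.10")),
    Rate("morpho_blue", Decimal("5.25")),
    Rate("compound_v3", Decimal("3.80")),
]
print(pick_best(rates, "supply").protocol)  # morpho_blue
print(pick_best(rates, "borrow").protocol)  # compound_v3
```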

funding_rate

funding_rate(venue: str, market: str) -> FundingRate

Get the funding rate for a perpetual venue and market.

Fetches the current funding rate from the specified venue. Funding rates indicate the cost of holding perpetual positions:

- Positive rate: Longs pay shorts (bullish market)
- Negative rate: Shorts pay longs (bearish market)

Parameters:

Name Type Description Default
venue str

Venue identifier (gmx_v2, hyperliquid)

required
market str

Market symbol (e.g., "ETH-USD", "BTC-USD")

required

Returns:

Type Description
FundingRate

FundingRate dataclass with:

- venue: Venue identifier
- market: Market symbol
- rate_hourly: Hourly funding rate
- rate_8h: 8-hour funding rate (typical display)
- rate_annualized: Annualized rate for comparison
- next_funding_time: Next settlement time
- open_interest_long: Total long OI in USD
- open_interest_short: Total short OI in USD
- mark_price: Current mark price
- index_price: Current index price

Raises:

Type Description
FundingRateUnavailableError

If rate cannot be retrieved

ValueError

If no funding rate provider is configured

Example

# Get GMX V2 ETH funding rate
rate = snapshot.funding_rate("gmx_v2", "ETH-USD")
print(f"8h rate: {rate.rate_percent_8h:.4f}%")
print(f"Annualized: {rate.rate_percent_annualized:.2f}%")

# Check if longs are paying
if rate.is_positive:
    print("Longs pay shorts (bullish sentiment)")
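
How the hourly, 8-hour, and annualized figures relate is not spelled out here. A minimal sketch, assuming simple linear scaling with no compounding (an assumption, not confirmed behavior of FundingRate), would be:

```python
from decimal import Decimal

HOURS_PER_YEAR = 24 * 365  # assumption: 365-day year, hourly accrual


def scale_funding(rate_hourly: Decimal) -> dict[str, Decimal]:
    """Scale an hourly funding rate linearly (no compounding assumed)."""
    return {
        "rate_8h": rate_hourly * 8,
        "rate_annualized": rate_hourly * HOURS_PER_YEAR,
    }


scaled = scale_funding(Decimal("0.0000125"))
print(scaled["rate_8h"] * 100)          # 8h rate as a percentage
print(scaled["rate_annualized"] * 100)  # annualized rate as a percentage
```

Venues differ in settlement interval and compounding convention, so values reported by the provider should take precedence over any local rescaling.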

funding_rate_spread

funding_rate_spread(
    market: str, venue_a: str, venue_b: str
) -> FundingRateSpread

Get the funding rate spread between two venues.

Compares funding rates from two venues to identify arbitrage opportunities. A positive spread means venue_a has higher funding than venue_b.

The spread can be used for funding rate arbitrage:

- If spread > 0: Short venue_a, long venue_b
- If spread < 0: Short venue_b, long venue_a

Parameters:

Name Type Description Default
market str

Market symbol (e.g., "ETH-USD")

required
venue_a str

First venue identifier

required
venue_b str

Second venue identifier

required

Returns:

Type Description
FundingRateSpread

FundingRateSpread dataclass with:

- market: Market symbol
- venue_a: First venue
- venue_b: Second venue
- rate_a: Funding rate at venue_a
- rate_b: Funding rate at venue_b
- spread_8h: 8-hour spread (rate_a - rate_b)
- spread_annualized: Annualized spread
- is_profitable: True if spread exceeds threshold
- recommended_direction: Trade direction for arb

Raises:

Type Description
FundingRateUnavailableError

If either rate cannot be retrieved

ValueError

If no funding rate provider is configured

Example

# Compare GMX V2 vs Hyperliquid for ETH
spread = snapshot.funding_rate_spread(
    "ETH-USD", "gmx_v2", "hyperliquid"
)
print(f"Spread: {spread.spread_percent_8h:.4f}% (8h)")

if spread.is_profitable:
    print(f"Arbitrage opportunity: {spread.recommended_direction}")
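
The spread and direction rules stated above (spread = rate_a - rate_b; short the venue with the higher funding) can be sketched as a standalone function. The name funding_spread and the returned strings are hypothetical; the profitability threshold is not documented here and is left out.

```python
from decimal import Decimal


def funding_spread(rate_a_8h: Decimal, rate_b_8h: Decimal) -> tuple[Decimal, str]:
    """Return (spread, direction) where spread = rate_a - rate_b."""
    spread = rate_a_8h - rate_b_8h
    if spread > 0:
        # venue_a pays more funding to shorts: collect there, hedge on venue_b
        direction = "short venue_a, long venue_b"
    elif spread < 0:
        direction = "short venue_b, long venue_a"
    else:
        direction = "none"
    return spread, direction


print(funding_spread(Decimal("0.0003"), Decimal("0.0001")))
```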

lending_rate_history

lending_rate_history(
    protocol: str,
    token: str,
    chain: str | None = None,
    days: int = 90,
) -> DataEnvelope[list[LendingRateSnapshot]]

Get historical lending rate snapshots for backtesting.

Fetches supply/borrow APY history from The Graph or DeFi Llama with graceful fallback between providers. Results are cached in VersionedDataCache for deterministic replay.

Parameters:

Name Type Description Default
protocol str

Lending protocol (e.g. "aave_v3", "morpho_blue", "compound_v3").

required
token str

Token symbol (e.g. "USDC", "WETH").

required
chain str | None

Chain name. Defaults to strategy chain.

None
days int

Number of days of history. Default 90.

90

Returns:

Type Description
DataEnvelope[list[LendingRateSnapshot]]

DataEnvelope[list[LendingRateSnapshot]] with INFORMATIONAL classification. Snapshots are sorted ascending by timestamp.

Raises:

Type Description
LendingRateHistoryUnavailableError

If historical data cannot be retrieved.

ValueError

If no rate history reader is configured.

Example

envelope = snapshot.lending_rate_history("aave_v3", "USDC", days=90)
for snap in envelope.value:
    print(f"Supply: {snap.supply_apy}%, Borrow: {snap.borrow_apy}%")

funding_rate_history

funding_rate_history(
    venue: str, market_symbol: str, hours: int = 168
) -> DataEnvelope[list[FundingRateSnapshot]]

Get historical funding rate snapshots for backtesting.

Fetches funding rate history from Hyperliquid API or DeFi Llama with graceful fallback. Results are cached in VersionedDataCache for deterministic replay.

Parameters:

Name Type Description Default
venue str

Perps venue (e.g. "hyperliquid", "gmx_v2").

required
market_symbol str

Market symbol (e.g. "ETH-USD", "BTC-USD").

required
hours int

Number of hours of history. Default 168 (7 days).

168

Returns:

Type Description
DataEnvelope[list[FundingRateSnapshot]]

DataEnvelope[list[FundingRateSnapshot]] with INFORMATIONAL classification. Snapshots are sorted ascending by timestamp.

Raises:

Type Description
FundingRateHistoryUnavailableError

If historical data cannot be retrieved.

ValueError

If no rate history reader is configured.

Example

envelope = snapshot.funding_rate_history("hyperliquid", "ETH-USD", hours=168)
for snap in envelope.value:
    print(f"Rate: {snap.rate}, Annualized: {snap.annualized_rate}")

price_across_dexs

price_across_dexs(
    token_in: str,
    token_out: str,
    amount: Decimal,
    dexs: list[str] | None = None,
) -> MultiDexPriceResult

Get prices from multiple DEXs for comparison.

Fetches quotes from all configured DEXs (Uniswap V3, Curve, Enso) and returns a comparison of prices and execution details. Use this to identify the best execution venue for a swap.

Parameters:

Name Type Description Default
token_in str

Input token symbol (e.g., "USDC", "WETH")

required
token_out str

Output token symbol (e.g., "WETH", "USDC")

required
amount Decimal

Input amount (human-readable, e.g., Decimal("10000") for 10k)

required
dexs list[str] | None

DEXs to query (default: all available on chain)

None

Returns:

Type Description
MultiDexPriceResult

MultiDexPriceResult dataclass with:

- token_in: Input token symbol
- token_out: Output token symbol
- amount_in: Input amount
- quotes: Dictionary mapping DEX name to DexQuote
- best_quote: Quote with highest output amount
- price_spread_bps: Spread between best and worst in bps

Raises:

Type Description
DexQuoteUnavailableError

If no quotes can be fetched

ValueError

If no multi-DEX service is configured

Example

# Compare prices for 10k USDC -> WETH
result = snapshot.price_across_dexs(
    "USDC", "WETH", Decimal("10000")
)

for dex, quote in result.quotes.items():
    print(f"{dex}: {quote.amount_out} WETH")
    print(f"  Price impact: {quote.price_impact_bps} bps")
    print(f"  Slippage estimate: {quote.slippage_estimate_bps} bps")

print(f"Best venue: {result.best_quote.dex}")
print(f"Price spread: {result.price_spread_bps} bps")
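
The exact definition of price_spread_bps is not given here. A minimal sketch, assuming the spread is measured on output amounts relative to the worst quote (an assumption), shows how a best venue and a basis-point spread could be derived from a mapping of DEX name to output amount. The function name compare_quotes is hypothetical.

```python
from decimal import Decimal


def compare_quotes(amounts_out: dict[str, Decimal]) -> tuple[str, Decimal]:
    """Return (best_dex, spread_bps) for a mapping of DEX name to output amount."""
    if not amounts_out:
        raise ValueError("no quotes to compare")
    best_dex = max(amounts_out, key=amounts_out.get)
    best = amounts_out[best_dex]
    worst = min(amounts_out.values())
    # Assumed convention: spread relative to the worst quote, in basis points.
    spread_bps = (best - worst) / worst * Decimal(10000)
    return best_dex, spread_bps


quotes = {
    "uniswap_v3": Decimal("4.04"),
    "curve": Decimal("4.00"),
    "enso": Decimal("4.02"),
}
print(compare_quotes(quotes))
```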

best_dex_price

best_dex_price(
    token_in: str,
    token_out: str,
    amount: Decimal,
    dexs: list[str] | None = None,
) -> BestDexResult

Get the best DEX for a trade.

Compares prices from all configured DEXs and returns the one with the highest output amount (best execution). This is useful for routing trades to the optimal venue.

Parameters:

Name Type Description Default
token_in str

Input token symbol (e.g., "USDC", "WETH")

required
token_out str

Output token symbol (e.g., "WETH", "USDC")

required
amount Decimal

Input amount (human-readable, e.g., Decimal("10000") for 10k)

required
dexs list[str] | None

DEXs to compare (default: all available on chain)

None

Returns:

Type Description
BestDexResult

BestDexResult dataclass with:

- token_in: Input token symbol
- token_out: Output token symbol
- amount_in: Input amount
- best_dex: Best DEX for the trade (e.g., "uniswap_v3")
- best_quote: DexQuote from the best DEX
- all_quotes: List of quotes from all DEXs
- savings_vs_worst_bps: Savings vs worst venue in bps

Raises:

Type Description
ValueError

If no multi-DEX service is configured

Example

# Find best venue for USDC -> WETH swap
result = snapshot.best_dex_price(
    "USDC", "WETH", Decimal("10000")
)

if result.best_quote:
    print(f"Best DEX: {result.best_dex}")
    print(f"Output: {result.best_quote.amount_out} WETH")
    print(f"Savings vs worst: {result.savings_vs_worst_bps} bps")
else:
    print("No quotes available")

il_exposure

il_exposure(
    position_id: str, fees_earned: Decimal = Decimal("0")
) -> ILExposure

Get the impermanent loss exposure for a tracked LP position.

Calculates the current IL for a tracked LP position using the position's entry prices and current market prices. This method requires an ILCalculator with the position already registered.

The ILCalculator should be configured with the position via add_position() before calling this method.

Parameters:

Name Type Description Default
position_id str

Unique identifier for the LP position

required
fees_earned Decimal

Optional fees earned by the position (for net PnL calc)

Decimal('0')

Returns:

Type Description
ILExposure

ILExposure dataclass with:

- position_id: Position identifier
- position: LPPosition details
- current_il: ILResult with IL metrics
- entry_value: Original position value
- current_value: Current position value
- fees_earned: Fees earned (if provided)
- net_pnl: Net profit/loss including fees

Raises:

Type Description
ILExposureUnavailableError

If exposure cannot be calculated

ValueError

If no IL calculator is configured

Example

# Get IL exposure for a position
exposure = snapshot.il_exposure("my-lp-position-123")
print(f"Current IL: {exposure.current_il.il_percent:.2f}%")
print(f"Entry value: ${exposure.entry_value}")
print(f"Current value: ${exposure.current_value}")

if exposure.il_offset_by_fees:
    print("Fees offset the IL - net positive!")

projected_il

projected_il(
    token_a: str,
    token_b: str,
    price_change_pct: Decimal,
    weight_a: Decimal = Decimal("0.5"),
    weight_b: Decimal = Decimal("0.5"),
) -> ProjectedILResult

Project impermanent loss for a hypothetical price change.

This method simulates what IL would be if token A's price changed by the specified percentage relative to token B. This is useful for understanding IL risk before entering a position.

Parameters:

Name Type Description Default
token_a str

Symbol of token A (the volatile token)

required
token_b str

Symbol of token B (often a stablecoin)

required
price_change_pct Decimal

Price change percentage (e.g., 50 for +50%, -30 for -30%)

required
weight_a Decimal

Weight of token A in the pool (default 0.5)

Decimal('0.5')
weight_b Decimal

Weight of token B in the pool (default 0.5)

Decimal('0.5')

Returns:

Type Description
ProjectedILResult

ProjectedILResult dataclass with:

- price_change_pct: The input price change
- il_ratio: Projected IL as decimal (e.g., -0.0057 for 0.57% loss)
- il_percent: Projected IL as percentage (e.g., -0.57)
- il_bps: Projected IL in basis points (e.g., -57)
- pool_type: Type of pool (default: constant_product)
- weight_a: Weight of token A
- weight_b: Weight of token B

Raises:

Type Description
ValueError

If no IL calculator is configured or invalid parameters

Example

# What would IL be if ETH goes up 50%?
proj = snapshot.projected_il("WETH", "USDC", Decimal("50"))
print(f"If ETH +50%: IL = {proj.il_percent:.2f}%")

# What if ETH drops 30%?
proj = snapshot.projected_il("WETH", "USDC", Decimal("-30"))
print(f"If ETH -30%: IL = {proj.il_percent:.2f}%")

# Weighted pool (80/20 ETH/USDC)
proj = snapshot.projected_il(
    "WETH",
    "USDC",
    price_change_pct=Decimal("100"),
    weight_a=Decimal("0.8"),
    weight_b=Decimal("0.2"),
)
print(f"80/20 pool, ETH +100%: IL = {proj.il_percent:.2f}%")
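
For intuition, the textbook impermanent-loss formula for a weighted constant-product pool can be evaluated directly. This is a sketch of the standard formula, not necessarily the ILCalculator's implementation: with price ratio r = 1 + price_change_pct / 100 for token A against token B, the pool value relative to entry is r ** weight_a, the hold value is weight_a * r + weight_b, and IL is their ratio minus one. The function name il_ratio is hypothetical.

```python
def il_ratio(price_change_pct: float, weight_a: float = 0.5, weight_b: float = 0.5) -> float:
    """Impermanent loss as a ratio (negative = loss) for a weighted pool."""
    if price_change_pct <= -100:
        raise ValueError("price cannot fall by 100% or more")
    if abs(weight_a + weight_b - 1.0) > 1e-9:
        raise ValueError("weights must sum to 1")
    r = 1 + price_change_pct / 100          # new price / entry price of token A
    pool_value = r ** weight_a              # LP value relative to entry
    hold_value = weight_a * r + weight_b    # value of simply holding
    return pool_value / hold_value - 1


print(f"50/50, +50%:  {il_ratio(50) * 100:.2f}%")
print(f"50/50, -30%:  {il_ratio(-30) * 100:.2f}%")
print(f"80/20, +100%: {il_ratio(100, 0.8, 0.2) * 100:.2f}%")
```

For a 50/50 pool the expression reduces to the familiar 2 * sqrt(r) / (1 + r) - 1.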

prediction

prediction(market_id: str) -> PredictionMarket

Get prediction market data.

Fetches full market details for a prediction market by ID or slug. Uses lazy loading - only fetches prediction data when accessed.

Parameters:

Name Type Description Default
market_id str

Prediction market ID or URL slug. Examples: "12345", "will-bitcoin-exceed-100k-2025"

required

Returns:

Type Description
PredictionMarket

PredictionMarket with:

- market_id: Internal market ID
- condition_id: CTF condition ID (0x...)
- question: Market question text
- slug: URL slug
- yes_price: Current YES outcome price (0-1)
- no_price: Current NO outcome price (0-1)
- spread: Bid-ask spread
- volume_24h: 24-hour trading volume in USDC
- liquidity: Current liquidity
- end_date: Resolution deadline
- is_active: Whether market is accepting orders
- is_resolved: Whether market has been resolved

Raises:

Type Description
PredictionUnavailableError

If market data cannot be retrieved

ValueError

If no prediction provider is configured

Example

# Get market by ID
market = snapshot.prediction("12345")
print(f"YES: {market.yes_price}, NO: {market.no_price}")

# Get market by slug
market = snapshot.prediction("will-btc-hit-100k")
print(f"Question: {market.question}")
print(f"24h Volume: ${market.volume_24h:,.2f}")

# Check implied probability
yes_prob = market.yes_price * 100
print(f"Implied probability: {yes_prob:.1f}%")

prediction_positions

prediction_positions(
    market_id: str | None = None,
) -> list[PredictionPosition]

Get all open prediction market positions.

Fetches positions from the prediction market provider. Can optionally filter by market ID.

Parameters:

Name Type Description Default
market_id str | None

Optional market ID or slug to filter by

None

Returns:

Type Description
list[PredictionPosition]

List of PredictionPosition objects with:

- market_id: Market ID
- condition_id: CTF condition ID
- token_id: CLOB token ID
- outcome: Position outcome (YES or NO)
- size: Number of shares held
- avg_price: Average entry price
- current_price: Current market price
- unrealized_pnl: Unrealized profit/loss
- realized_pnl: Realized profit/loss
- value: Current position value (size * current_price)

Raises:

Type Description
PredictionUnavailableError

If positions cannot be retrieved

ValueError

If no prediction provider is configured

Example

# Get all positions
positions = snapshot.prediction_positions()
total_value = sum(p.value for p in positions)
print(f"Total position value: ${total_value:,.2f}")

# Get positions for a specific market
positions = snapshot.prediction_positions("btc-100k")
for pos in positions:
    print(f"{pos.outcome}: {pos.size} shares @ {pos.avg_price}")
    print(f"  Unrealized PnL: ${pos.unrealized_pnl:,.2f}")

prediction_orders

prediction_orders(
    market_id: str | None = None,
) -> list[PredictionOrder]

Get all open prediction market orders.

Fetches open orders from the prediction market provider. Can optionally filter by market ID.

Parameters:

Name Type Description Default
market_id str | None

Optional market ID or slug to filter by

None

Returns:

Type Description
list[PredictionOrder]

List of PredictionOrder objects with:

- order_id: Order ID
- market_id: Market ID (token ID)
- outcome: Order outcome (YES or NO)
- side: Order side (BUY or SELL)
- price: Order price
- size: Order size in shares
- filled_size: Filled amount
- remaining_size: Remaining unfilled size
- created_at: Order creation timestamp

Raises:

Type Description
PredictionUnavailableError

If orders cannot be retrieved

ValueError

If no prediction provider is configured

Example

# Get all open orders
orders = snapshot.prediction_orders()
for order in orders:
    print(f"{order.side} {order.remaining_size} @ {order.price}")

# Get orders for a specific market
orders = snapshot.prediction_orders("btc-100k")
buy_orders = [o for o in orders if o.side == "BUY"]
print(f"Open buy orders: {len(buy_orders)}")

realized_vol

realized_vol(
    token: str,
    window_days: int = 30,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
    *,
    ohlcv_limit: int | None = None,
) -> DataEnvelope[VolatilityResult]

Calculate realized volatility for a token.

Fetches OHLCV candles via the configured providers and computes realized volatility using the specified estimator. Requires either an OHLCVRouter or an OHLCVModule for data, and a RealizedVolatilityCalculator for the computation.

Parameters:

Name Type Description Default
token str

Token symbol (e.g. "WETH", "ETH").

required
window_days int

Lookback window in calendar days. Default 30.

30
timeframe str

Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). Default "1h".

'1h'
estimator str

"close_to_close" (default) or "parkinson".

'close_to_close'
ohlcv_limit int | None

Override for number of candles to fetch. If None, auto-calculated from window_days and timeframe.

None

Returns:

Type Description
DataEnvelope[VolatilityResult]

DataEnvelope[VolatilityResult] with INFORMATIONAL classification.

Raises:

Type Description
VolatilityUnavailableError

If volatility cannot be calculated.

ValueError

If no volatility calculator is configured.

Example

result = snapshot.realized_vol("WETH", window_days=30, timeframe="1h")
print(f"Annualized vol: {result.value.annualized_vol:.2%}")
print(f"Daily vol: {result.value.daily_vol:.2%}")
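
The two estimator names correspond to well-known formulas, sketched below in their textbook form. Whether RealizedVolatilityCalculator uses exactly these conventions (sample vs. population variance, the annualization factor) is an assumption; the function names and the periods_per_year parameter are hypothetical.

```python
import math


def close_to_close_vol(closes: list[float], periods_per_year: float) -> float:
    """Annualized sample standard deviation of log returns."""
    rets = [math.log(b / a) for a, b in zip(closes, closes[1:])]
    if len(rets) < 2:
        raise ValueError("need at least three closes")
    mean = sum(rets) / len(rets)
    var = sum((x - mean) ** 2 for x in rets) / (len(rets) - 1)
    return math.sqrt(var * periods_per_year)


def parkinson_vol(highs: list[float], lows: list[float], periods_per_year: float) -> float:
    """Annualized Parkinson estimator based on each candle's high/low range."""
    n = len(highs)
    if n == 0 or n != len(lows):
        raise ValueError("highs and lows must be non-empty and equal length")
    total = sum(math.log(h / l) ** 2 for h, l in zip(highs, lows))
    return math.sqrt(total / (4 * n * math.log(2)) * periods_per_year)


# With 1h candles, one plausible annualization factor is 24 * 365 periods.
closes = [100.0, 101.0, 99.5, 100.5, 102.0]
print(f"{close_to_close_vol(closes, 24 * 365):.2%}")
```

The Parkinson estimator uses the intra-candle range, so it typically needs fewer candles than close-to-close for a comparable estimate, but it ignores gaps between candles.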

vol_cone

vol_cone(
    token: str,
    windows: list[int] | None = None,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
    *,
    ohlcv_limit: int | None = None,
) -> DataEnvelope[VolConeResult]

Compute volatility cone: current vol vs historical percentile.

For each window, calculates the current realized vol and compares it to the historical distribution of rolling vols over the full candle history.

Parameters:

Name Type Description Default
token str

Token symbol (e.g. "WETH").

required
windows list[int] | None

Lookback windows in days. Default [7, 14, 30, 90].

None
timeframe str

Candle timeframe. Default "1h".

'1h'
estimator str

"close_to_close" or "parkinson".

'close_to_close'
ohlcv_limit int | None

Override for number of candles to fetch. If None, auto-calculated for the largest window.

None

Returns:

Type Description
DataEnvelope[VolConeResult]

DataEnvelope[VolConeResult] with INFORMATIONAL classification.

Raises:

Type Description
VolConeUnavailableError

If vol cone cannot be calculated.

ValueError

If no volatility calculator is configured.

Example

cone = snapshot.vol_cone("WETH", windows=[7, 14, 30, 90])
for entry in cone.value.entries:
    print(f"{entry.window_days}d: {entry.current_vol:.2%} (p{entry.percentile:.0f})")
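
The percentile reported per window places the current realized vol within the historical distribution of rolling vols. A minimal sketch of one common ranking convention (share of historical values at or below the current one) follows; the function name percentile_rank and the tie-handling rule are assumptions, not the framework's definition.

```python
def percentile_rank(current: float, history: list[float]) -> float:
    """Percent of historical values that are at or below the current value."""
    if not history:
        raise ValueError("history must not be empty")
    at_or_below = sum(1 for v in history if v <= current)
    return 100.0 * at_or_below / len(history)


# Hypothetical rolling 30-day vols and a current reading of 50%.
rolling_vols = [0.20, 0.40, 0.60, 0.80]
print(percentile_rank(0.50, rolling_vols))  # 50.0
```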

portfolio_risk

portfolio_risk(
    pnl_series: list[float],
    total_value_usd: Decimal | None = None,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    var_method: str = "parametric",
    timestamps: list[datetime] | None = None,
    benchmark_eth_returns: list[float] | None = None,
    benchmark_btc_returns: list[float] | None = None,
) -> DataEnvelope[PortfolioRisk]

Calculate portfolio risk metrics from a PnL return series.

Computes Sharpe ratio, Sortino ratio, VaR, CVaR, and drawdown with explicit conventions for unambiguous results.

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns as fractions (0.01 = 1% gain).

required
total_value_usd Decimal | None

Current portfolio value in USD. Defaults to Decimal("0").

None
return_interval str

Periodicity of returns (1d, 1h, etc.).

'1d'
risk_free_rate Decimal

Risk-free rate per period as a decimal.

Decimal('0')
var_method str

VaR method: "parametric", "historical", or "cornish_fisher".

'parametric'
timestamps list[datetime] | None

Optional timestamps for each return.

None
benchmark_eth_returns list[float] | None

Optional ETH returns for beta calculation.

None
benchmark_btc_returns list[float] | None

Optional BTC returns for beta calculation.

None

Returns:

Type Description
DataEnvelope[PortfolioRisk]

DataEnvelope[PortfolioRisk] with INFORMATIONAL classification.

Raises:

Type Description
PortfolioRiskUnavailableError

If risk metrics cannot be calculated.

ValueError

If no risk calculator is configured.

Example

risk = snapshot.portfolio_risk(pnl_series, total_value_usd=Decimal("100000"))
print(f"Sharpe: {risk.value.sharpe_ratio:.2f}")
print(f"VaR 95%: ${risk.value.var_95}")
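
Three of the metrics named above can be sketched from a return series using only the standard library. These are textbook definitions under stated assumptions (sample standard deviation, 365 periods per year for daily returns, VaR expressed as a positive loss fraction); PortfolioRiskCalculator's actual conventions may differ, and Sortino and CVaR are omitted for brevity.

```python
import math
from statistics import NormalDist, mean, stdev


def sharpe(returns: list[float], risk_free: float = 0.0, periods_per_year: int = 365) -> float:
    """Annualized Sharpe ratio of periodic returns."""
    excess = [r - risk_free for r in returns]
    spread = stdev(excess)
    if spread == 0:
        return 0.0  # ratio is undefined; this sketch reports 0.0
    return mean(excess) / spread * math.sqrt(periods_per_year)


def parametric_var(returns: list[float], confidence: float = 0.95) -> float:
    """One-period VaR as a loss fraction, assuming normally distributed returns."""
    z = NormalDist().inv_cdf(confidence)
    return z * stdev(returns) - mean(returns)


def max_drawdown(returns: list[float]) -> float:
    """Largest peak-to-trough decline of the compounded equity curve."""
    equity, peak, worst = 1.0, 1.0, 0.0
    for r in returns:
        equity *= 1 + r
        peak = max(peak, equity)
        worst = max(worst, 1 - equity / peak)
    return worst


pnl = [0.01, -0.02, 0.015, 0.005, -0.01]
print(f"Sharpe: {sharpe(pnl):.2f}")
print(f"VaR 95%: {parametric_var(pnl):.2%}")
print(f"Max drawdown: {max_drawdown(pnl):.2%}")
```

Multiplying the VaR fraction by total_value_usd would give a dollar figure comparable to var_95.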

rolling_sharpe

rolling_sharpe(
    pnl_series: list[float],
    window_days: int = 30,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    timestamps: list[datetime] | None = None,
) -> DataEnvelope[RollingSharpeResult]

Compute rolling Sharpe ratio over a PnL series.

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns as fractions.

required
window_days int

Rolling window in days. Default 30.

30
return_interval str

Periodicity of returns (1d, 1h, etc.).

'1d'
risk_free_rate Decimal

Risk-free rate per period.

Decimal('0')
timestamps list[datetime] | None

Optional timestamps aligned with pnl_series.

None

Returns:

Type Description
DataEnvelope[RollingSharpeResult]

DataEnvelope[RollingSharpeResult] with INFORMATIONAL classification.

Raises:

Type Description
RollingSharpeUnavailableError

If rolling Sharpe cannot be computed.

ValueError

If no risk calculator is configured.

Example

result = snapshot.rolling_sharpe(pnl_series, window_days=30)
for entry in result.value.entries:
    print(f"{entry.timestamp}: Sharpe={entry.sharpe:.2f}")

pool_analytics

pool_analytics(
    pool_address: str,
    chain: str | None = None,
    protocol: str | None = None,
) -> DataEnvelope[PoolAnalytics]

Get real-time analytics for a pool (TVL, volume, fee APR/APY).

Fetches from DeFi Llama (primary) or GeckoTerminal (fallback). Cache TTL: 5 minutes.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str | None

Chain name. Defaults to strategy chain.

None
protocol str | None

Optional protocol hint (e.g. "uniswap_v3").

None

Returns:

Type Description
DataEnvelope[PoolAnalytics]

DataEnvelope[PoolAnalytics] with INFORMATIONAL classification.

Raises:

Type Description
PoolAnalyticsUnavailableError

If analytics cannot be retrieved.

ValueError

If no pool analytics reader is configured.

best_pool

best_pool(
    token_a: str,
    token_b: str,
    chain: str | None = None,
    metric: str = "fee_apr",
    protocols: list[str] | None = None,
) -> DataEnvelope[PoolAnalyticsResult]

Find the best pool for a token pair based on a metric.

Searches DeFi Llama for matching pools and ranks by metric.

Parameters:

Name Type Description Default
token_a str

First token symbol (e.g. "WETH").

required
token_b str

Second token symbol (e.g. "USDC").

required
chain str | None

Chain name. Defaults to strategy chain.

None
metric str

Sorting metric: "fee_apr", "fee_apy", "tvl_usd", "volume_24h_usd".

'fee_apr'
protocols list[str] | None

Optional list of protocols to filter by.

None

Returns:

Type Description
DataEnvelope[PoolAnalyticsResult]

DataEnvelope[PoolAnalyticsResult] with the best pool.

Raises:

Type Description
PoolAnalyticsUnavailableError

If no pools found or all providers fail.

ValueError

If no pool analytics reader is configured.

yield_opportunities

yield_opportunities(
    token: str,
    chains: list[str] | None = None,
    min_tvl: float = 100000,
    sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]

Find yield opportunities for a token across protocols and chains.

Searches DeFi Llama yields API for matching pools, sorted by the chosen metric. Cache TTL: 15 minutes.

Parameters:

Name Type Description Default
token str

Token symbol (e.g. "USDC", "WETH").

required
chains list[str] | None

Optional list of chains to filter. None means all.

None
min_tvl float

Minimum TVL in USD. Default $100k.

100000
sort_by str

Sort field: "apy", "tvl", "risk_score". Default "apy".

'apy'

Returns:

Type Description
DataEnvelope[list[YieldOpportunity]]

DataEnvelope[list[YieldOpportunity]] sorted by chosen metric.

Raises:

Type Description
YieldOpportunitiesUnavailableError

If data cannot be retrieved.

ValueError

If no yield aggregator is configured.

to_dict

to_dict() -> dict[str, Any]

Convert snapshot state to dictionary for serialization.

Returns:

Type Description
dict[str, Any]

Dictionary with snapshot metadata and cached values

Price Data

PriceOracle

almanak.framework.data.PriceOracle

Bases: Protocol

Protocol for price aggregation and oracle logic.

A PriceOracle wraps one or more BasePriceSource implementations and provides aggregation logic (median, weighted average, etc.) along with outlier detection and graceful degradation.

The oracle is the primary interface for strategies to get price data, abstracting away the complexity of managing multiple sources.

Key responsibilities:

- Aggregate prices from multiple sources
- Detect and filter outliers (>2% deviation from median)
- Handle partial failures (some sources down)
- Track source health metrics for routing decisions

Example implementation

from datetime import datetime, timezone


class PriceAggregator:
    def __init__(self, sources: list[BasePriceSource]):
        self._sources = sources
        self._health_metrics: dict[str, SourceHealthMetrics] = {}

    async def get_aggregated_price(
        self, token: str, quote: str = "USD"
    ) -> PriceResult:
        results = []
        errors = {}

        # Query every source; record failures instead of aborting
        for source in self._sources:
            try:
                result = await source.get_price(token, quote)
                results.append(result)
            except Exception as e:
                errors[source.source_name] = str(e)

        if not results:
            raise AllDataSourcesFailed(errors)

        # Return median price with aggregated confidence
        median_price = self._calculate_median(results)
        confidence = self._calculate_confidence(results)
        return PriceResult(
            price=median_price,
            source="aggregated",
            timestamp=datetime.now(timezone.utc),
            confidence=confidence,
        )
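
The outlier rule listed under key responsibilities (drop prices deviating more than 2% from the median) can be sketched on its own. The function name filter_outliers and the choice to re-take the median over the surviving prices are assumptions for illustration; prices are assumed positive.

```python
from decimal import Decimal
from statistics import median


def filter_outliers(
    prices: dict[str, Decimal],
    max_deviation: Decimal = Decimal("0.02"),
) -> tuple[Decimal, dict[str, Decimal]]:
    """Drop sources deviating more than max_deviation from the median price."""
    mid = median(prices.values())
    kept = {
        name: price
        for name, price in prices.items()
        if abs(price - mid) / mid <= max_deviation
    }
    # Median of the surviving sources is the aggregated price
    return median(kept.values()), kept


# Hypothetical per-source prices; "stale_feed" is about 7.8% off the median.
sources = {
    "source_a": Decimal("2500"),
    "source_b": Decimal("2505"),
    "stale_feed": Decimal("2700"),
}
price, kept = filter_outliers(sources)
print(price, sorted(kept))
```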

get_aggregated_price async

get_aggregated_price(
    token: str, quote: str = "USD"
) -> PriceResult

Get aggregated price from multiple sources.

Parameters:

Name Type Description Default
token str

Token symbol to get price for

required
quote str

Quote currency (default "USD")

'USD'

Returns:

Type Description
PriceResult

PriceResult with aggregated price and confidence

Raises:

Type Description
AllDataSourcesFailed

If all sources fail to provide data

get_source_health

get_source_health(
    source_name: str,
) -> dict[str, Any] | None

Get health metrics for a specific source.

Returns metrics like success rate, average latency, last error time.

Parameters:

Name Type Description Default
source_name str

Name of the source to query

required

Returns:

Type Description
dict[str, Any] | None

Dictionary with health metrics, or None if source unknown

AggregatedPrice

almanak.framework.data.AggregatedPrice dataclass

AggregatedPrice(
    price: Decimal,
    sources: list[PoolContribution] = list(),
    block_range: tuple[int, int] = (0, 0),
    method: str = "lwap",
    window_seconds: int = 0,
    pool_count: int = 0,
)

Aggregated price from multiple pools or time periods.

Attributes:

Name Type Description
price Decimal

The aggregated price value.

sources list[PoolContribution]

List of pool contributions with individual prices and weights.

block_range tuple[int, int]

Tuple of (min_block, max_block) covered by the aggregation.

method str

Aggregation method used ("twap" or "lwap").

window_seconds int

Time window in seconds (for TWAP).

pool_count int

Number of pools used in aggregation.

PriceAggregator

almanak.framework.data.PriceAggregator

PriceAggregator(
    pool_registry: PoolReaderRegistry,
    rpc_call: RpcCallFn,
    min_liquidity_usd: Decimal = DEFAULT_MIN_LIQUIDITY_USD,
    reference_price_usd: Decimal | None = None,
)

Aggregates prices across pools using TWAP and LWAP methods.

TWAP uses Uniswap V3's built-in oracle (observe()) for time-weighted average prices. LWAP reads live prices from multiple pools and weights them by in-range liquidity.

All results are EXECUTION_GRADE: fail-closed with no off-chain fallback.

Parameters:

Name Type Description Default
pool_registry PoolReaderRegistry

PoolReaderRegistry for reading pool prices.

required
rpc_call RpcCallFn

RPC call function for direct contract reads (observe()).

required
min_liquidity_usd Decimal

Minimum liquidity in USD to include a pool (default $10k).

DEFAULT_MIN_LIQUIDITY_USD
reference_price_usd Decimal | None

Reference price for converting liquidity to USD. If None, liquidity filtering uses raw liquidity values and the threshold is treated as raw liquidity units.

None

twap

twap(
    pool_address: str,
    chain: str,
    window_seconds: int = 300,
    token0_decimals: int = 18,
    token1_decimals: int = 6,
    protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]

Calculate TWAP using Uniswap V3 oracle observe().

Calls observe([window_seconds, 0]) on the pool contract to get tick cumulatives, then derives the arithmetic mean tick and converts to a price.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str

Chain name.

required
window_seconds int

Time window in seconds (default 300 = 5 min).

300
token0_decimals int

Decimals of token0 (default 18).

18
token1_decimals int

Decimals of token1 (default 6).

6
protocol str

Protocol name for source attribution.

'uniswap_v3'

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with TWAP price.

Raises:

Type Description
DataUnavailableError

If observe() call fails or returns invalid data.

lwap

lwap(
    token_a: str,
    token_b: str,
    chain: str,
    fee_tiers: list[int] | None = None,
    protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]

Calculate liquidity-weighted average price across pools.

Reads live prices from all known pools for the given pair, filters out pools below the minimum liquidity threshold, then computes LWAP = sum(price_i * liquidity_i) / sum(liquidity_i).

Falls back to single-pool price if only one pool is available.
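As a rough sketch of the weighting formula above, assuming each pool has already been reduced to a (price, liquidity) pair. The function name `lwap_from_pools` and the use of ValueError are illustrative choices, not part of the framework.

```python
from decimal import Decimal


def lwap_from_pools(
    pools: list[tuple[Decimal, Decimal]],
    min_liquidity: Decimal = Decimal("0"),
) -> Decimal:
    """Liquidity-weighted average of (price, liquidity) pairs."""
    # Drop pools below the threshold, and pools with no liquidity at all.
    eligible = [(p, liq) for p, liq in pools if liq >= min_liquidity and liq > 0]
    if not eligible:
        # Fail closed: no usable pool means no price.
        raise ValueError("no pools meet the liquidity threshold")
    total_liquidity = sum(liq for _, liq in eligible)
    weighted_sum = sum(p * liq for p, liq in eligible)
    return weighted_sum / total_liquidity
```

With a single eligible pool the weighted average reduces to that pool's price, which is the single-pool fallback described above.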

Parameters:

Name Type Description Default
token_a str

Token A symbol or address.

required
token_b str

Token B symbol or address.

required
chain str

Chain name.

required
fee_tiers list[int] | None

Fee tiers to search (default: [100, 500, 3000, 10000]).

None
protocols list[str] | None

Protocols to search (default: all registered for chain).

None

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with LWAP price.

Raises:

Type Description
DataUnavailableError

If no pools found for the pair (fail-closed).

Balance Data

BalanceProvider

almanak.framework.data.BalanceProvider

Bases: Protocol

Protocol for on-chain balance queries.

A BalanceProvider abstracts the complexity of querying ERC-20 and native token balances from the blockchain, handling decimal conversion, caching, and RPC error recovery.

Key responsibilities:

- Query ERC-20 balances via balanceOf
- Query native ETH balance via eth_getBalance
- Handle token decimal conversion correctly
- Cache balances to reduce RPC load
- Invalidate cache after transaction execution

Example implementation

    from decimal import Decimal
    from typing import Optional

    from web3 import Web3

    class Web3BalanceProvider:
        def __init__(self, web3: Web3, wallet_address: str):
            self._web3 = web3
            self._wallet = wallet_address
            self._cache: dict[str, BalanceResult] = {}
            self._cache_ttl = 5  # 5 second cache

        async def get_balance(self, token: str) -> BalanceResult:
            # Native token balances take a separate path (eth_getBalance).
            if token == "ETH":
                return await self._get_native_balance()

            token_info = self._get_token_info(token)
            contract = self._web3.eth.contract(
                address=token_info.address,
                abi=ERC20_ABI,
            )
            raw_balance = contract.functions.balanceOf(self._wallet).call()
            # Convert raw integer units to human-readable units.
            balance = Decimal(raw_balance) / Decimal(10 ** token_info.decimals)

            return BalanceResult(
                balance=balance,
                token=token,
                address=token_info.address,
                decimals=token_info.decimals,
                raw_balance=raw_balance,
            )

        def invalidate_cache(self, token: Optional[str] = None) -> None:
            # Clear one token's entry, or everything when no token is given.
            if token:
                self._cache.pop(token, None)
            else:
                self._cache.clear()
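The example implementation declares a cache and a TTL but does not show how they are consulted. One possible shape for that piece is sketched below; `TTLCache` is a hypothetical helper, not a framework class, and the injectable `clock` is only there so the expiry logic can be exercised without sleeping.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class TTLCache(Generic[T]):
    """Tiny time-based cache keyed by token symbol."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, T]] = {}

    def get(self, key: str) -> Optional[T]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            # Expired: drop the entry so the caller refetches.
            del self._entries[key]
            return None
        return value

    def put(self, key: str, value: T) -> None:
        self._entries[key] = (self._clock(), value)

    def invalidate(self, key: Optional[str] = None) -> None:
        # Mirrors invalidate_cache: one key, or everything when key is None.
        if key is None:
            self._entries.clear()
        else:
            self._entries.pop(key, None)
```

A provider would check `get(token)` before issuing the RPC call and `put(token, result)` afterwards.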

get_balance async

get_balance(token: str) -> BalanceResult

Get the balance of a token for the configured wallet.

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "USDC") or "ETH" for native

required

Returns:

Type Description
BalanceResult

BalanceResult with balance in human-readable units

Raises:

Type Description
DataSourceError

If balance cannot be fetched

get_native_balance async

get_native_balance() -> BalanceResult

Get the native token balance (ETH, MATIC, etc.).

Convenience method for getting the chain's native token balance.

Returns:

Type Description
BalanceResult

BalanceResult for native token

invalidate_cache

invalidate_cache(token: str | None = None) -> None

Invalidate cached balances.

Should be called after transaction execution to ensure fresh data.

Parameters:

Name Type Description Default
token str | None

Specific token to invalidate, or None to clear all

None

OHLCV Data

OHLCVProvider

almanak.framework.data.OHLCVProvider

Bases: Protocol

Protocol for OHLCV (candlestick) data providers.

Used by indicators like RSI that need historical price data.

Implementations must:

- Support multiple timeframes (1m, 5m, 15m, 1h, 4h, 1d)
- Return properly typed OHLCVCandle objects
- Handle caching to avoid repeated API calls
- Gracefully degrade if full history unavailable

The supported_timeframes property should return the subset of VALID_TIMEFRAMES that this provider can supply data for.

supported_timeframes property

supported_timeframes: list[str]

Return the list of timeframes this provider supports.

Returns:

Type Description
list[str]

List of supported timeframe strings (e.g., ["1h", "4h", "1d"])

get_ohlcv async

get_ohlcv(
    token: str,
    quote: str = "USD",
    timeframe: str = "1h",
    limit: int = 100,
) -> list[OHLCVCandle]

Get OHLCV data for a token.

Parameters:

Name Type Description Default
token str

Token symbol

required
quote str

Quote currency

'USD'
timeframe str

Candle timeframe (must be in supported_timeframes)

'1h'
limit int

Number of candles to fetch

100

Returns:

Type Description
list[OHLCVCandle]

List of OHLCVCandle objects sorted by timestamp ascending

Raises:

Type Description
DataSourceError

If data cannot be fetched

InsufficientDataError

If requested limit exceeds available data

ValueError

If timeframe is not supported

OHLCVData

almanak.framework.data.OHLCVData dataclass

OHLCVData(
    timestamp: datetime,
    open: Decimal,
    high: Decimal,
    low: Decimal,
    close: Decimal,
    volume: Decimal | None = None,
)

OHLCV candlestick data point.

Attributes:

Name Type Description
timestamp datetime

Candle open time

open Decimal

Opening price

high Decimal

Highest price

low Decimal

Lowest price

close Decimal

Closing price

volume Decimal | None

Trading volume (optional, CoinGecko OHLC doesn't include volume)

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Pool Analytics

PoolAnalytics

almanak.framework.data.PoolAnalytics dataclass

PoolAnalytics(
    pool_address: str,
    chain: str,
    protocol: str,
    tvl_usd: Decimal,
    volume_24h_usd: Decimal,
    volume_7d_usd: Decimal,
    fee_apr: float,
    fee_apy: float,
    utilization_rate: float | None = None,
    token0_weight: float = 0.5,
    token1_weight: float = 0.5,
)

Analytics for a single pool.

Attributes:

Name Type Description
pool_address str

Pool contract address.

chain str

Chain name.

protocol str

Protocol name (e.g. "uniswap_v3").

tvl_usd Decimal

Total value locked in USD.

volume_24h_usd Decimal

24-hour trading volume in USD.

volume_7d_usd Decimal

7-day trading volume in USD.

fee_apr float

Annualized fee return as a percentage (e.g. 12.5 = 12.5%).

fee_apy float

Compounded annual fee return as a percentage.

utilization_rate float | None

Utilization rate for lending pools (0.0-1.0), None for DEX.

token0_weight float

Fraction of TVL in token0 (0.0-1.0).

token1_weight float

Fraction of TVL in token1 (0.0-1.0).

LiquidityDepth

almanak.framework.data.LiquidityDepth dataclass

LiquidityDepth(
    ticks: list[TickData],
    total_liquidity: int,
    current_tick: int,
    current_price: Decimal,
    pool_address: str = "",
    token0_decimals: int = 18,
    token1_decimals: int = 6,
    tick_spacing: int = 60,
)

Tick-level liquidity distribution for a pool.

Attributes:

Name Type Description
ticks list[TickData]

Initialized ticks sorted by tick_index (ascending).

total_liquidity int

Current in-range liquidity (L) from the pool.

current_tick int

Current active tick from slot0.

current_price Decimal

Current price from slot0.

pool_address str

Pool contract address.

token0_decimals int

Decimals of token0.

token1_decimals int

Decimals of token1.

tick_spacing int

Tick spacing for this pool.

Volatility and Risk

RealizedVolatilityCalculator

almanak.framework.data.RealizedVolatilityCalculator

Computes realized volatility from OHLCV candle data.

Supports two estimators:

- close_to_close: Standard deviation of log returns. Simple and widely used, but only uses closing prices.
- parkinson: High-low range estimator. More efficient (lower variance) for the same sample size because it uses intra-period range information.

All volatilities are annualized using sqrt(periods_per_year).
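The two estimators can be written out in a few lines. The sketch below uses the textbook definitions and plain floats; the function names are hypothetical and the framework's own handling of windows, candle types, and minimum sample sizes is not reproduced here.

```python
import math
import statistics


def close_to_close_vol(closes: list[float], periods_per_year: float) -> float:
    """Annualized sample standard deviation of log returns."""
    if len(closes) < 3:
        raise ValueError("need at least three closes for a sample std dev")
    log_returns = [math.log(b / a) for a, b in zip(closes, closes[1:])]
    return statistics.stdev(log_returns) * math.sqrt(periods_per_year)


def parkinson_vol(highs: list[float], lows: list[float], periods_per_year: float) -> float:
    """Annualized Parkinson (high-low range) estimator."""
    if not highs or len(highs) != len(lows):
        raise ValueError("highs and lows must be non-empty and equal length")
    sum_sq = sum(math.log(h / l) ** 2 for h, l in zip(highs, lows))
    # Parkinson's per-period variance: sum(ln(H/L)^2) / (4 * ln 2 * n).
    per_period_variance = sum_sq / (4.0 * math.log(2.0) * len(highs))
    return math.sqrt(per_period_variance * periods_per_year)
```

For hourly candles on a market that trades around the clock, periods_per_year would be 24 * 365 = 8760; that convention is an assumption of this sketch.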

realized_vol

realized_vol(
    candles: list[OHLCVCandle],
    window_days: int = 30,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
) -> VolatilityResult

Calculate realized volatility over a lookback window.

Parameters:

Name Type Description Default
candles list[OHLCVCandle]

OHLCV candles sorted ascending by timestamp. Should cover at least window_days of data at the given timeframe.

required
window_days int

Lookback window in calendar days.

30
timeframe str

Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d).

'1h'
estimator str

"close_to_close" (default) or "parkinson".

'close_to_close'

Returns:

Type Description
VolatilityResult

VolatilityResult with annualized, daily, and hourly vol.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations in the window.

ValueError

If timeframe is unsupported or estimator unknown.

vol_cone

vol_cone(
    candles: list[OHLCVCandle],
    windows: list[int] | None = None,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
    token: str = "",
) -> VolConeResult

Compute volatility cone: current vol vs historical percentile.

For each window length, calculates the current realized vol and compares it to the distribution of rolling volatilities computed over the full candle history.
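A minimal sketch of the comparison, assuming close-to-close volatility and a simple "share of history at or below the current value" percentile. Both helper names are hypothetical, and `window` here counts returns rather than days.

```python
import math
import statistics


def rolling_vols(closes: list[float], window: int) -> list[float]:
    """Per-period close-to-close vol for every full window of `window` returns."""
    if window < 2:
        raise ValueError("window must cover at least two returns")
    returns = [math.log(b / a) for a, b in zip(closes, closes[1:])]
    return [
        statistics.stdev(returns[i - window:i])
        for i in range(window, len(returns) + 1)
    ]


def percentile_rank(history: list[float], current: float) -> float:
    """Share of historical values at or below `current`, as a percentage."""
    if not history:
        raise ValueError("history must not be empty")
    at_or_below = sum(1 for v in history if v <= current)
    return 100.0 * at_or_below / len(history)
```

The current vol for a window is the last element of `rolling_vols(...)`; its percentile rank against the full list places it within the cone.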

Parameters:

Name Type Description Default
candles list[OHLCVCandle]

Full OHLCV history sorted ascending. Should be significantly longer than the largest window for meaningful percentile estimation.

required
windows list[int] | None

Lookback windows in days. Default [7, 14, 30, 90].

None
timeframe str

Candle timeframe.

'1h'
estimator str

"close_to_close" or "parkinson".

'close_to_close'
token str

Token symbol for labeling.

''

Returns:

Type Description
VolConeResult

VolConeResult with one VolConeEntry per window.

Raises:

Type Description
InsufficientDataError

If not enough data for the smallest window.

ValueError

If timeframe is unsupported.

PortfolioRiskCalculator

almanak.framework.data.PortfolioRiskCalculator

Computes portfolio risk metrics from PnL or return series.

All calculations use explicit conventions (return interval, risk-free rate, annualization) to ensure results are unambiguous and comparable.

Supports three VaR methods:

- parametric: Assumes normal distribution. VaR = -z * sigma * value.
- historical: Percentile-based from empirical return distribution.
- cornish_fisher: Adjusts z-score for skewness and kurtosis.
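The three methods can be illustrated with standard-library Python. This is a sketch under stated assumptions, not the calculator's code: the function names are hypothetical, the historical quantile uses a simple floor index (other interpolation conventions exist), and the Cornish-Fisher expansion is the usual third-order form taking excess kurtosis.

```python
import statistics
from statistics import NormalDist


def parametric_var(returns: list[float], value: float, confidence: float = 0.95) -> float:
    """VaR = -z * sigma * value, with z the lower-tail normal quantile."""
    z = NormalDist().inv_cdf(1.0 - confidence)  # negative for confidence > 0.5
    return -z * statistics.stdev(returns) * value


def historical_var(returns: list[float], value: float, confidence: float = 0.95) -> float:
    """Loss at the empirical (1 - confidence) quantile of the returns."""
    ordered = sorted(returns)
    index = min(int((1.0 - confidence) * len(ordered)), len(ordered) - 1)
    return -ordered[index] * value


def cornish_fisher_z(z: float, skew: float, excess_kurtosis: float) -> float:
    """Adjust a normal quantile for skewness and excess kurtosis."""
    return (
        z
        + (z**2 - 1.0) * skew / 6.0
        + (z**3 - 3.0 * z) * excess_kurtosis / 24.0
        - (2.0 * z**3 - 5.0 * z) * skew**2 / 36.0
    )
```

Substituting the adjusted quantile from `cornish_fisher_z` for z in the parametric formula gives the Cornish-Fisher VaR; with zero skew and zero excess kurtosis it reduces to the parametric result.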

portfolio_risk

portfolio_risk(
    pnl_series: list[float],
    total_value_usd: Decimal,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    var_method: VaRMethod = VaRMethod.PARAMETRIC,
    timestamps: list[datetime] | None = None,
    benchmark_eth_returns: list[float] | None = None,
    benchmark_btc_returns: list[float] | None = None,
) -> PortfolioRisk

Calculate portfolio risk metrics from a PnL series.

The pnl_series should contain periodic returns as fractions (e.g. 0.01 = 1% gain, -0.02 = 2% loss).

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns (fractions, not percentages).

required
total_value_usd Decimal

Current portfolio value in USD.

required
return_interval str

Periodicity of the returns (1d, 1h, etc.).

'1d'
risk_free_rate Decimal

Risk-free rate per period as a decimal.

Decimal('0')
var_method VaRMethod

VaR calculation method.

PARAMETRIC
timestamps list[datetime] | None

Optional timestamps for each return (for conventions).

None
benchmark_eth_returns list[float] | None

Optional ETH returns for beta calculation.

None
benchmark_btc_returns list[float] | None

Optional BTC returns for beta calculation.

None

Returns:

Type Description
PortfolioRisk

PortfolioRisk with all metrics and explicit conventions.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations.

ValueError

If return_interval is unsupported.

rolling_sharpe

rolling_sharpe(
    pnl_series: list[float],
    window_days: int = 30,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    timestamps: list[datetime] | None = None,
) -> RollingSharpeResult

Compute rolling Sharpe ratio over the PnL series.
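For orientation, a rolling Sharpe ratio can be computed as below. This sketch assumes the common definition (mean excess return over sample standard deviation, scaled by the square root of periods per year) and a window expressed in number of returns; the function name `rolling_sharpe_sketch` and the choice to emit None for zero-dispersion windows are illustrative and may differ from the framework's conventions.

```python
import math
import statistics
from typing import Optional


def rolling_sharpe_sketch(
    returns: list[float],
    window: int,
    periods_per_year: float = 365.0,
    risk_free_per_period: float = 0.0,
) -> list[Optional[float]]:
    """Annualized Sharpe ratio for each full window; None where std dev is zero."""
    if window < 2:
        raise ValueError("window must cover at least two returns")
    out: list[Optional[float]] = []
    for end in range(window, len(returns) + 1):
        excess = [r - risk_free_per_period for r in returns[end - window:end]]
        sigma = statistics.stdev(excess)
        if sigma == 0.0:
            out.append(None)  # undefined: no dispersion in this window
        else:
            out.append(statistics.mean(excess) / sigma * math.sqrt(periods_per_year))
    return out
```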

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns.

required
window_days int

Rolling window in days.

30
return_interval str

Periodicity of the returns.

'1d'
risk_free_rate Decimal

Risk-free rate per period.

Decimal('0')
timestamps list[datetime] | None

Optional timestamps aligned with pnl_series.

None

Returns:

Type Description
RollingSharpeResult

RollingSharpeResult with time series of Sharpe ratios.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations total.

ValueError

If return_interval is unsupported.

Yield and Rates

YieldAggregator

almanak.framework.data.YieldAggregator

YieldAggregator(
    cache_ttl: int = 900, request_timeout: float = 15.0
)

Cross-protocol yield comparison using DeFi Llama.

Fetches yield data from DeFi Llama yields API, filtering by token, chain, TVL, and protocol. Results are cached for 15 minutes.

Parameters:

Name Type Description Default
cache_ttl int

In-memory cache TTL in seconds. Default 900 (15 minutes).

900
request_timeout float

HTTP request timeout in seconds. Default 15.

15.0

get_yield_opportunities

get_yield_opportunities(
    token: str,
    chains: list[str] | None = None,
    min_tvl: float = 100000,
    sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]

Find yield opportunities for a token across protocols and chains.

Parameters:

Name Type Description Default
token str

Token symbol (e.g. "USDC", "WETH").

required
chains list[str] | None

Optional list of chains to filter (e.g. ["arbitrum", "base"]). None means all supported chains.

None
min_tvl float

Minimum TVL in USD. Default $100k.

100000
sort_by str

Sort field: "apy", "tvl", "risk_score". Default "apy".

'apy'

Returns:

Type Description
DataEnvelope[list[YieldOpportunity]]

DataEnvelope[list[YieldOpportunity]] sorted by the chosen metric.

Raises:

Type Description
DataSourceUnavailable

If DeFi Llama API is unavailable.

health

health() -> dict[str, int]

Return health metrics.

RateMonitor

almanak.framework.data.RateMonitor

RateMonitor(
    chain: str = "ethereum",
    cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
    protocols: list[str] | None = None,
    rpc_url: str | None = None,
)

Unified lending rate monitor for multiple DeFi protocols.

This class provides a single interface for fetching lending rates from Aave V3, Morpho Blue, and Compound V3. It handles caching, error recovery, and cross-protocol rate comparison.

Attributes:

Name Type Description
chain str

Blockchain network

cache_ttl_seconds float

How long to cache rates (default 12s)

protocols list[str]

List of protocols to monitor

Example

    monitor = RateMonitor(chain="ethereum")

    # Get specific rate
    rate = await monitor.get_lending_rate("aave_v3", "USDC", RateSide.SUPPLY)

    # Get best rate
    best = await monitor.get_best_lending_rate("USDC", RateSide.SUPPLY)

    # Get all rates for a protocol
    rates = await monitor.get_protocol_rates("aave_v3")

Initialize the RateMonitor.

Parameters:

Name Type Description Default
chain str

Blockchain network (ethereum, arbitrum, etc.)

'ethereum'
cache_ttl_seconds float

Cache TTL in seconds (default 12s = ~1 block)

DEFAULT_CACHE_TTL_SECONDS
protocols list[str] | None

Protocols to monitor (default: all available on chain)

None
rpc_url str | None

RPC URL for on-chain queries (optional)

None

chain property

chain: str

Get the chain.

protocols property

protocols: list[str]

Get monitored protocols.

set_mock_rate

set_mock_rate(
    protocol: str,
    token: str,
    side: str,
    apy_percent: Decimal,
) -> None

Set a mock rate for testing.

Parameters:

Name Type Description Default
protocol str

Protocol identifier

required
token str

Token symbol

required
side str

supply or borrow

required
apy_percent Decimal

APY as percentage (e.g., 5.0 for 5%)

required

clear_mock_rates

clear_mock_rates() -> None

Clear all mock rates.

get_lending_rate async

get_lending_rate(
    protocol: str, token: str, side: RateSide
) -> LendingRate

Get lending rate for a specific protocol/token/side.

Parameters:

Name Type Description Default
protocol str

Protocol identifier (aave_v3, morpho_blue, compound_v3)

required
token str

Token symbol (USDC, WETH, etc.)

required
side RateSide

Rate side (SUPPLY or BORROW)

required

Returns:

Type Description
LendingRate

LendingRate with APY data

Raises:

Type Description
ProtocolNotSupportedError

If protocol not available on chain

TokenNotSupportedError

If token not supported

RateUnavailableError

If rate cannot be fetched

get_best_lending_rate async

get_best_lending_rate(
    token: str,
    side: RateSide,
    protocols: list[str] | None = None,
) -> BestRateResult

Get the best lending rate across protocols for a token.

For supply rates, returns the highest rate. For borrow rates, returns the lowest rate.
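The selection rule is simple enough to sketch directly. `Side` below is a stand-in for the framework's RateSide, and `best_rate` is a hypothetical helper that works on an already-fetched mapping of protocol name to APY.

```python
from decimal import Decimal
from enum import Enum


class Side(Enum):  # stand-in for the framework's RateSide
    SUPPLY = "supply"
    BORROW = "borrow"


def best_rate(rates: dict[str, Decimal], side: Side) -> tuple[str, Decimal]:
    """Return (protocol, apy): highest APY for SUPPLY, lowest for BORROW."""
    if not rates:
        raise ValueError("no rates to compare")
    # Suppliers want the highest yield; borrowers want the lowest cost.
    pick = max if side is Side.SUPPLY else min
    protocol = pick(rates, key=rates.__getitem__)
    return protocol, rates[protocol]
```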

Parameters:

Name Type Description Default
token str

Token symbol

required
side RateSide

Rate side (SUPPLY or BORROW)

required
protocols list[str] | None

Protocols to compare (default: all available)

None

Returns:

Type Description
BestRateResult

BestRateResult with best rate and all rates

get_protocol_rates async

get_protocol_rates(
    protocol: str, tokens: list[str] | None = None
) -> ProtocolRates

Get all rates for a protocol.

Parameters:

Name Type Description Default
protocol str

Protocol identifier

required
tokens list[str] | None

Tokens to fetch (default: common tokens for chain)

None

Returns:

Type Description
ProtocolRates

ProtocolRates with all token rates

clear_cache

clear_cache() -> None

Clear all cached rates.

get_cache_stats

get_cache_stats() -> dict[str, Any]

Get cache statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with cache stats

FundingRateProvider

almanak.framework.data.FundingRateProvider

FundingRateProvider(
    venues: list[str] | None = None,
    cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
    rpc_url: str | None = None,
    chain: str = "arbitrum",
)

Unified funding rate provider for multiple perpetual venues.

This class provides a single interface for fetching funding rates from GMX V2 and Hyperliquid. It handles caching, error recovery, and cross-venue rate comparison.

Attributes:

Name Type Description
venues list[str]

List of venues to monitor

cache_ttl_seconds float

How long to cache rates (default 60s)

Example

    provider = FundingRateProvider()

    # Get specific rate
    rate = await provider.get_funding_rate(Venue.GMX_V2, "ETH-USD")

    # Get spread between venues
    spread = await provider.get_funding_rate_spread("ETH-USD", Venue.GMX_V2, Venue.HYPERLIQUID)

    # Get historical rates
    history = await provider.get_historical_funding_rates(Venue.GMX_V2, "ETH-USD", hours=24)

Initialize the FundingRateProvider.

Parameters:

Name Type Description Default
venues list[str] | None

Venues to monitor (default: all supported)

None
cache_ttl_seconds float

Cache TTL in seconds (default 60s)

DEFAULT_CACHE_TTL_SECONDS
rpc_url str | None

RPC URL for on-chain queries (GMX V2). If not provided, default rates are used as a fallback.

None
chain str

Chain for GMX V2 queries (default: arbitrum)

'arbitrum'

venues property

venues: list[str]

Get monitored venues.

close async

close() -> None

Close HTTP session and release resources.

set_mock_rate

set_mock_rate(
    venue: str, market: str, rate_hourly: Decimal
) -> None

Set a mock funding rate for testing.

Parameters:

Name Type Description Default
venue str

Venue identifier

required
market str

Market symbol

required
rate_hourly Decimal

Hourly funding rate

required

clear_mock_rates

clear_mock_rates() -> None

Clear all mock rates.

get_funding_rate async

get_funding_rate(venue: Venue, market: str) -> FundingRate

Get the current funding rate for a venue/market.

Parameters:

Name Type Description Default
venue Venue

Venue identifier (gmx_v2, hyperliquid)

required
market str

Market symbol (e.g., ETH-USD, BTC-USD)

required

Returns:

Type Description
FundingRate

FundingRate with current rate data

Raises:

Type Description
VenueNotSupportedError

If venue not supported

MarketNotSupportedError

If market not supported

FundingRateUnavailableError

If rate cannot be fetched

get_funding_rate_spread async

get_funding_rate_spread(
    market: str, venue_a: Venue, venue_b: Venue
) -> FundingRateSpread

Get the funding rate spread between two venues.

The spread is the difference between the two venues' funding rates, which can create arbitrage opportunities. A positive spread means venue_a has higher funding than venue_b.
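The sign convention can be shown with two hourly rates. The helpers below are hypothetical, and the simple (non-compounded) annualization at 24 * 365 hours per year is an assumption of this sketch, not a documented framework convention.

```python
from decimal import Decimal

HOURS_PER_YEAR = Decimal(24 * 365)


def funding_spread(rate_a_hourly: Decimal, rate_b_hourly: Decimal) -> Decimal:
    """Positive when venue A's funding rate is higher than venue B's."""
    return rate_a_hourly - rate_b_hourly


def annualize(rate_hourly: Decimal) -> Decimal:
    """Simple (non-compounded) annualization of an hourly rate."""
    return rate_hourly * HOURS_PER_YEAR
```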

Parameters:

Name Type Description Default
market str

Market symbol (e.g., ETH-USD)

required
venue_a Venue

First venue

required
venue_b Venue

Second venue

required

Returns:

Type Description
FundingRateSpread

FundingRateSpread with comparison data

Raises:

Type Description
FundingRateUnavailableError

If either rate cannot be fetched

get_historical_funding_rates async

get_historical_funding_rates(
    venue: Venue, market: str, hours: int = 24
) -> HistoricalFundingData

Get historical funding rates for a venue/market.

Returns historical funding rate data for analysis and trend detection.

Parameters:

Name Type Description Default
venue Venue

Venue identifier

required
market str

Market symbol

required
hours int

Number of hours of history to fetch (default 24, max 168)

24

Returns:

Type Description
HistoricalFundingData

HistoricalFundingData with rate history

Raises:

Type Description
VenueNotSupportedError

If venue not supported

MarketNotSupportedError

If market not supported

FundingRateUnavailableError

If data cannot be fetched

get_all_funding_rates async

get_all_funding_rates(
    venue: Venue,
) -> dict[str, FundingRate]

Get funding rates for all supported markets on a venue.

Parameters:

Name Type Description Default
venue Venue

Venue identifier

required

Returns:

Type Description
dict[str, FundingRate]

Dictionary mapping market symbol to FundingRate

clear_cache

clear_cache() -> None

Clear all cached rates.

get_cache_stats

get_cache_stats() -> dict[str, Any]

Get cache statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with cache stats

Impermanent Loss

ILCalculator

almanak.framework.data.ILCalculator

ILCalculator(
    positions: dict[str, LPPosition] | None = None,
    mock_prices: dict[str, Decimal] | None = None,
)

Calculator for impermanent loss across various AMM pool types.

This class provides methods to calculate impermanent loss for liquidity positions, including:

- Standard constant product AMM pools (Uniswap V2, SushiSwap)
- Concentrated liquidity pools (Uniswap V3)
- Weighted pools (Balancer)
- Stable pools (Curve)

The calculator also supports projecting IL for simulated price changes and tracking IL exposure for active positions.

Example

    calc = ILCalculator()

    # Calculate IL for a 50/50 pool
    result = calc.calculate_il(
        entry_price_a=Decimal("2000"),
        entry_price_b=Decimal("1"),
        current_price_a=Decimal("2500"),
        current_price_b=Decimal("1"),
    )
    print(f"IL: {result.il_percent:.4f}%")

    # Project IL for various price changes
    for pct in [10, 25, 50, 100]:
        proj = calc.project_il(price_change_pct=Decimal(pct))
        print(f"Price +{pct}%: IL = {proj.il_percent:.4f}%")

Initialize the IL calculator.

Parameters:

Name Type Description Default
positions dict[str, LPPosition] | None

Optional dictionary of tracked LP positions

None
mock_prices dict[str, Decimal] | None

Optional mock prices for testing (token -> price)

None

calculate_il

calculate_il(
    entry_price_a: Decimal,
    entry_price_b: Decimal,
    current_price_a: Decimal,
    current_price_b: Decimal,
    weight_a: Decimal = Decimal("0.5"),
    weight_b: Decimal = Decimal("0.5"),
    pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
    entry_value: Decimal | None = None,
) -> ILResult

Calculate impermanent loss given entry and current prices.

For constant product AMMs (Uniswap V2 style), IL is calculated as: IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1

For weighted pools (Balancer style), IL is calculated using: IL = (price_ratio ^ weight_a) / ((weight_a * price_ratio) + weight_b) - 1
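Both formulas can be evaluated directly. In the sketch below, price_ratio is taken to be the relative price of A in terms of B now versus at entry, which is an assumption about how the four price arguments combine; the function names are hypothetical and floats are used for brevity where the framework uses Decimal.

```python
import math


def relative_price_change(entry_a: float, entry_b: float, current_a: float, current_b: float) -> float:
    """Price of A in terms of B now, divided by the same quantity at entry."""
    return (current_a / current_b) / (entry_a / entry_b)


def il_constant_product(price_ratio: float) -> float:
    """IL for a 50/50 constant-product pool; 0 at ratio 1, negative otherwise."""
    if price_ratio <= 0:
        raise ValueError("price_ratio must be positive")
    return 2.0 * math.sqrt(price_ratio) / (1.0 + price_ratio) - 1.0


def il_weighted(price_ratio: float, weight_a: float = 0.5) -> float:
    """IL for a two-token weighted pool with weights (weight_a, 1 - weight_a)."""
    if price_ratio <= 0 or not 0.0 < weight_a < 1.0:
        raise ValueError("price_ratio must be positive and weight_a in (0, 1)")
    weight_b = 1.0 - weight_a
    return price_ratio**weight_a / (weight_a * price_ratio + weight_b) - 1.0
```

For a move from 2000 to 2500 against a stable quote, the ratio is 1.25 and the constant-product formula evaluates to about -0.0062, a shortfall of roughly 0.62% versus holding. With equal weights the weighted formula gives the same value.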

Parameters:

Name Type Description Default
entry_price_a Decimal

Entry price of token A (in quote currency, e.g., USD)

required
entry_price_b Decimal

Entry price of token B (in quote currency)

required
current_price_a Decimal

Current price of token A

required
current_price_b Decimal

Current price of token B

required
weight_a Decimal

Weight of token A (default 0.5 for equal weight)

Decimal('0.5')
weight_b Decimal

Weight of token B (default 0.5 for equal weight)

Decimal('0.5')
pool_type PoolType

Type of AMM pool

CONSTANT_PRODUCT
entry_value Decimal | None

Optional initial position value (for absolute loss calc)

None

Returns:

Type Description
ILResult

ILResult with IL metrics

Raises:

Type Description
InvalidPriceError

If any price is zero or negative

InvalidWeightError

If weights don't sum to 1.0

calculate_il_concentrated

calculate_il_concentrated(
    entry_price_a: Decimal,
    entry_price_b: Decimal,
    current_price_a: Decimal,
    current_price_b: Decimal,
    tick_lower: int,
    tick_upper: int,
    entry_value: Decimal | None = None,
) -> ILResult

Calculate IL for concentrated liquidity positions (Uniswap V3).

Concentrated liquidity amplifies both gains and IL compared to full-range positions. The IL is higher when price moves outside the range.
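The amplification can be seen with the standard Uniswap V3 position math. The sketch below works with price bounds rather than ticks (a tick maps to a raw price of 1.0001 ** tick), ignores fees and token decimals, and uses hypothetical function names; it is not the calculator's implementation.

```python
import math


def tick_to_raw_price(tick: int) -> float:
    """Raw (decimal-unadjusted) Uniswap V3 price at a tick."""
    return 1.0001**tick


def _amounts(price: float, lower: float, upper: float) -> tuple[float, float]:
    """Token amounts (x, y) held by one unit of liquidity at `price`."""
    p = min(max(price, lower), upper)  # outside the range the mix is frozen
    sqrt_p, sqrt_lo, sqrt_hi = math.sqrt(p), math.sqrt(lower), math.sqrt(upper)
    return 1.0 / sqrt_p - 1.0 / sqrt_hi, sqrt_p - sqrt_lo


def il_concentrated(entry: float, current: float, lower: float, upper: float) -> float:
    """LP value over hold value minus 1, both valued in token y at `current`."""
    if not 0.0 < lower < entry < upper or current <= 0.0:
        raise ValueError("need 0 < lower < entry < upper and current > 0")
    x0, y0 = _amounts(entry, lower, upper)
    x1, y1 = _amounts(current, lower, upper)
    hold_value = x0 * current + y0
    lp_value = x1 * current + y1
    return lp_value / hold_value - 1.0
```

For a position entered at price 1.0 with bounds 0.5 and 2.0, a move to 1.5 gives a noticeably larger loss than the full-range formula at the same ratio, and a move past the upper bound is worse still because the position is then held entirely in one token.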

Parameters:

Name Type Description Default
entry_price_a Decimal

Entry price of token A

required
entry_price_b Decimal

Entry price of token B

required
current_price_a Decimal

Current price of token A

required
current_price_b Decimal

Current price of token B

required
tick_lower int

Lower tick bound of the position

required
tick_upper int

Upper tick bound of the position

required
entry_value Decimal | None

Optional initial position value

None

Returns:

Type Description
ILResult

ILResult with IL metrics for concentrated position

Raises:

Type Description
InvalidPriceError

If any price is zero or negative

project_il

project_il(
    price_change_pct: Decimal,
    weight_a: Decimal = Decimal("0.5"),
    weight_b: Decimal = Decimal("0.5"),
    pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
) -> ProjectedILResult

Project impermanent loss for a given price change.

This method simulates what IL would be if token A's price changed by the specified percentage while token B remains constant (typical for ETH/stablecoin pairs).

Parameters:

Name Type Description Default
price_change_pct Decimal

Price change percentage (e.g., 50 for +50%, -30 for -30%)

required
weight_a Decimal

Weight of token A (default 0.5)

Decimal('0.5')
weight_b Decimal

Weight of token B (default 0.5)

Decimal('0.5')
pool_type PoolType

Type of AMM pool

CONSTANT_PRODUCT

Returns:

Type Description
ProjectedILResult

ProjectedILResult with projected IL metrics

Raises:

Type Description
InvalidWeightError

If weights don't sum to 1.0

InvalidPriceError

If price change would result in negative price

add_position

add_position(position: LPPosition) -> None

Add an LP position for tracking.

Parameters:

Name Type Description Default
position LPPosition

The LP position to track

required

remove_position

remove_position(position_id: str) -> None

Remove an LP position from tracking.

Parameters:

Name Type Description Default
position_id str

ID of the position to remove

required

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

get_position

get_position(position_id: str) -> LPPosition

Get a tracked LP position.

Parameters:

Name Type Description Default
position_id str

ID of the position

required

Returns:

Type Description
LPPosition

The LP position

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

get_all_positions

get_all_positions() -> list[LPPosition]

Get all tracked LP positions.

Returns:

Type Description
list[LPPosition]

List of all tracked positions

calculate_il_exposure

calculate_il_exposure(
    position_id: str,
    current_price_a: Decimal | None = None,
    current_price_b: Decimal | None = None,
    fees_earned: Decimal = Decimal("0"),
) -> ILExposure

Calculate IL exposure for a tracked position.

Parameters:

Name Type Description Default
position_id str

ID of the tracked position

required
current_price_a Decimal | None

Current price of token A (uses mock if not provided)

None
current_price_b Decimal | None

Current price of token B (uses mock if not provided)

None
fees_earned Decimal

Fees earned by the position (optional)

Decimal('0')

Returns:

Type Description
ILExposure

ILExposure with full exposure details

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

ILExposureUnavailableError

If prices cannot be determined

set_mock_prices

set_mock_prices(prices: dict[str, Decimal]) -> None

Set mock prices for testing.

Parameters:

Name Type Description Default
prices dict[str, Decimal]

Dictionary mapping token symbols to prices

required

clear_mock_prices

clear_mock_prices() -> None

Clear mock prices.

Data Routing

DataRouter

almanak.framework.data.DataRouter dataclass

DataRouter(config: DataRoutingConfig = DataRoutingConfig())

Routes data requests to providers with fallback and circuit-breaking.

For EXECUTION_GRADE data types, the router fails closed after the primary provider fails -- no fallback to degraded sources is attempted.

For INFORMATIONAL data types, the router tries the fallback chain in order, each with its own timeout, and returns the best available result with degraded confidence.
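The difference between the two behaviours can be sketched with plain callables. Everything here is illustrative: `route_sketch`, the `Unavailable` exception (a stand-in for DataUnavailableError), and the boolean `degraded` flag are hypothetical, and per-provider timeouts and circuit breaking are left out.

```python
from typing import Callable

Fetch = Callable[[], object]


class Unavailable(Exception):
    """Stand-in for DataUnavailableError in this sketch."""


def route_sketch(providers: list[tuple[str, Fetch]], execution_grade: bool) -> tuple[object, str, bool]:
    """Return (value, provider_name, degraded) or raise Unavailable."""
    if not providers:
        raise Unavailable("no providers configured")
    # Execution-grade requests only ever try the primary provider.
    candidates = providers[:1] if execution_grade else providers
    errors: list[str] = []
    for position, (name, fetch) in enumerate(candidates):
        try:
            value = fetch()
        except Exception as exc:  # a failed provider is skipped, or fails closed
            errors.append(f"{name}: {exc}")
            continue
        # Anything other than the primary counts as a degraded answer.
        return value, name, position > 0
    raise Unavailable("; ".join(errors))
```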

Attributes:

Name Type Description
config DataRoutingConfig

Routing configuration specifying provider assignments.

register_provider

register_provider(provider: DataProvider) -> None

Register a data provider for routing.

Parameters:

Name Type Description Default
provider DataProvider

A DataProvider implementation to register.

required

route

route(
    data_type: str,
    *,
    instrument: str = "",
    strategy_config: dict | None = None,
    **fetch_kwargs: object,
) -> DataEnvelope

Select a provider and fetch data with fallback logic.

Provider selection order
  1. Strategy-level override (if strategy_config provided)
  2. Global routing config
  3. Built-in defaults
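The precedence above amounts to a three-level lookup. The sketch assumes that data_overrides maps a data type key to a provider name, which is a guess at the shape; `resolve_provider`, `BUILTIN_DEFAULTS`, and the provider names are placeholders.

```python
from typing import Optional

BUILTIN_DEFAULTS = {"ohlcv": "default_ohlcv_provider"}  # placeholder names


def resolve_provider(
    data_type: str,
    strategy_config: Optional[dict] = None,
    global_routing: Optional[dict] = None,
) -> str:
    """Pick a provider name: strategy override, then global config, then defaults."""
    overrides = (strategy_config or {}).get("data_overrides", {})
    # Earlier sources win; later ones are consulted only on a miss.
    for source in (overrides, global_routing or {}, BUILTIN_DEFAULTS):
        if data_type in source:
            return source[data_type]
    raise KeyError(f"no provider configured for {data_type!r}")
```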

Parameters:

Name Type Description Default
data_type str

Data type key (e.g. "ohlcv", "pool_price").

required
instrument str

Instrument identifier for logging/error context.

''
strategy_config dict | None

Optional strategy config dict with data_overrides.

None
**fetch_kwargs object

Additional kwargs passed through to provider.fetch().

{}

Returns:

Type Description
DataEnvelope

DataEnvelope from the selected provider.

Raises:

Type Description
DataUnavailableError

When all providers fail.

get_metrics

get_metrics(
    provider_name: str | None = None,
) -> dict[str, object]

Return metrics for a specific provider or all providers.

Parameters:

Name Type Description Default
provider_name str | None

If provided, return metrics for this provider only.

None

Returns:

Type Description
dict[str, object]

Dict of metric data.

health

health() -> dict[str, object]

Return health status for all registered providers.

Returns:

Type Description
dict[str, object]

Dict mapping provider name -> circuit breaker health dict.

CircuitBreaker

almanak.framework.data.CircuitBreaker

CircuitBreaker(
    name: str,
    failure_threshold: int = 5,
    cooldown_seconds: float = 60.0,
)

Thread-safe circuit breaker for a single data provider.

Opens after failure_threshold consecutive failures, then enters a cooldown period. After the cooldown, the breaker allows one test request (HALF_OPEN). If the test succeeds, the circuit closes; if it fails, the circuit reopens.

Attributes:

Name Type Description
name

Provider name this breaker guards.

failure_threshold

Consecutive failures before opening.

cooldown_seconds

Seconds to wait before half-open test.

state property

state: CircuitState

Current circuit state, accounting for cooldown expiry.

failure_count property

failure_count: int

Number of consecutive failures.

last_failure_time property

last_failure_time: float | None

Monotonic timestamp of the last recorded failure, or None.

allow_request

allow_request() -> bool

Check whether a request should be allowed through.

Returns:

Type Description
bool

True if the circuit is CLOSED or transitioning to HALF_OPEN; False if the circuit is OPEN (still within cooldown).

record_success

record_success() -> None

Record a successful request, resetting the breaker to CLOSED.

record_failure

record_failure() -> None

Record a failed request, potentially opening the circuit.

reset

reset() -> None

Manually reset the breaker to CLOSED state.

health

health() -> dict[str, object]

Return health metrics for monitoring.

Returns:

Type Description
dict[str, object]

Dict with state, failure_count, last_failure_time, and config.
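The CLOSED, OPEN, and HALF_OPEN transitions described above can be sketched as a small state machine. This is a minimal illustration, not the CircuitBreaker source: the class name SketchBreaker, the State enum, and the injectable clock parameter are assumptions introduced here so the behaviour can be exercised without waiting for a real cooldown. As a simplification, the sketch lets any request through while HALF_OPEN until an outcome is recorded, rather than limiting it to a single test request.

```python
import threading
import time
from enum import Enum
from typing import Callable, Optional


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative breaker; mirrors the documented method names only."""

    def __init__(
        self,
        failure_threshold: int = 5,
        cooldown_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,  # injectable for tests
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._state = State.CLOSED
        self._failures = 0
        self._opened_at: Optional[float] = None

    def _current_state(self) -> State:
        # Caller must hold the lock. OPEN becomes HALF_OPEN once the cooldown elapses.
        if self._state is State.OPEN and self._opened_at is not None:
            if self._clock() - self._opened_at >= self.cooldown_seconds:
                self._state = State.HALF_OPEN
        return self._state

    @property
    def state(self) -> State:
        with self._lock:
            return self._current_state()

    def allow_request(self) -> bool:
        with self._lock:
            return self._current_state() is not State.OPEN

    def record_success(self) -> None:
        with self._lock:
            self._state = State.CLOSED
            self._failures = 0
            self._opened_at = None

    def record_failure(self) -> None:
        with self._lock:
            state = self._current_state()
            self._failures += 1
            # A failed half-open probe reopens at once; otherwise open at the threshold.
            if state is State.HALF_OPEN or self._failures >= self.failure_threshold:
                self._state = State.OPEN
                self._opened_at = self._clock()
```

A typical call site checks allow_request() before contacting the provider, then calls record_success() or record_failure() depending on the outcome. Reading the state through a property that re-evaluates the cooldown is one way to get a state that accounts for cooldown expiry without a background timer.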

Exceptions

almanak.framework.data.DataUnavailableError

DataUnavailableError(
    data_type: str, instrument: str, reason: str
)

Bases: DataSourceError

Raised when required data cannot be obtained from any provider.

Used by the DataRouter when all providers (primary + fallbacks) have failed or been circuit-broken for a given request.

Attributes:

Name Type Description
data_type

What kind of data was requested (e.g. 'pool_price', 'ohlcv').

instrument

Instrument identifier (symbol or address).

reason

Human-readable explanation.

almanak.framework.data.MarketSnapshotError

Bases: Exception

Base exception for MarketSnapshot errors.