Data Layer¶
Market data providers, price oracles, balance providers, and the unified MarketSnapshot interface.
MarketSnapshot¶
The primary data interface passed to decide(). Provides lazy access to prices, balances, indicators, and more.
almanak.framework.data.MarketSnapshot
¶
MarketSnapshot(
chain: str,
wallet_address: str,
price_oracle: PriceOracle | None = None,
balance_provider: BalanceProvider | None = None,
rsi_calculator: RSICalculator | None = None,
ohlcv_module: Optional[OHLCVModule] = None,
gas_oracle: Optional[GasOracle] = None,
pool_reader: Optional[UniswapV3PoolReader] = None,
rate_monitor: Optional[RateMonitor] = None,
funding_rate_provider: Optional[
FundingRateProvider
] = None,
multi_dex_service: Optional[
MultiDexPriceService
] = None,
il_calculator: Optional[ILCalculator] = None,
prediction_provider: Optional[
PredictionMarketDataProvider
] = None,
stablecoin_config: StablecoinConfig | None = None,
freshness_config: FreshnessConfig | None = None,
timestamp: datetime | None = None,
pool_reader_registry: Optional[
PoolReaderRegistry
] = None,
price_aggregator: Optional[PriceAggregator] = None,
data_router: Optional[DataRouter] = None,
ohlcv_router: Optional[OHLCVRouter] = None,
pool_history_reader: Optional[PoolHistoryReader] = None,
rate_history_reader: Optional[RateHistoryReader] = None,
liquidity_depth_reader: Optional[
LiquidityDepthReader
] = None,
slippage_estimator: Optional[SlippageEstimator] = None,
volatility_calculator: Optional[
RealizedVolatilityCalculator
] = None,
risk_calculator: Optional[
PortfolioRiskCalculator
] = None,
pool_analytics_reader: Optional[
PoolAnalyticsReader
] = None,
yield_aggregator: Optional[YieldAggregator] = None,
wallet_activity_provider: Optional[
WalletActivityProvider
] = None,
)
Unified market data interface for strategy decision-making.
MarketSnapshot provides a clean, synchronous interface for strategies to access market data. It wraps underlying async data providers and handles event loop management internally.
All methods raise clear exceptions on failure - there are no silent defaults or fallback values. Strategies should handle exceptions appropriately.
Attributes:
| Name | Type | Description |
|---|---|---|
chain |
str
|
Blockchain network (e.g., "arbitrum", "ethereum") |
wallet_address |
str
|
Address of the wallet for balance queries |
timestamp |
datetime
|
When the snapshot was created |
Example
Create snapshot with all providers¶
snapshot = MarketSnapshot( chain="arbitrum", wallet_address="0x...", price_oracle=price_aggregator, balance_provider=web3_balance_provider, rsi_calculator=rsi_calculator, )
Get aggregated price¶
eth_price = snapshot.price("WETH") # Decimal("2500.50")
Get wallet balance¶
usdc_balance = snapshot.balance("USDC") # Decimal("1000.00")
Get RSI indicator¶
rsi = snapshot.rsi("WETH", period=14) # 45.5
Get balance in USD terms¶
usdc_value = snapshot.balance_usd("USDC") # Decimal("1000.00")
Get total portfolio value¶
total = snapshot.total_portfolio_usd(["WETH", "USDC", "ARB"])
Initialize the MarketSnapshot.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str
|
Blockchain network name (e.g., "arbitrum", "ethereum") |
required |
wallet_address
|
str
|
Wallet address for balance queries |
required |
price_oracle
|
PriceOracle | None
|
PriceOracle implementation for price data |
None
|
balance_provider
|
BalanceProvider | None
|
BalanceProvider implementation for balance data |
None
|
rsi_calculator
|
RSICalculator | None
|
RSICalculator implementation for RSI indicator |
None
|
ohlcv_module
|
Optional[OHLCVModule]
|
OHLCVModule for historical candlestick data |
None
|
gas_oracle
|
Optional[GasOracle]
|
GasOracle implementation for gas price data |
None
|
pool_reader
|
Optional[UniswapV3PoolReader]
|
UniswapV3PoolReader for DEX pool data |
None
|
rate_monitor
|
Optional[RateMonitor]
|
RateMonitor for lending protocol rates |
None
|
funding_rate_provider
|
Optional[FundingRateProvider]
|
FundingRateProvider for perpetual funding rates |
None
|
multi_dex_service
|
Optional[MultiDexPriceService]
|
MultiDexPriceService for cross-DEX price comparison |
None
|
il_calculator
|
Optional[ILCalculator]
|
ILCalculator for impermanent loss calculations |
None
|
prediction_provider
|
Optional[PredictionMarketDataProvider]
|
PredictionMarketDataProvider for prediction market data |
None
|
stablecoin_config
|
StablecoinConfig | None
|
Configuration for stablecoin pricing behavior. Default is StablecoinConfig(mode='market') which uses actual market prices. |
None
|
freshness_config
|
FreshnessConfig | None
|
Configuration for data freshness thresholds. Default is FreshnessConfig(price_warn_sec=30, price_error_sec=300). |
None
|
timestamp
|
datetime | None
|
Optional snapshot timestamp (defaults to now) |
None
|
pool_reader_registry
|
Optional[PoolReaderRegistry]
|
PoolReaderRegistry for on-chain pool price reads |
None
|
price_aggregator
|
Optional[PriceAggregator]
|
PriceAggregator for TWAP/LWAP aggregation |
None
|
data_router
|
Optional[DataRouter]
|
DataRouter for provider selection and failover |
None
|
ohlcv_router
|
Optional[OHLCVRouter]
|
OHLCVRouter for multi-provider OHLCV with CEX/DEX awareness |
None
|
pool_history_reader
|
Optional[PoolHistoryReader]
|
PoolHistoryReader for historical pool state data |
None
|
rate_history_reader
|
Optional[RateHistoryReader]
|
RateHistoryReader for historical lending/funding rate data |
None
|
liquidity_depth_reader
|
Optional[LiquidityDepthReader]
|
LiquidityDepthReader for tick-level liquidity reads |
None
|
slippage_estimator
|
Optional[SlippageEstimator]
|
SlippageEstimator for swap slippage estimation |
None
|
volatility_calculator
|
Optional[RealizedVolatilityCalculator]
|
RealizedVolatilityCalculator for vol metrics |
None
|
risk_calculator
|
Optional[PortfolioRiskCalculator]
|
PortfolioRiskCalculator for portfolio risk metrics |
None
|
pool_analytics_reader
|
Optional[PoolAnalyticsReader]
|
PoolAnalyticsReader for pool TVL, volume, fee APR |
None
|
yield_aggregator
|
Optional[YieldAggregator]
|
YieldAggregator for cross-protocol yield comparison |
None
|
wallet_activity
¶
wallet_activity(
leader_address: str | None = None,
action_types: list[str] | None = None,
min_usd_value: Decimal | None = None,
protocols: list[str] | None = None,
) -> list
Get leader wallet activity signals for copy trading.
Returns filtered signals from the WalletActivityProvider. If no provider is configured, returns an empty list (graceful degradation).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
leader_address
|
str | None
|
Filter by specific leader wallet address |
None
|
action_types
|
list[str] | None
|
Filter by action types (e.g., ["SWAP"]) |
None
|
min_usd_value
|
Decimal | None
|
Minimum USD value filter |
None
|
protocols
|
list[str] | None
|
Filter by protocol names (e.g., ["uniswap_v3"]) |
None
|
Returns:
| Type | Description |
|---|---|
list
|
List of CopySignal objects matching the filters |
price
¶
Get the aggregated price for a token.
Fetches the price from the configured PriceOracle, which may aggregate prices from multiple sources.
For stablecoins, the behavior depends on the configured StablecoinConfig: - mode='market' (default): Returns actual market price - mode='pegged': Returns Decimal('1.00') for configured stablecoins - mode='hybrid': Returns $1.00 if within tolerance, else market price
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH", "USDC") |
required |
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
Returns:
| Type | Description |
|---|---|
Decimal
|
Price as a Decimal for precision |
Raises:
| Type | Description |
|---|---|
PriceUnavailableError
|
If price cannot be determined |
ValueError
|
If no price oracle is configured |
balance
¶
Get the wallet balance for a token.
Queries the balance from the configured BalanceProvider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "USDC") or "ETH" for native |
required |
Returns:
| Type | Description |
|---|---|
Decimal
|
Balance as a Decimal in human-readable units (not wei) |
Raises:
| Type | Description |
|---|---|
BalanceUnavailableError
|
If balance cannot be determined |
ValueError
|
If no balance provider is configured |
rsi
¶
Get the RSI (Relative Strength Index) for a token.
Calculates RSI using the configured RSI calculator with historical price data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
period
|
int
|
RSI calculation period (default 14) |
14
|
timeframe
|
str
|
OHLCV candle timeframe (default "4h") Supported: "1m", "5m", "15m", "1h", "4h", "1d" Note: 1m/5m/15m may return 30-min candles (CoinGecko limitation) |
'4h'
|
Returns:
| Type | Description |
|---|---|
float
|
RSI value from 0 to 100 as a float |
Raises:
| Type | Description |
|---|---|
RSIUnavailableError
|
If RSI cannot be calculated |
ValueError
|
If no RSI calculator is configured |
Example
Default 4-hour candles¶
rsi = snapshot.rsi("WETH", period=14)
1-hour candles for shorter-term analysis¶
rsi_1h = snapshot.rsi("WETH", period=14, timeframe="1h")
Daily candles for longer-term analysis¶
rsi_1d = snapshot.rsi("WETH", period=14, timeframe="1d")
Multi-timeframe analysis¶
if snapshot.rsi("WETH", timeframe="1h") < 30 and snapshot.rsi("WETH", timeframe="1d") < 50: # Short-term oversold, long-term not overbought return SwapIntent(...)
sma
¶
Get the Simple Moving Average (SMA) for a token.
SMA is the unweighted mean of the last N closing prices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
period
|
int
|
Number of periods for the average (default 20) |
20
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
Returns:
| Type | Description |
|---|---|
float
|
SMA value as float |
Example
sma_20 = snapshot.sma("WETH", period=20, timeframe="1h") sma_200 = snapshot.sma("WETH", period=200, timeframe="1d")
Trading logic¶
if current_price > sma_20: print("Price above 20-period SMA - bullish trend")
ema
¶
Get the Exponential Moving Average (EMA) for a token.
EMA gives more weight to recent prices using exponential decay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
period
|
int
|
Number of periods (default 12) |
12
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
smoothing
|
float
|
Smoothing factor (default 2.0) |
2.0
|
Returns:
| Type | Description |
|---|---|
float
|
EMA value as float |
Example
ema_12 = snapshot.ema("WETH", period=12, timeframe="1h") ema_26 = snapshot.ema("WETH", period=26, timeframe="1h")
Golden cross check¶
if ema_12 > ema_26: print("Golden cross - bullish signal")
bollinger_bands
¶
bollinger_bands(
token: str,
period: int = 20,
std_dev: float = 2.0,
timeframe: str = "1h",
) -> BollingerBandsResult
Get Bollinger Bands for a token.
Bollinger Bands consist of a middle band (SMA) with upper and lower bands at a specified number of standard deviations away.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
period
|
int
|
SMA period (default 20) |
20
|
std_dev
|
float
|
Standard deviation multiplier (default 2.0) |
2.0
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
Returns:
| Type | Description |
|---|---|
BollingerBandsResult
|
BollingerBandsResult with: - upper_band: Upper band value - middle_band: Middle band (SMA) value - lower_band: Lower band value - bandwidth: Band width as percentage - percent_b: Price position (0=lower, 1=upper) |
Example
bb = snapshot.bollinger_bands("WETH", period=20, std_dev=2.0, timeframe="1h")
if bb.percent_b < 0: print("Price below lower band - oversold!") elif bb.percent_b > 1: print("Price above upper band - overbought!")
if bb.bandwidth < 0.05: print("Low volatility - squeeze detected")
macd
¶
macd(
token: str,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
timeframe: str = "1h",
) -> MACDResult
Get MACD (Moving Average Convergence Divergence) for a token.
MACD is a trend-following momentum indicator showing the relationship between two exponential moving averages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
fast_period
|
int
|
Fast EMA period (default 12) |
12
|
slow_period
|
int
|
Slow EMA period (default 26) |
26
|
signal_period
|
int
|
Signal line EMA period (default 9) |
9
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
Returns:
| Type | Description |
|---|---|
MACDResult
|
MACDResult with: - macd_line: MACD line (fast EMA - slow EMA) - signal_line: Signal line (EMA of MACD line) - histogram: MACD histogram (macd_line - signal_line) |
Example
macd = snapshot.macd("WETH", timeframe="4h")
if macd.histogram > 0: print("MACD above signal - bullish momentum") elif macd.histogram < 0: print("MACD below signal - bearish momentum")
stochastic
¶
stochastic(
token: str,
k_period: int = 14,
d_period: int = 3,
timeframe: str = "1h",
) -> StochasticResult
Get Stochastic Oscillator for a token.
The Stochastic Oscillator is a momentum indicator comparing a token's closing price to its price range over a given period.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
k_period
|
int
|
Lookback period for %K (default 14) |
14
|
d_period
|
int
|
SMA period for %D (default 3) |
3
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
Returns:
| Type | Description |
|---|---|
StochasticResult
|
StochasticResult with: - k_value: %K (fast stochastic, 0-100 scale) - d_value: %D (slow stochastic, 0-100 scale) |
Example
stoch = snapshot.stochastic("WETH", k_period=14, d_period=3)
if stoch.k_value < 20: print("Oversold territory") elif stoch.k_value > 80: print("Overbought territory")
Crossover signals¶
if stoch.k_value > stoch.d_value: print("Bullish - %K crossed above %D")
atr
¶
Get Average True Range (ATR) for a token.
ATR is a volatility indicator showing how much an asset moves on average.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "ETH") |
required |
period
|
int
|
ATR period (default 14) |
14
|
timeframe
|
str
|
OHLCV candle timeframe (default "1h") |
'1h'
|
Returns:
| Type | Description |
|---|---|
float
|
ATR value (in the same units as the token price) |
Example
atr = snapshot.atr("WETH", period=14, timeframe="4h") current_price = float(snapshot.price("WETH"))
Stop-loss placement¶
stop_loss = current_price - (2 * atr) print(f"Stop loss at ${stop_loss:.2f} (2 ATR below)")
Position sizing with 1% risk¶
risk_amount = 10000 * 0.01 # $100 position_size = risk_amount / atr print(f"Position size: {position_size:.4f} units")
ohlcv
¶
ohlcv(
token: str | Instrument,
timeframe: str = "1h",
limit: int = 100,
quote: str = "USD",
gap_strategy: GapStrategy = "nan",
*,
pool_address: str | None = None,
) -> pd.DataFrame
Get OHLCV (candlestick) data for a token.
Fetches historical candlestick data from the configured OHLCV providers. When an OHLCVRouter is configured, automatically classifies instruments as CEX-primary or DeFi-primary and routes to the appropriate provider.
Accepts plain token symbols (e.g. "WETH"), pair strings (e.g. "WETH/USDC"), or Instrument objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str | Instrument
|
Token symbol, "BASE/QUOTE" string, or Instrument. |
required |
timeframe
|
str
|
Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). Default "1h". |
'1h'
|
limit
|
int
|
Maximum number of candles to return. Default 100. |
100
|
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
gap_strategy
|
GapStrategy
|
How to handle gaps in data: - 'nan': Fill gaps with NaN values (default) - 'ffill': Forward-fill gaps with last known values - 'drop': Remove gaps (returns only continuous data) |
'nan'
|
pool_address
|
str | None
|
Explicit pool address for DEX providers (optional). |
None
|
Returns:
| Type | Description |
|---|---|
DataFrame
|
pandas DataFrame with columns: - timestamp: datetime - open: float64 - high: float64 - low: float64 - close: float64 - volume: float64 (may contain NaN if unavailable) |
DataFrame
|
DataFrame.attrs includes metadata: - base: Token symbol - quote: Quote currency - timeframe: Candle timeframe - source: Provider source name - chain: Chain identifier - fetched_at: When the data was fetched - confidence: Data confidence (0.0-1.0) |
DataFrame
|
Returns empty DataFrame with correct schema if no data available. |
Raises:
| Type | Description |
|---|---|
OHLCVUnavailableError
|
If OHLCV data cannot be retrieved |
ValueError
|
If no OHLCV module/router is configured or invalid timeframe |
Example
Get 1-hour candles for WETH¶
df = snapshot.ohlcv("WETH", timeframe="1h", limit=100) print(df.columns) # timestamp, open, high, low, close, volume
Use Instrument for explicit routing¶
from almanak.framework.data.models import Instrument inst = Instrument(base="WETH", quote="USDC", chain="arbitrum") df = snapshot.ohlcv(inst, timeframe="1h")
Use with pandas-ta for indicators¶
import pandas_ta as ta df['rsi'] = ta.rsi(df['close'], length=14) df['macd'] = ta.macd(df['close'])['MACD_12_26_9']
Handle gaps with forward-fill¶
df = snapshot.ohlcv("WETH", gap_strategy="ffill")
gas_price
¶
Get current gas price for a chain.
Fetches the current gas price data from the configured GasOracle. Results are cached for 12 seconds (approximately 1 block) to avoid excessive RPC calls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str | None
|
Chain identifier (e.g., "ethereum", "arbitrum", "optimism"). If not specified, uses the strategy's primary chain from the MarketSnapshot. |
None
|
Returns:
| Type | Description |
|---|---|
GasPrice
|
GasPrice dataclass with: - chain: Chain identifier - base_fee_gwei: Network base fee in gwei - priority_fee_gwei: Priority/tip fee in gwei - max_fee_gwei: Maximum fee (base + priority) in gwei - l1_base_fee_gwei: L1 base fee for L2 chains (optional) - l1_data_cost_gwei: L1 data cost for L2 chains (optional) - estimated_cost_usd: Estimated cost in USD for 21000 gas - timestamp: When the gas price was observed |
Raises:
| Type | Description |
|---|---|
GasUnavailableError
|
If gas price cannot be retrieved |
ValueError
|
If no gas oracle is configured |
Example
Get gas for strategy's primary chain¶
gas = snapshot.gas_price() print(f"Base fee: {gas.base_fee_gwei} gwei") print(f"Estimated cost: ${gas.estimated_cost_usd}")
Get gas for specific chain¶
arb_gas = snapshot.gas_price("arbitrum") if arb_gas.is_l2: print(f"L1 data cost: {arb_gas.l1_data_cost_gwei} gwei")
pool_price
¶
Get the live price from an on-chain DEX pool.
Reads slot0() from the pool contract and decodes sqrtPriceX96 into a human-readable price using token decimals.
Returns a DataEnvelope[PoolPrice] with EXECUTION_GRADE classification (fail-closed: raises on any error, no off-chain fallback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address. |
required |
chain
|
str | None
|
Chain name (e.g. "arbitrum", "base"). Defaults to the snapshot's primary chain. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[PoolPrice]
|
DataEnvelope[PoolPrice] with provenance metadata. |
Raises:
| Type | Description |
|---|---|
PoolPriceUnavailableError
|
If pool price cannot be retrieved. |
ValueError
|
If no pool reader registry is configured. |
pool_price_by_pair
¶
pool_price_by_pair(
token_a: str,
token_b: str,
chain: str | None = None,
protocol: str | None = None,
fee_tier: int = 3000,
) -> DataEnvelope[PoolPrice]
Get the live pool price for a token pair.
Resolves the pool address for the given pair and reads the price. This is a convenience method that wraps pool address resolution and price reading.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_a
|
str
|
Token A symbol or address. |
required |
token_b
|
str
|
Token B symbol or address. |
required |
chain
|
str | None
|
Chain name. Defaults to the snapshot's primary chain. |
None
|
protocol
|
str | None
|
Protocol name (e.g. "uniswap_v3"). If None, tries all registered protocols. |
None
|
fee_tier
|
int
|
Fee tier in basis points (default 3000 = 0.3%). |
3000
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[PoolPrice]
|
DataEnvelope[PoolPrice] with provenance metadata. |
Raises:
| Type | Description |
|---|---|
PoolPriceUnavailableError
|
If pool cannot be found or price cannot be read. |
ValueError
|
If no pool reader registry is configured. |
twap
¶
twap(
token_pair: str | Instrument,
chain: str | None = None,
window_seconds: int = 300,
pool_address: str | None = None,
protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]
Get the time-weighted average price (TWAP) for a token pair.
Uses the Uniswap V3 oracle's observe() function to compute the TWAP over the specified time window.
Classification: EXECUTION_GRADE (fail-closed, no off-chain fallback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_pair
|
str | Instrument
|
Token pair as "BASE/QUOTE" string (e.g. "WETH/USDC") or an Instrument instance. |
required |
chain
|
str | None
|
Chain name. Defaults to the snapshot's primary chain. |
None
|
window_seconds
|
int
|
Time window in seconds (default 300 = 5 min). |
300
|
pool_address
|
str | None
|
Explicit pool address. If None, resolves from pair. |
None
|
protocol
|
str
|
Protocol to use (default "uniswap_v3"). |
'uniswap_v3'
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[AggregatedPrice]
|
DataEnvelope[AggregatedPrice] with TWAP price and provenance. |
Raises:
| Type | Description |
|---|---|
PoolPriceUnavailableError
|
If TWAP cannot be calculated. |
ValueError
|
If no price aggregator is configured. |
lwap
¶
lwap(
token_pair: str | Instrument,
chain: str | None = None,
fee_tiers: list[int] | None = None,
protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]
Get the liquidity-weighted average price (LWAP) for a token pair.
Reads live prices from all known pools for the pair, filters by minimum liquidity, and computes a liquidity-weighted average.
Classification: EXECUTION_GRADE (fail-closed, no off-chain fallback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_pair
|
str | Instrument
|
Token pair as "BASE/QUOTE" string (e.g. "WETH/USDC") or an Instrument instance. |
required |
chain
|
str | None
|
Chain name. Defaults to the snapshot's primary chain. |
None
|
fee_tiers
|
list[int] | None
|
Fee tiers to search (default: [100, 500, 3000, 10000]). |
None
|
protocols
|
list[str] | None
|
Protocols to search (default: all registered for chain). |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[AggregatedPrice]
|
DataEnvelope[AggregatedPrice] with LWAP price and provenance. |
Raises:
| Type | Description |
|---|---|
PoolPriceUnavailableError
|
If LWAP cannot be calculated. |
ValueError
|
If no price aggregator is configured. |
pool_history
¶
pool_history(
pool_address: str,
chain: str | None = None,
start_date: datetime | None = None,
end_date: datetime | None = None,
resolution: str = "1h",
) -> DataEnvelope[list[PoolSnapshot]]
Get historical pool state snapshots for backtesting and analytics.
Fetches TVL, volume, fee revenue, and reserve data from The Graph, DeFi Llama, or GeckoTerminal with graceful fallback between providers. Results are cached in VersionedDataCache for deterministic replay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address. |
required |
chain
|
str | None
|
Chain name (e.g. "arbitrum", "ethereum"). Defaults to strategy chain. |
None
|
start_date
|
datetime | None
|
Start of the history window (UTC). Defaults to 90 days ago. |
None
|
end_date
|
datetime | None
|
End of the history window (UTC). Defaults to now. |
None
|
resolution
|
str
|
Data resolution: "1h", "4h", or "1d". Default "1h". |
'1h'
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[list[PoolSnapshot]]
|
DataEnvelope[list[PoolSnapshot]] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
PoolHistoryUnavailableError
|
If historical data cannot be retrieved. |
ValueError
|
If no pool history reader is configured. |
liquidity_depth
¶
Get tick-level liquidity depth for a concentrated-liquidity pool.
Reads the tick bitmap and individual tick liquidity values from the pool contract to build a picture of liquidity distribution around the current price. Essential for slippage estimation and position sizing.
Classification: EXECUTION_GRADE (fails closed, no off-chain fallback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address. |
required |
chain
|
str | None
|
Chain name (e.g. "arbitrum", "ethereum"). Defaults to strategy chain. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[LiquidityDepth]
|
DataEnvelope[LiquidityDepth] with tick-level liquidity data. |
Raises:
| Type | Description |
|---|---|
LiquidityDepthUnavailableError
|
If liquidity data cannot be read. |
ValueError
|
If no liquidity depth reader is configured. |
estimate_slippage
¶
estimate_slippage(
token_in: str,
token_out: str,
amount: Decimal,
chain: str | None = None,
protocol: str | None = None,
) -> DataEnvelope[SlippageEstimate]
Estimate price impact and slippage for a potential swap.
Simulates the swap through tick ranges using actual on-chain liquidity data to compute the expected execution price and slippage. Logs a warning if estimated slippage exceeds the configured threshold (default 1%).
Classification: EXECUTION_GRADE (fails closed, no off-chain fallback).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_in
|
str
|
Input token symbol or address. |
required |
token_out
|
str
|
Output token symbol or address. |
required |
amount
|
Decimal
|
Amount of token_in to swap (human-readable units). |
required |
chain
|
str | None
|
Chain name. Defaults to strategy chain. |
None
|
protocol
|
str | None
|
Protocol name (e.g. "uniswap_v3"). Auto-detected if None. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[SlippageEstimate]
|
DataEnvelope[SlippageEstimate] with price impact data. |
Raises:
| Type | Description |
|---|---|
SlippageEstimateUnavailableError
|
If slippage cannot be estimated. |
ValueError
|
If no slippage estimator is configured. |
pool_reserves
¶
Get DEX pool reserves and state.
Fetches the current state of a DEX liquidity pool from the blockchain. Auto-detects the pool type (Uniswap V2 vs V3) by checking the contract interface.
Results are cached for 12 seconds (approximately 1 block) to avoid excessive RPC calls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address |
required |
chain
|
str | None
|
Chain identifier (e.g., "ethereum", "arbitrum", "optimism"). If not specified, uses the strategy's primary chain from the MarketSnapshot. |
None
|
Returns:
| Type | Description |
|---|---|
PoolReserves
|
PoolReserves dataclass with: - pool_address: Pool contract address - dex: DEX type ('uniswap_v2', 'uniswap_v3', 'sushiswap') - token0: First token in the pair (ChainToken) - token1: Second token in the pair (ChainToken) - reserve0: Reserve of token0 (human-readable Decimal) - reserve1: Reserve of token1 (human-readable Decimal) - fee_tier: Pool fee in basis points - sqrt_price_x96: V3 sqrt price (None for V2) - tick: V3 current tick (None for V2) - liquidity: V3 in-range liquidity (None for V2) - tvl_usd: Total value locked in USD - last_updated: When the data was fetched |
Raises:
| Type | Description |
|---|---|
PoolReservesUnavailableError
|
If pool data cannot be retrieved |
ValueError
|
If no pool reader is configured |
Example
Get pool reserves for USDC/WETH pool¶
pool = snapshot.pool_reserves( "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640" ) print(f"Reserve0: {pool.reserve0} {pool.token0.symbol}") print(f"Reserve1: {pool.reserve1} {pool.token1.symbol}") print(f"TVL: ${pool.tvl_usd}")
Check if V3 pool¶
if pool.is_v3: print(f"Current tick: {pool.tick}")
prices
¶
Get prices for multiple tokens in a single batch call.
Fetches prices for all specified tokens in parallel using asyncio.gather. Returns partial results if some tokens fail (errors are logged).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tokens
|
list[str]
|
List of token symbols (e.g., ["WETH", "USDC", "ARB"]) |
required |
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
Returns:
| Type | Description |
|---|---|
dict[str, Decimal]
|
Dictionary mapping token symbols to their prices as Decimal. |
dict[str, Decimal]
|
Only includes tokens that were successfully fetched. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no price oracle is configured |
balances
¶
Get balances for multiple tokens in a single batch call.
Fetches balances for all specified tokens in parallel using asyncio.gather. Returns partial results if some tokens fail (errors are logged).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tokens
|
list[str]
|
List of token symbols (e.g., ["WETH", "USDC", "ARB"]) |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Decimal]
|
Dictionary mapping token symbols to their balances as Decimal. |
dict[str, Decimal]
|
Only includes tokens that were successfully fetched. |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no balance provider is configured |
health
¶
Get a health report for all registered data providers.
Aggregates health metrics from all configured providers including price oracle, balance provider, OHLCV module, gas oracle, and pool reader.
The health report includes: - Individual source health (success rate, latency, errors) - Cache statistics (hits, misses, hit rate) - Overall system status (healthy, degraded, unhealthy)
Returns:
| Type | Description |
|---|---|
HealthReport
|
HealthReport dataclass with: - timestamp: When the report was generated - sources: Dictionary mapping source names to SourceHealth - cache_stats: CacheStats with cache performance metrics - overall_status: "healthy", "degraded", or "unhealthy" |
Example
report = snapshot.health() print(f"Overall status: {report.overall_status}") print(f"Sources: {list(report.sources.keys())}")
for name, health in report.sources.items(): print(f" {name}: {health.success_rate:.1%} success rate")
if report.failing_sources: print(f"Warning: failing sources: {report.failing_sources}")
balance_usd
¶
Get the wallet balance value in USD terms.
Calculates the USD value by multiplying the token balance by its current price.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "USDC") |
required |
Returns:
| Type | Description |
|---|---|
Decimal
|
Balance value in USD as a Decimal |
Raises:
| Type | Description |
|---|---|
PriceUnavailableError
|
If price cannot be determined |
BalanceUnavailableError
|
If balance cannot be determined |
Example
eth_value = snapshot.balance_usd("WETH")
If balance is 2 WETH at $2500, returns: Decimal("5000.00")¶
total_portfolio_usd
¶
Get the total portfolio value in USD across multiple tokens.
Sums the USD value of all specified token balances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
tokens
|
list[str]
|
List of token symbols to include |
required |
Returns:
| Type | Description |
|---|---|
Decimal
|
Total portfolio value in USD as a Decimal |
Raises:
| Type | Description |
|---|---|
PriceUnavailableError
|
If any price cannot be determined |
BalanceUnavailableError
|
If any balance cannot be determined |
lending_rate
¶
Get the lending rate for a specific protocol and token.
Fetches the current supply or borrow APY from the specified lending protocol. Rates are cached for efficiency (typically 12s = ~1 block).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Protocol identifier (aave_v3, morpho_blue, compound_v3) |
required |
token
|
str
|
Token symbol (e.g., "USDC", "WETH") |
required |
side
|
str
|
Rate side - "supply" or "borrow" (default "supply") |
'supply'
|
Returns:
| Type | Description |
|---|---|
LendingRate
|
LendingRate dataclass with: - protocol: Protocol identifier - token: Token symbol - side: Rate side - apy_ray: APY in ray units (1e27 scale) - apy_percent: APY as percentage (e.g., 5.25 for 5.25%) - utilization_percent: Pool utilization percentage - timestamp: When rate was fetched - chain: Blockchain network - market_id: Market identifier (optional) |
Raises:
| Type | Description |
|---|---|
LendingRateUnavailableError
|
If rate cannot be retrieved |
ValueError
|
If no rate monitor is configured |
best_lending_rate
¶
best_lending_rate(
token: str,
side: str = "supply",
protocols: list[str] | None = None,
) -> BestRateResult
Get the best lending rate for a token across protocols.
Compares rates from all available lending protocols and returns the optimal one. For supply rates, returns highest APY. For borrow rates, returns lowest APY.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "USDC", "WETH") |
required |
side
|
str
|
Rate side - "supply" or "borrow" (default "supply") |
'supply'
|
protocols
|
list[str] | None
|
Protocols to compare (default: all available on chain) |
None
|
Returns:
| Type | Description |
|---|---|
BestRateResult
|
BestRateResult dataclass with: - token: Token symbol - side: Rate side - best_rate: The best LendingRate found (or None if all failed) - all_rates: List of all rates from different protocols - timestamp: When comparison was made |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no rate monitor is configured |
Example
Find best USDC supply rate¶
result = snapshot.best_lending_rate("USDC", "supply") if result.best_rate: print(f"Best rate: {result.best_rate.protocol} at {result.best_rate.apy_percent:.2f}%")
# Compare all protocols
for rate in result.all_rates:
print(f" {rate.protocol}: {rate.apy_percent:.2f}%")
funding_rate
¶
Get the funding rate for a perpetual venue and market.
Fetches the current funding rate from the specified venue. Funding rates indicate the cost of holding perpetual positions: - Positive rate: Longs pay shorts (bullish market) - Negative rate: Shorts pay longs (bearish market)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
str
|
Venue identifier (gmx_v2, hyperliquid) |
required |
market
|
str
|
Market symbol (e.g., "ETH-USD", "BTC-USD") |
required |
Returns:
| Type | Description |
|---|---|
FundingRate
|
FundingRate dataclass with: - venue: Venue identifier - market: Market symbol - rate_hourly: Hourly funding rate - rate_8h: 8-hour funding rate (typical display) - rate_annualized: Annualized rate for comparison - next_funding_time: Next settlement time - open_interest_long: Total long OI in USD - open_interest_short: Total short OI in USD - mark_price: Current mark price - index_price: Current index price |
Raises:
| Type | Description |
|---|---|
FundingRateUnavailableError
|
If rate cannot be retrieved |
ValueError
|
If no funding rate provider is configured |
funding_rate_spread
¶
Get the funding rate spread between two venues.
Compares funding rates from two venues to identify arbitrage opportunities. A positive spread means venue_a has higher funding than venue_b.
The spread can be used for funding rate arbitrage: - If spread > 0: Short venue_a, long venue_b - If spread < 0: Short venue_b, long venue_a
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market
|
str
|
Market symbol (e.g., "ETH-USD") |
required |
venue_a
|
str
|
First venue identifier |
required |
venue_b
|
str
|
Second venue identifier |
required |
Returns:
| Type | Description |
|---|---|
FundingRateSpread
|
FundingRateSpread dataclass with: - market: Market symbol - venue_a: First venue - venue_b: Second venue - rate_a: Funding rate at venue_a - rate_b: Funding rate at venue_b - spread_8h: 8-hour spread (rate_a - rate_b) - spread_annualized: Annualized spread - is_profitable: True if spread exceeds threshold - recommended_direction: Trade direction for arb |
Raises:
| Type | Description |
|---|---|
FundingRateUnavailableError
|
If either rate cannot be retrieved |
ValueError
|
If no funding rate provider is configured |
Example
Compare GMX V2 vs Hyperliquid for ETH¶
spread = snapshot.funding_rate_spread( "ETH-USD", "gmx_v2", "hyperliquid" ) print(f"Spread: {spread.spread_percent_8h:.4f}% (8h)")
if spread.is_profitable: print(f"Arbitrage opportunity: {spread.recommended_direction}")
lending_rate_history
¶
lending_rate_history(
protocol: str,
token: str,
chain: str | None = None,
days: int = 90,
) -> DataEnvelope[list[LendingRateSnapshot]]
Get historical lending rate snapshots for backtesting.
Fetches supply/borrow APY history from The Graph or DeFi Llama with graceful fallback between providers. Results are cached in VersionedDataCache for deterministic replay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Lending protocol (e.g. "aave_v3", "morpho_blue", "compound_v3"). |
required |
token
|
str
|
Token symbol (e.g. "USDC", "WETH"). |
required |
chain
|
str | None
|
Chain name. Defaults to strategy chain. |
None
|
days
|
int
|
Number of days of history. Default 90. |
90
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[list[LendingRateSnapshot]]
|
DataEnvelope[list[LendingRateSnapshot]] with INFORMATIONAL classification. |
DataEnvelope[list[LendingRateSnapshot]]
|
Snapshots are sorted ascending by timestamp. |
Raises:
| Type | Description |
|---|---|
LendingRateHistoryUnavailableError
|
If historical data cannot be retrieved. |
ValueError
|
If no rate history reader is configured. |
Example
envelope = snapshot.lending_rate_history("aave_v3", "USDC", days=90) for snap in envelope.value: print(f"Supply: {snap.supply_apy}%, Borrow: {snap.borrow_apy}%")
funding_rate_history
¶
funding_rate_history(
venue: str, market_symbol: str, hours: int = 168
) -> DataEnvelope[list[FundingRateSnapshot]]
Get historical funding rate snapshots for backtesting.
Fetches funding rate history from Hyperliquid API or DeFi Llama with graceful fallback. Results are cached in VersionedDataCache for deterministic replay.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
str
|
Perps venue (e.g. "hyperliquid", "gmx_v2"). |
required |
market_symbol
|
str
|
Market symbol (e.g. "ETH-USD", "BTC-USD"). |
required |
hours
|
int
|
Number of hours of history. Default 168 (7 days). |
168
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[list[FundingRateSnapshot]]
|
DataEnvelope[list[FundingRateSnapshot]] with INFORMATIONAL classification. |
DataEnvelope[list[FundingRateSnapshot]]
|
Snapshots are sorted ascending by timestamp. |
Raises:
| Type | Description |
|---|---|
FundingRateHistoryUnavailableError
|
If historical data cannot be retrieved. |
ValueError
|
If no rate history reader is configured. |
Example
envelope = snapshot.funding_rate_history("hyperliquid", "ETH-USD", hours=168) for snap in envelope.value: print(f"Rate: {snap.rate}, Annualized: {snap.annualized_rate}")
price_across_dexs
¶
price_across_dexs(
token_in: str,
token_out: str,
amount: Decimal,
dexs: list[str] | None = None,
) -> MultiDexPriceResult
Get prices from multiple DEXs for comparison.
Fetches quotes from all configured DEXs (Uniswap V3, Curve, Enso) and returns a comparison of prices and execution details. Use this to identify the best execution venue for a swap.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_in
|
str
|
Input token symbol (e.g., "USDC", "WETH") |
required |
token_out
|
str
|
Output token symbol (e.g., "WETH", "USDC") |
required |
amount
|
Decimal
|
Input amount (human-readable, e.g., Decimal("10000") for 10k) |
required |
dexs
|
list[str] | None
|
DEXs to query (default: all available on chain) |
None
|
Returns:
| Type | Description |
|---|---|
MultiDexPriceResult
|
MultiDexPriceResult dataclass with: - token_in: Input token symbol - token_out: Output token symbol - amount_in: Input amount - quotes: Dictionary mapping DEX name to DexQuote - best_quote: Quote with highest output amount - price_spread_bps: Spread between best and worst in bps |
Raises:
| Type | Description |
|---|---|
DexQuoteUnavailableError
|
If no quotes can be fetched |
ValueError
|
If no multi-DEX service is configured |
Example
Compare prices for 10k USDC -> WETH¶
result = snapshot.price_across_dexs( "USDC", "WETH", Decimal("10000") )
for dex, quote in result.quotes.items(): print(f"{dex}: {quote.amount_out} WETH") print(f" Price impact: {quote.price_impact_bps} bps") print(f" Slippage estimate: {quote.slippage_estimate_bps} bps")
print(f"Best venue: {result.best_quote.dex}") print(f"Price spread: {result.price_spread_bps} bps")
best_dex_price
¶
best_dex_price(
token_in: str,
token_out: str,
amount: Decimal,
dexs: list[str] | None = None,
) -> BestDexResult
Get the best DEX for a trade.
Compares prices from all configured DEXs and returns the one with the highest output amount (best execution). This is useful for routing trades to the optimal venue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_in
|
str
|
Input token symbol (e.g., "USDC", "WETH") |
required |
token_out
|
str
|
Output token symbol (e.g., "WETH", "USDC") |
required |
amount
|
Decimal
|
Input amount (human-readable, e.g., Decimal("10000") for 10k) |
required |
dexs
|
list[str] | None
|
DEXs to compare (default: all available on chain) |
None
|
Returns:
| Type | Description |
|---|---|
BestDexResult
|
BestDexResult dataclass with: - token_in: Input token symbol - token_out: Output token symbol - amount_in: Input amount - best_dex: Best DEX for the trade (e.g., "uniswap_v3") - best_quote: DexQuote from the best DEX - all_quotes: List of quotes from all DEXs - savings_vs_worst_bps: Savings vs worst venue in bps |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no multi-DEX service is configured |
Example
Find best venue for USDC -> WETH swap¶
result = snapshot.best_dex_price( "USDC", "WETH", Decimal("10000") )
if result.best_quote: print(f"Best DEX: {result.best_dex}") print(f"Output: {result.best_quote.amount_out} WETH") print(f"Savings vs worst: {result.savings_vs_worst_bps} bps") else: print("No quotes available")
il_exposure
¶
Get the impermanent loss exposure for a tracked LP position.
Calculates the current IL for a tracked LP position using the position's entry prices and current market prices. This method requires an ILCalculator with the position already registered.
The ILCalculator should be configured with the position via add_position() before calling this method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
Unique identifier for the LP position |
required |
fees_earned
|
Decimal
|
Optional fees earned by the position (for net PnL calc) |
Decimal('0')
|
Returns:
| Type | Description |
|---|---|
ILExposure
|
ILExposure dataclass with: - position_id: Position identifier - position: LPPosition details - current_il: ILResult with IL metrics - entry_value: Original position value - current_value: Current position value - fees_earned: Fees earned (if provided) - net_pnl: Net profit/loss including fees |
Raises:
| Type | Description |
|---|---|
ILExposureUnavailableError
|
If exposure cannot be calculated |
ValueError
|
If no IL calculator is configured |
Example
Get IL exposure for a position¶
exposure = snapshot.il_exposure("my-lp-position-123") print(f"Current IL: {exposure.current_il.il_percent:.2f}%") print(f"Entry value: ${exposure.entry_value}") print(f"Current value: ${exposure.current_value}")
if exposure.il_offset_by_fees: print("Fees offset the IL - net positive!")
projected_il
¶
projected_il(
token_a: str,
token_b: str,
price_change_pct: Decimal,
weight_a: Decimal = Decimal("0.5"),
weight_b: Decimal = Decimal("0.5"),
) -> ProjectedILResult
Project impermanent loss for a hypothetical price change.
This method simulates what IL would be if token A's price changed by the specified percentage relative to token B. This is useful for understanding IL risk before entering a position.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_a
|
str
|
Symbol of token A (the volatile token) |
required |
token_b
|
str
|
Symbol of token B (often a stablecoin) |
required |
price_change_pct
|
Decimal
|
Price change percentage (e.g., 50 for +50%, -30 for -30%) |
required |
weight_a
|
Decimal
|
Weight of token A in the pool (default 0.5) |
Decimal('0.5')
|
weight_b
|
Decimal
|
Weight of token B in the pool (default 0.5) |
Decimal('0.5')
|
Returns:
| Type | Description |
|---|---|
ProjectedILResult
|
ProjectedILResult dataclass with: - price_change_pct: The input price change - il_ratio: Projected IL as decimal (e.g., -0.0057 for 0.57% loss) - il_percent: Projected IL as percentage (e.g., -0.57) - il_bps: Projected IL in basis points (e.g., -57) - pool_type: Type of pool (default: constant_product) - weight_a: Weight of token A - weight_b: Weight of token B |
Raises:
| Type | Description |
|---|---|
ValueError
|
If no IL calculator is configured or invalid parameters |
Example
What would IL be if ETH goes up 50%?¶
proj = snapshot.projected_il("WETH", "USDC", Decimal("50")) print(f"If ETH +50%: IL = {proj.il_percent:.2f}%")
What if ETH drops 30%?¶
proj = snapshot.projected_il("WETH", "USDC", Decimal("-30")) print(f"If ETH -30%: IL = {proj.il_percent:.2f}%")
Weighted pool (80/20 ETH/USDC)¶
proj = snapshot.projected_il( "WETH", "USDC", price_change_pct=Decimal("100"), weight_a=Decimal("0.8"), weight_b=Decimal("0.2"), ) print(f"80/20 pool, ETH +100%: IL = {proj.il_percent:.2f}%")
prediction
¶
Get prediction market data.
Fetches full market details for a prediction market by ID or slug. Uses lazy loading - only fetches prediction data when accessed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str
|
Prediction market ID or URL slug Examples: "12345", "will-bitcoin-exceed-100k-2025" |
required |
Returns:
| Type | Description |
|---|---|
PredictionMarket
|
PredictionMarket with: - market_id: Internal market ID - condition_id: CTF condition ID (0x...) - question: Market question text - slug: URL slug - yes_price: Current YES outcome price (0-1) - no_price: Current NO outcome price (0-1) - spread: Bid-ask spread - volume_24h: 24-hour trading volume in USDC - liquidity: Current liquidity - end_date: Resolution deadline - is_active: Whether market is accepting orders - is_resolved: Whether market has been resolved |
Raises:
| Type | Description |
|---|---|
PredictionUnavailableError
|
If market data cannot be retrieved |
ValueError
|
If no prediction provider is configured |
Example
Get market by ID¶
market = snapshot.prediction("12345") print(f"YES: {market.yes_price}, NO: {market.no_price}")
Get market by slug¶
market = snapshot.prediction("will-btc-hit-100k") print(f"Question: {market.question}") print(f"24h Volume: ${market.volume_24h:,.2f}")
Check implied probability¶
yes_prob = market.yes_price * 100 print(f"Implied probability: {yes_prob:.1f}%")
prediction_positions
¶
Get all open prediction market positions.
Fetches positions from the prediction market provider. Can optionally filter by market ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str | None
|
Optional market ID or slug to filter by |
None
|
Returns:
| Type | Description |
|---|---|
list[PredictionPosition]
|
List of PredictionPosition objects with: - market_id: Market ID - condition_id: CTF condition ID - token_id: CLOB token ID - outcome: Position outcome (YES or NO) - size: Number of shares held - avg_price: Average entry price - current_price: Current market price - unrealized_pnl: Unrealized profit/loss - realized_pnl: Realized profit/loss - value: Current position value (size * current_price) |
Raises:
| Type | Description |
|---|---|
PredictionUnavailableError
|
If positions cannot be retrieved |
ValueError
|
If no prediction provider is configured |
Example
Get all positions¶
positions = snapshot.prediction_positions() total_value = sum(p.value for p in positions) print(f"Total position value: ${total_value:,.2f}")
Get positions for a specific market¶
positions = snapshot.prediction_positions("btc-100k") for pos in positions: print(f"{pos.outcome}: {pos.size} shares @ {pos.avg_price}") print(f" Unrealized PnL: ${pos.unrealized_pnl:,.2f}")
prediction_orders
¶
Get all open prediction market orders.
Fetches open orders from the prediction market provider. Can optionally filter by market ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str | None
|
Optional market ID or slug to filter by |
None
|
Returns:
| Type | Description |
|---|---|
list[PredictionOrder]
|
List of PredictionOrder objects with: - order_id: Order ID - market_id: Market ID (token ID) - outcome: Order outcome (YES or NO) - side: Order side (BUY or SELL) - price: Order price - size: Order size in shares - filled_size: Filled amount - remaining_size: Remaining unfilled size - created_at: Order creation timestamp |
Raises:
| Type | Description |
|---|---|
PredictionUnavailableError
|
If orders cannot be retrieved |
ValueError
|
If no prediction provider is configured |
Example
Get all open orders¶
orders = snapshot.prediction_orders() for order in orders: print(f"{order.side} {order.remaining_size} @ {order.price}")
Get orders for a specific market¶
orders = snapshot.prediction_orders("btc-100k") buy_orders = [o for o in orders if o.side == "BUY"] print(f"Open buy orders: {len(buy_orders)}")
realized_vol
¶
realized_vol(
token: str,
window_days: int = 30,
timeframe: str = "1h",
estimator: str = "close_to_close",
*,
ohlcv_limit: int | None = None,
) -> DataEnvelope[VolatilityResult]
Calculate realized volatility for a token.
Fetches OHLCV candles via the configured providers and computes realized volatility using the specified estimator. Requires either an OHLCVRouter or an OHLCVModule for data, and a RealizedVolatilityCalculator for the computation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g. "WETH", "ETH"). |
required |
window_days
|
int
|
Lookback window in calendar days. Default 30. |
30
|
timeframe
|
str
|
Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). Default "1h". |
'1h'
|
estimator
|
str
|
"close_to_close" (default) or "parkinson". |
'close_to_close'
|
ohlcv_limit
|
int | None
|
Override for number of candles to fetch. If None, auto-calculated from window_days and timeframe. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[VolatilityResult]
|
DataEnvelope[VolatilityResult] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
VolatilityUnavailableError
|
If volatility cannot be calculated. |
ValueError
|
If no volatility calculator is configured. |
Example
result = snapshot.realized_vol("WETH", window_days=30, timeframe="1h") print(f"Annualized vol: {result.value.annualized_vol:.2%}") print(f"Daily vol: {result.value.daily_vol:.2%}")
vol_cone
¶
vol_cone(
token: str,
windows: list[int] | None = None,
timeframe: str = "1h",
estimator: str = "close_to_close",
*,
ohlcv_limit: int | None = None,
) -> DataEnvelope[VolConeResult]
Compute volatility cone: current vol vs historical percentile.
For each window, calculates the current realized vol and compares it to the historical distribution of rolling vols over the full candle history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g. "WETH"). |
required |
windows
|
list[int] | None
|
Lookback windows in days. Default [7, 14, 30, 90]. |
None
|
timeframe
|
str
|
Candle timeframe. Default "1h". |
'1h'
|
estimator
|
str
|
"close_to_close" or "parkinson". |
'close_to_close'
|
ohlcv_limit
|
int | None
|
Override for number of candles to fetch. If None, auto-calculated for the largest window. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[VolConeResult]
|
DataEnvelope[VolConeResult] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
VolConeUnavailableError
|
If vol cone cannot be calculated. |
ValueError
|
If no volatility calculator is configured. |
Example
cone = snapshot.vol_cone("WETH", windows=[7, 14, 30, 90]) for entry in cone.value.entries: print(f"{entry.window_days}d: {entry.current_vol:.2%} (p{entry.percentile:.0f})")
portfolio_risk
¶
portfolio_risk(
pnl_series: list[float],
total_value_usd: Decimal | None = None,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
var_method: str = "parametric",
timestamps: list[datetime] | None = None,
benchmark_eth_returns: list[float] | None = None,
benchmark_btc_returns: list[float] | None = None,
) -> DataEnvelope[PortfolioRisk]
Calculate portfolio risk metrics from a PnL return series.
Computes Sharpe ratio, Sortino ratio, VaR, CVaR, and drawdown with explicit conventions for unambiguous results.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pnl_series
|
list[float]
|
List of periodic returns as fractions (0.01 = 1% gain). |
required |
total_value_usd
|
Decimal | None
|
Current portfolio value in USD. Defaults to Decimal("0"). |
None
|
return_interval
|
str
|
Periodicity of returns (1d, 1h, etc.). |
'1d'
|
risk_free_rate
|
Decimal
|
Risk-free rate per period as a decimal. |
Decimal('0')
|
var_method
|
str
|
VaR method: "parametric", "historical", or "cornish_fisher". |
'parametric'
|
timestamps
|
list[datetime] | None
|
Optional timestamps for each return. |
None
|
benchmark_eth_returns
|
list[float] | None
|
Optional ETH returns for beta calculation. |
None
|
benchmark_btc_returns
|
list[float] | None
|
Optional BTC returns for beta calculation. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[PortfolioRisk]
|
DataEnvelope[PortfolioRisk] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
PortfolioRiskUnavailableError
|
If risk metrics cannot be calculated. |
ValueError
|
If no risk calculator is configured. |
Example
risk = snapshot.portfolio_risk(pnl_series, total_value_usd=Decimal("100000")) print(f"Sharpe: {risk.value.sharpe_ratio:.2f}") print(f"VaR 95%: ${risk.value.var_95}")
rolling_sharpe
¶
rolling_sharpe(
pnl_series: list[float],
window_days: int = 30,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
timestamps: list[datetime] | None = None,
) -> DataEnvelope[RollingSharpeResult]
Compute rolling Sharpe ratio over a PnL series.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pnl_series
|
list[float]
|
List of periodic returns as fractions. |
required |
window_days
|
int
|
Rolling window in days. Default 30. |
30
|
return_interval
|
str
|
Periodicity of returns (1d, 1h, etc.). |
'1d'
|
risk_free_rate
|
Decimal
|
Risk-free rate per period. |
Decimal('0')
|
timestamps
|
list[datetime] | None
|
Optional timestamps aligned with pnl_series. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[RollingSharpeResult]
|
DataEnvelope[RollingSharpeResult] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
RollingSharpeUnavailableError
|
If rolling Sharpe cannot be computed. |
ValueError
|
If no risk calculator is configured. |
Example
result = snapshot.rolling_sharpe(pnl_series, window_days=30) for entry in result.value.entries: print(f"{entry.timestamp}: Sharpe={entry.sharpe:.2f}")
pool_analytics
¶
pool_analytics(
pool_address: str,
chain: str | None = None,
protocol: str | None = None,
) -> DataEnvelope[PoolAnalytics]
Get real-time analytics for a pool (TVL, volume, fee APR/APY).
Fetches from DeFi Llama (primary) or GeckoTerminal (fallback). Cache TTL: 5 minutes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address. |
required |
chain
|
str | None
|
Chain name. Defaults to strategy chain. |
None
|
protocol
|
str | None
|
Optional protocol hint (e.g. "uniswap_v3"). |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[PoolAnalytics]
|
DataEnvelope[PoolAnalytics] with INFORMATIONAL classification. |
Raises:
| Type | Description |
|---|---|
PoolAnalyticsUnavailableError
|
If analytics cannot be retrieved. |
ValueError
|
If no pool analytics reader is configured. |
best_pool
¶
best_pool(
token_a: str,
token_b: str,
chain: str | None = None,
metric: str = "fee_apr",
protocols: list[str] | None = None,
) -> DataEnvelope[PoolAnalyticsResult]
Find the best pool for a token pair based on a metric.
Searches DeFi Llama for matching pools and ranks by metric.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_a
|
str
|
First token symbol (e.g. "WETH"). |
required |
token_b
|
str
|
Second token symbol (e.g. "USDC"). |
required |
chain
|
str | None
|
Chain name. Defaults to strategy chain. |
None
|
metric
|
str
|
Sorting metric: "fee_apr", "fee_apy", "tvl_usd", "volume_24h_usd". |
'fee_apr'
|
protocols
|
list[str] | None
|
Optional list of protocols to filter by. |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[PoolAnalyticsResult]
|
DataEnvelope[PoolAnalyticsResult] with the best pool. |
Raises:
| Type | Description |
|---|---|
PoolAnalyticsUnavailableError
|
If no pools found or all providers fail. |
ValueError
|
If no pool analytics reader is configured. |
yield_opportunities
¶
yield_opportunities(
token: str,
chains: list[str] | None = None,
min_tvl: float = 100000,
sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]
Find yield opportunities for a token across protocols and chains.
Searches DeFi Llama yields API for matching pools, sorted by the chosen metric. Cache TTL: 15 minutes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g. "USDC", "WETH"). |
required |
chains
|
list[str] | None
|
Optional list of chains to filter. None means all. |
None
|
min_tvl
|
float
|
Minimum TVL in USD. Default $100k. |
100000
|
sort_by
|
str
|
Sort field: "apy", "tvl", "risk_score". Default "apy". |
'apy'
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[list[YieldOpportunity]]
|
DataEnvelope[list[YieldOpportunity]] sorted by chosen metric. |
Raises:
| Type | Description |
|---|---|
YieldOpportunitiesUnavailableError
|
If data cannot be retrieved. |
ValueError
|
If no yield aggregator is configured. |
to_dict
¶
Convert snapshot state to dictionary for serialization.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with snapshot metadata and cached values |
Price Data¶
PriceOracle¶
almanak.framework.data.PriceOracle
¶
Bases: Protocol
Protocol for price aggregation and oracle logic.
A PriceOracle wraps one or more BasePriceSource implementations and provides aggregation logic (median, weighted average, etc.) along with outlier detection and graceful degradation.
The oracle is the primary interface for strategies to get price data, abstracting away the complexity of managing multiple sources.
Key responsibilities: - Aggregate prices from multiple sources - Detect and filter outliers (>2% deviation from median) - Handle partial failures (some sources down) - Track source health metrics for routing decisions
Example implementation
class PriceAggregator: def init(self, sources: list[BasePriceSource]): self._sources = sources self._health_metrics: dict[str, SourceHealthMetrics] = {}
async def get_aggregated_price(
self, token: str, quote: str = "USD"
) -> PriceResult:
results = []
errors = {}
for source in self._sources:
try:
result = await source.get_price(token, quote)
results.append(result)
except Exception as e:
errors[source.source_name] = str(e)
if not results:
raise AllDataSourcesFailed(errors)
# Return median price with aggregated confidence
median_price = self._calculate_median(results)
confidence = self._calculate_confidence(results)
return PriceResult(
price=median_price,
source="aggregated",
timestamp=datetime.now(timezone.utc),
confidence=confidence,
)
get_aggregated_price
async
¶
Get aggregated price from multiple sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol to get price for |
required |
quote
|
str
|
Quote currency (default "USD") |
'USD'
|
Returns:
| Type | Description |
|---|---|
PriceResult
|
PriceResult with aggregated price and confidence |
Raises:
| Type | Description |
|---|---|
AllDataSourcesFailed
|
If all sources fail to provide data |
get_source_health
¶
Get health metrics for a specific source.
Returns metrics like success rate, average latency, last error time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
source_name
|
str
|
Name of the source to query |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any] | None
|
Dictionary with health metrics, or None if source unknown |
AggregatedPrice¶
almanak.framework.data.AggregatedPrice
dataclass
¶
AggregatedPrice(
price: Decimal,
sources: list[PoolContribution] = list(),
block_range: tuple[int, int] = (0, 0),
method: str = "lwap",
window_seconds: int = 0,
pool_count: int = 0,
)
Aggregated price from multiple pools or time periods.
Attributes:
| Name | Type | Description |
|---|---|---|
price |
Decimal
|
The aggregated price value. |
sources |
list[PoolContribution]
|
List of pool contributions with individual prices and weights. |
block_range |
tuple[int, int]
|
Tuple of (min_block, max_block) covered by the aggregation. |
method |
str
|
Aggregation method used ("twap" or "lwap"). |
window_seconds |
int
|
Time window in seconds (for TWAP). |
pool_count |
int
|
Number of pools used in aggregation. |
PriceAggregator¶
almanak.framework.data.PriceAggregator
¶
PriceAggregator(
pool_registry: PoolReaderRegistry,
rpc_call: RpcCallFn,
min_liquidity_usd: Decimal = DEFAULT_MIN_LIQUIDITY_USD,
reference_price_usd: Decimal | None = None,
)
Aggregates prices across pools using TWAP and LWAP methods.
TWAP uses Uniswap V3's built-in oracle (observe()) for time-weighted average prices. LWAP reads live prices from multiple pools and weights them by in-range liquidity.
All results are EXECUTION_GRADE: fail-closed with no off-chain fallback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_registry
|
PoolReaderRegistry
|
PoolReaderRegistry for reading pool prices. |
required |
rpc_call
|
RpcCallFn
|
RPC call function for direct contract reads (observe()). |
required |
min_liquidity_usd
|
Decimal
|
Minimum liquidity in USD to include a pool (default $10k). |
DEFAULT_MIN_LIQUIDITY_USD
|
reference_price_usd
|
Decimal | None
|
Reference price for converting liquidity to USD. If None, liquidity filtering uses raw liquidity values and the threshold is treated as raw liquidity units. |
None
|
twap
¶
twap(
pool_address: str,
chain: str,
window_seconds: int = 300,
token0_decimals: int = 18,
token1_decimals: int = 6,
protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]
Calculate TWAP using Uniswap V3 oracle observe().
Calls observe([window_seconds, 0]) on the pool contract to get tick cumulatives, then derives the arithmetic mean tick and converts to a price.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_address
|
str
|
Pool contract address. |
required |
chain
|
str
|
Chain name. |
required |
window_seconds
|
int
|
Time window in seconds (default 300 = 5 min). |
300
|
token0_decimals
|
int
|
Decimals of token0 (default 18). |
18
|
token1_decimals
|
int
|
Decimals of token1 (default 6). |
6
|
protocol
|
str
|
Protocol name for source attribution. |
'uniswap_v3'
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[AggregatedPrice]
|
DataEnvelope[AggregatedPrice] with TWAP price. |
Raises:
| Type | Description |
|---|---|
DataUnavailableError
|
If observe() call fails or returns invalid data. |
lwap
¶
lwap(
token_a: str,
token_b: str,
chain: str,
fee_tiers: list[int] | None = None,
protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]
Calculate liquidity-weighted average price across pools.
Reads live prices from all known pools for the given pair, filters out pools below the minimum liquidity threshold, then computes LWAP = sum(price_i * liquidity_i) / sum(liquidity_i).
Falls back to single-pool price if only one pool is available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token_a
|
str
|
Token A symbol or address. |
required |
token_b
|
str
|
Token B symbol or address. |
required |
chain
|
str
|
Chain name. |
required |
fee_tiers
|
list[int] | None
|
Fee tiers to search (default: [100, 500, 3000, 10000]). |
None
|
protocols
|
list[str] | None
|
Protocols to search (default: all registered for chain). |
None
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[AggregatedPrice]
|
DataEnvelope[AggregatedPrice] with LWAP price. |
Raises:
| Type | Description |
|---|---|
DataUnavailableError
|
If no pools found for the pair (fail-closed). |
Balance Data¶
BalanceProvider¶
almanak.framework.data.BalanceProvider
¶
Bases: Protocol
Protocol for on-chain balance queries.
A BalanceProvider abstracts the complexity of querying ERC-20 and native token balances from the blockchain, handling decimal conversion, caching, and RPC error recovery.
Key responsibilities: - Query ERC-20 balances via balanceOf - Query native ETH balance via eth_getBalance - Handle token decimal conversion correctly - Cache balances to reduce RPC load - Invalidate cache after transaction execution
Example implementation
class Web3BalanceProvider: def init(self, web3: Web3, wallet_address: str): self._web3 = web3 self._wallet = wallet_address self._cache: dict[str, BalanceResult] = {} self._cache_ttl = 5 # 5 second cache
async def get_balance(self, token: str) -> BalanceResult:
if token == "ETH":
return await self._get_native_balance()
token_info = self._get_token_info(token)
contract = self._web3.eth.contract(
address=token_info.address,
abi=ERC20_ABI,
)
raw_balance = contract.functions.balanceOf(self._wallet).call()
balance = Decimal(raw_balance) / Decimal(10 ** token_info.decimals)
return BalanceResult(
balance=balance,
token=token,
address=token_info.address,
decimals=token_info.decimals,
raw_balance=raw_balance,
)
def invalidate_cache(self, token: Optional[str] = None) -> None:
if token:
self._cache.pop(token, None)
else:
self._cache.clear()
get_balance
async
¶
Get the balance of a token for the configured wallet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g., "WETH", "USDC") or "ETH" for native |
required |
Returns:
| Type | Description |
|---|---|
BalanceResult
|
BalanceResult with balance in human-readable units |
Raises:
| Type | Description |
|---|---|
DataSourceError
|
If balance cannot be fetched |
get_native_balance
async
¶
Get the native token balance (ETH, MATIC, etc.).
Convenience method for getting the chain's native token balance.
Returns:
| Type | Description |
|---|---|
BalanceResult
|
BalanceResult for native token |
invalidate_cache
¶
Invalidate cached balances.
Should be called after transaction execution to ensure fresh data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str | None
|
Specific token to invalidate, or None to clear all |
None
|
OHLCV Data¶
OHLCVProvider¶
almanak.framework.data.OHLCVProvider
¶
Bases: Protocol
Protocol for OHLCV (candlestick) data providers.
Used by indicators like RSI that need historical price data.
Implementations must: - Support multiple timeframes (1m, 5m, 15m, 1h, 4h, 1d) - Return properly typed OHLCVCandle objects - Handle caching to avoid repeated API calls - Gracefully degrade if full history unavailable
The supported_timeframes property should return the subset of VALID_TIMEFRAMES that this provider can supply data for.
supported_timeframes
property
¶
Return the list of timeframes this provider supports.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of supported timeframe strings (e.g., ["1h", "4h", "1d"]) |
get_ohlcv
async
¶
get_ohlcv(
token: str,
quote: str = "USD",
timeframe: str = "1h",
limit: int = 100,
) -> list[OHLCVCandle]
Get OHLCV data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
quote
|
str
|
Quote currency |
'USD'
|
timeframe
|
str
|
Candle timeframe (must be in supported_timeframes) |
'1h'
|
limit
|
int
|
Number of candles to fetch |
100
|
Returns:
| Type | Description |
|---|---|
list[OHLCVCandle]
|
List of OHLCVCandle objects sorted by timestamp ascending |
Raises:
| Type | Description |
|---|---|
DataSourceError
|
If data cannot be fetched |
InsufficientDataError
|
If requested limit exceeds available data |
ValueError
|
If timeframe is not supported |
OHLCVData¶
almanak.framework.data.OHLCVData
dataclass
¶
OHLCVData(
timestamp: datetime,
open: Decimal,
high: Decimal,
low: Decimal,
close: Decimal,
volume: Decimal | None = None,
)
OHLCV candlestick data point.
Attributes:
| Name | Type | Description |
|---|---|---|
timestamp |
datetime
|
Candle open time |
open |
Decimal
|
Opening price |
high |
Decimal
|
Highest price |
low |
Decimal
|
Lowest price |
close |
Decimal
|
Closing price |
volume |
Decimal | None
|
Trading volume (optional, CoinGecko OHLC doesn't include volume) |
Pool Analytics¶
PoolAnalytics¶
almanak.framework.data.PoolAnalytics
dataclass
¶
PoolAnalytics(
pool_address: str,
chain: str,
protocol: str,
tvl_usd: Decimal,
volume_24h_usd: Decimal,
volume_7d_usd: Decimal,
fee_apr: float,
fee_apy: float,
utilization_rate: float | None = None,
token0_weight: float = 0.5,
token1_weight: float = 0.5,
)
Analytics for a single pool.
Attributes:
| Name | Type | Description |
|---|---|---|
pool_address |
str
|
Pool contract address. |
chain |
str
|
Chain name. |
protocol |
str
|
Protocol name (e.g. "uniswap_v3"). |
tvl_usd |
Decimal
|
Total value locked in USD. |
volume_24h_usd |
Decimal
|
24-hour trading volume in USD. |
volume_7d_usd |
Decimal
|
7-day trading volume in USD. |
fee_apr |
float
|
Annualized fee return as a percentage (e.g. 12.5 = 12.5%). |
fee_apy |
float
|
Compounded annual fee return as a percentage. |
utilization_rate |
float | None
|
Utilization rate for lending pools (0.0-1.0), None for DEX. |
token0_weight |
float
|
Fraction of TVL in token0 (0.0-1.0). |
token1_weight |
float
|
Fraction of TVL in token1 (0.0-1.0). |
LiquidityDepth¶
almanak.framework.data.LiquidityDepth
dataclass
¶
LiquidityDepth(
ticks: list[TickData],
total_liquidity: int,
current_tick: int,
current_price: Decimal,
pool_address: str = "",
token0_decimals: int = 18,
token1_decimals: int = 6,
tick_spacing: int = 60,
)
Tick-level liquidity distribution for a pool.
Attributes:
| Name | Type | Description |
|---|---|---|
ticks |
list[TickData]
|
Initialized ticks sorted by tick_index (ascending). |
total_liquidity |
int
|
Current in-range liquidity (L) from the pool. |
current_tick |
int
|
Current active tick from slot0. |
current_price |
Decimal
|
Current price from slot0. |
pool_address |
str
|
Pool contract address. |
token0_decimals |
int
|
Decimals of token0. |
token1_decimals |
int
|
Decimals of token1. |
tick_spacing |
int
|
Tick spacing for this pool. |
Volatility and Risk¶
RealizedVolatilityCalculator¶
almanak.framework.data.RealizedVolatilityCalculator
¶
Computes realized volatility from OHLCV candle data.
Supports two estimators: - close_to_close: Standard deviation of log returns. Simple and widely used, but only uses closing prices. - parkinson: High-low range estimator. More efficient (lower variance) for the same sample size because it uses intra-period range information.
All volatilities are annualized using sqrt(periods_per_year).
realized_vol
¶
realized_vol(
candles: list[OHLCVCandle],
window_days: int = 30,
timeframe: str = "1h",
estimator: str = "close_to_close",
) -> VolatilityResult
Calculate realized volatility over a lookback window.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
candles
|
list[OHLCVCandle]
|
OHLCV candles sorted ascending by timestamp. Should cover
at least |
required |
window_days
|
int
|
Lookback window in calendar days. |
30
|
timeframe
|
str
|
Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). |
'1h'
|
estimator
|
str
|
"close_to_close" (default) or "parkinson". |
'close_to_close'
|
Returns:
| Type | Description |
|---|---|
VolatilityResult
|
VolatilityResult with annualized, daily, and hourly vol. |
Raises:
| Type | Description |
|---|---|
InsufficientDataError
|
If fewer than 30 observations in the window. |
ValueError
|
If timeframe is unsupported or estimator unknown. |
vol_cone
¶
vol_cone(
candles: list[OHLCVCandle],
windows: list[int] | None = None,
timeframe: str = "1h",
estimator: str = "close_to_close",
token: str = "",
) -> VolConeResult
Compute volatility cone: current vol vs historical percentile.
For each window length, calculates the current realized vol and compares it to the distribution of rolling volatilities computed over the full candle history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
candles
|
list[OHLCVCandle]
|
Full OHLCV history sorted ascending. Should be significantly longer than the largest window for meaningful percentile estimation. |
required |
windows
|
list[int] | None
|
Lookback windows in days. Default [7, 14, 30, 90]. |
None
|
timeframe
|
str
|
Candle timeframe. |
'1h'
|
estimator
|
str
|
"close_to_close" or "parkinson". |
'close_to_close'
|
token
|
str
|
Token symbol for labeling. |
''
|
Returns:
| Type | Description |
|---|---|
VolConeResult
|
VolConeResult with one VolConeEntry per window. |
Raises:
| Type | Description |
|---|---|
InsufficientDataError
|
If not enough data for the smallest window. |
ValueError
|
If timeframe is unsupported. |
PortfolioRiskCalculator¶
almanak.framework.data.PortfolioRiskCalculator
¶
Computes portfolio risk metrics from PnL or return series.
All calculations use explicit conventions (return interval, risk-free rate, annualization) to ensure results are unambiguous and comparable.
Supports three VaR methods: - parametric: Assumes normal distribution. VaR = -z * sigma * value. - historical: Percentile-based from empirical return distribution. - cornish_fisher: Adjusts z-score for skewness and kurtosis.
portfolio_risk
¶
portfolio_risk(
pnl_series: list[float],
total_value_usd: Decimal,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
var_method: VaRMethod = VaRMethod.PARAMETRIC,
timestamps: list[datetime] | None = None,
benchmark_eth_returns: list[float] | None = None,
benchmark_btc_returns: list[float] | None = None,
) -> PortfolioRisk
Calculate portfolio risk metrics from a PnL series.
The pnl_series should contain periodic returns as fractions (e.g. 0.01 = 1% gain, -0.02 = 2% loss).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pnl_series
|
list[float]
|
List of periodic returns (fractions, not percentages). |
required |
total_value_usd
|
Decimal
|
Current portfolio value in USD. |
required |
return_interval
|
str
|
Periodicity of the returns (1d, 1h, etc.). |
'1d'
|
risk_free_rate
|
Decimal
|
Risk-free rate per period as a decimal. |
Decimal('0')
|
var_method
|
VaRMethod
|
VaR calculation method. |
PARAMETRIC
|
timestamps
|
list[datetime] | None
|
Optional timestamps for each return (for conventions). |
None
|
benchmark_eth_returns
|
list[float] | None
|
Optional ETH returns for beta calculation. |
None
|
benchmark_btc_returns
|
list[float] | None
|
Optional BTC returns for beta calculation. |
None
|
Returns:
| Type | Description |
|---|---|
PortfolioRisk
|
PortfolioRisk with all metrics and explicit conventions. |
Raises:
| Type | Description |
|---|---|
InsufficientDataError
|
If fewer than 30 observations. |
ValueError
|
If return_interval is unsupported. |
rolling_sharpe
¶
rolling_sharpe(
pnl_series: list[float],
window_days: int = 30,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
timestamps: list[datetime] | None = None,
) -> RollingSharpeResult
Compute rolling Sharpe ratio over the PnL series.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pnl_series
|
list[float]
|
List of periodic returns. |
required |
window_days
|
int
|
Rolling window in days. |
30
|
return_interval
|
str
|
Periodicity of the returns. |
'1d'
|
risk_free_rate
|
Decimal
|
Risk-free rate per period. |
Decimal('0')
|
timestamps
|
list[datetime] | None
|
Optional timestamps aligned with pnl_series. |
None
|
Returns:
| Type | Description |
|---|---|
RollingSharpeResult
|
RollingSharpeResult with time series of Sharpe ratios. |
Raises:
| Type | Description |
|---|---|
InsufficientDataError
|
If fewer than 30 observations total. |
ValueError
|
If return_interval is unsupported. |
Yield and Rates¶
YieldAggregator¶
almanak.framework.data.YieldAggregator
¶
Cross-protocol yield comparison using DeFi Llama.
Fetches yield data from DeFi Llama yields API, filtering by token, chain, TVL, and protocol. Results are cached for 15 minutes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cache_ttl
|
int
|
In-memory cache TTL in seconds. Default 900 (15 minutes). |
900
|
request_timeout
|
float
|
HTTP request timeout in seconds. Default 15. |
15.0
|
get_yield_opportunities
¶
get_yield_opportunities(
token: str,
chains: list[str] | None = None,
min_tvl: float = 100000,
sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]
Find yield opportunities for a token across protocols and chains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol (e.g. "USDC", "WETH"). |
required |
chains
|
list[str] | None
|
Optional list of chains to filter (e.g. ["arbitrum", "base"]). None means all supported chains. |
None
|
min_tvl
|
float
|
Minimum TVL in USD. Default $100k. |
100000
|
sort_by
|
str
|
Sort field: "apy", "tvl", "risk_score". Default "apy". |
'apy'
|
Returns:
| Type | Description |
|---|---|
DataEnvelope[list[YieldOpportunity]]
|
DataEnvelope[list[YieldOpportunity]] sorted by the chosen metric. |
Raises:
| Type | Description |
|---|---|
DataSourceUnavailable
|
If DeFi Llama API is unavailable. |
RateMonitor¶
almanak.framework.data.RateMonitor
¶
RateMonitor(
chain: str = "ethereum",
cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
protocols: list[str] | None = None,
rpc_url: str | None = None,
)
Unified lending rate monitor for multiple DeFi protocols.
This class provides a single interface for fetching lending rates from Aave V3, Morpho Blue, and Compound V3. It handles caching, error recovery, and cross-protocol rate comparison.
Attributes:
| Name | Type | Description |
|---|---|---|
chain |
str
|
Blockchain network |
cache_ttl_seconds |
str
|
How long to cache rates (default 12s) |
protocols |
list[str]
|
List of protocols to monitor |
Example
monitor = RateMonitor(chain="ethereum")
Get specific rate¶
rate = await monitor.get_lending_rate("aave_v3", "USDC", RateSide.SUPPLY)
Get best rate¶
best = await monitor.get_best_lending_rate("USDC", RateSide.SUPPLY)
Get all rates for a protocol¶
rates = await monitor.get_protocol_rates("aave_v3")
Initialize the RateMonitor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
chain
|
str
|
Blockchain network (ethereum, arbitrum, etc.) |
'ethereum'
|
cache_ttl_seconds
|
float
|
Cache TTL in seconds (default 12s = ~1 block) |
DEFAULT_CACHE_TTL_SECONDS
|
protocols
|
list[str] | None
|
Protocols to monitor (default: all available on chain) |
None
|
rpc_url
|
str | None
|
RPC URL for on-chain queries (optional) |
None
|
set_mock_rate
¶
Set a mock rate for testing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Protocol identifier |
required |
token
|
str
|
Token symbol |
required |
side
|
str
|
supply or borrow |
required |
apy_percent
|
Decimal
|
APY as percentage (e.g., 5.0 for 5%) |
required |
get_lending_rate
async
¶
Get lending rate for a specific protocol/token/side.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Protocol identifier (aave_v3, morpho_blue, compound_v3) |
required |
token
|
str
|
Token symbol (USDC, WETH, etc.) |
required |
side
|
RateSide
|
Rate side (SUPPLY or BORROW) |
required |
Returns:
| Type | Description |
|---|---|
LendingRate
|
LendingRate with APY data |
Raises:
| Type | Description |
|---|---|
ProtocolNotSupportedError
|
If protocol not available on chain |
TokenNotSupportedError
|
If token not supported |
RateUnavailableError
|
If rate cannot be fetched |
get_best_lending_rate
async
¶
get_best_lending_rate(
token: str,
side: RateSide,
protocols: list[str] | None = None,
) -> BestRateResult
Get the best lending rate across protocols for a token.
For supply rates, returns the highest rate. For borrow rates, returns the lowest rate.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token symbol |
required |
side
|
RateSide
|
Rate side (SUPPLY or BORROW) |
required |
protocols
|
list[str] | None
|
Protocols to compare (default: all available) |
None
|
Returns:
| Type | Description |
|---|---|
BestRateResult
|
BestRateResult with best rate and all rates |
get_protocol_rates
async
¶
Get all rates for a protocol.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
protocol
|
str
|
Protocol identifier |
required |
tokens
|
list[str] | None
|
Tokens to fetch (default: common tokens for chain) |
None
|
Returns:
| Type | Description |
|---|---|
ProtocolRates
|
ProtocolRates with all token rates |
get_cache_stats
¶
Get cache statistics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with cache stats |
FundingRateProvider¶
almanak.framework.data.FundingRateProvider
¶
FundingRateProvider(
venues: list[str] | None = None,
cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
rpc_url: str | None = None,
chain: str = "arbitrum",
)
Unified funding rate provider for multiple perpetual venues.
This class provides a single interface for fetching funding rates from GMX V2 and Hyperliquid. It handles caching, error recovery, and cross-venue rate comparison.
Attributes:
| Name | Type | Description |
|---|---|---|
venues |
list[str]
|
List of venues to monitor |
cache_ttl_seconds |
list[str]
|
How long to cache rates (default 60s) |
Example
provider = FundingRateProvider()
Get specific rate¶
rate = await provider.get_funding_rate(Venue.GMX_V2, "ETH-USD")
Get spread between venues¶
spread = await provider.get_funding_rate_spread("ETH-USD", Venue.GMX_V2, Venue.HYPERLIQUID)
Get historical rates¶
history = await provider.get_historical_funding_rates(Venue.GMX_V2, "ETH-USD", hours=24)
Initialize the FundingRateProvider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venues
|
list[str] | None
|
Venues to monitor (default: all supported) |
None
|
cache_ttl_seconds
|
float
|
Cache TTL in seconds (default 60s) |
DEFAULT_CACHE_TTL_SECONDS
|
rpc_url
|
str | None
|
RPC URL for on-chain queries (GMX V2). If not provided, will use default rates as fallback. |
None
|
chain
|
str
|
Chain for GMX V2 queries (default: arbitrum) |
'arbitrum'
|
set_mock_rate
¶
Set a mock funding rate for testing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
str
|
Venue identifier |
required |
market
|
str
|
Market symbol |
required |
rate_hourly
|
Decimal
|
Hourly funding rate |
required |
get_funding_rate
async
¶
Get the current funding rate for a venue/market.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
Venue
|
Venue identifier (gmx_v2, hyperliquid) |
required |
market
|
str
|
Market symbol (e.g., ETH-USD, BTC-USD) |
required |
Returns:
| Type | Description |
|---|---|
FundingRate
|
FundingRate with current rate data |
Raises:
| Type | Description |
|---|---|
VenueNotSupportedError
|
If venue not supported |
MarketNotSupportedError
|
If market not supported |
FundingRateUnavailableError
|
If rate cannot be fetched |
get_funding_rate_spread
async
¶
Get the funding rate spread between two venues.
The spread represents the difference in funding rates, which creates arbitrage opportunities. A positive spread means venue_a has higher funding than venue_b.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market
|
str
|
Market symbol (e.g., ETH-USD) |
required |
venue_a
|
Venue
|
First venue |
required |
venue_b
|
Venue
|
Second venue |
required |
Returns:
| Type | Description |
|---|---|
FundingRateSpread
|
FundingRateSpread with comparison data |
Raises:
| Type | Description |
|---|---|
FundingRateUnavailableError
|
If either rate cannot be fetched |
get_historical_funding_rates
async
¶
Get historical funding rates for a venue/market.
Returns historical funding rate data for analysis and trend detection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
Venue
|
Venue identifier |
required |
market
|
str
|
Market symbol |
required |
hours
|
int
|
Number of hours of history to fetch (default 24, max 168) |
24
|
Returns:
| Type | Description |
|---|---|
HistoricalFundingData
|
HistoricalFundingData with rate history |
Raises:
| Type | Description |
|---|---|
VenueNotSupportedError
|
If venue not supported |
MarketNotSupportedError
|
If market not supported |
FundingRateUnavailableError
|
If data cannot be fetched |
get_all_funding_rates
async
¶
Get funding rates for all supported markets on a venue.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
venue
|
Venue
|
Venue identifier |
required |
Returns:
| Type | Description |
|---|---|
dict[str, FundingRate]
|
Dictionary mapping market symbol to FundingRate |
get_cache_stats
¶
Get cache statistics.
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dictionary with cache stats |
Impermanent Loss¶
ILCalculator¶
almanak.framework.data.ILCalculator
¶
ILCalculator(
positions: dict[str, LPPosition] | None = None,
mock_prices: dict[str, Decimal] | None = None,
)
Calculator for impermanent loss across various AMM pool types.
This class provides methods to calculate impermanent loss for liquidity positions, including: - Standard constant product AMM pools (Uniswap V2, SushiSwap) - Concentrated liquidity pools (Uniswap V3) - Weighted pools (Balancer) - Stable pools (Curve)
The calculator also supports projecting IL for simulated price changes and tracking IL exposure for active positions.
Example
calc = ILCalculator()
Calculate IL for a 50/50 pool¶
result = calc.calculate_il( entry_price_a=Decimal("2000"), entry_price_b=Decimal("1"), current_price_a=Decimal("2500"), current_price_b=Decimal("1"), ) print(f"IL: {result.il_percent:.4f}%")
Project IL for various price changes¶
for pct in [10, 25, 50, 100]: proj = calc.project_il(price_change_pct=Decimal(pct)) print(f"Price +{pct}%: IL = {proj.il_percent:.4f}%")
Initialize the IL calculator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
positions
|
dict[str, LPPosition] | None
|
Optional dictionary of tracked LP positions |
None
|
mock_prices
|
dict[str, Decimal] | None
|
Optional mock prices for testing (token -> price) |
None
|
calculate_il
¶
calculate_il(
entry_price_a: Decimal,
entry_price_b: Decimal,
current_price_a: Decimal,
current_price_b: Decimal,
weight_a: Decimal = Decimal("0.5"),
weight_b: Decimal = Decimal("0.5"),
pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
entry_value: Decimal | None = None,
) -> ILResult
Calculate impermanent loss given entry and current prices.
For constant product AMMs (Uniswap V2 style), IL is calculated as: IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1
For weighted pools (Balancer style), IL is calculated using: IL = (price_ratio ^ weight_a) / ((weight_a * price_ratio) + weight_b) - 1
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entry_price_a
|
Decimal
|
Entry price of token A (in quote currency, e.g., USD) |
required |
entry_price_b
|
Decimal
|
Entry price of token B (in quote currency) |
required |
current_price_a
|
Decimal
|
Current price of token A |
required |
current_price_b
|
Decimal
|
Current price of token B |
required |
weight_a
|
Decimal
|
Weight of token A (default 0.5 for equal weight) |
Decimal('0.5')
|
weight_b
|
Decimal
|
Weight of token B (default 0.5 for equal weight) |
Decimal('0.5')
|
pool_type
|
PoolType
|
Type of AMM pool |
CONSTANT_PRODUCT
|
entry_value
|
Decimal | None
|
Optional initial position value (for absolute loss calc) |
None
|
Returns:
| Type | Description |
|---|---|
ILResult
|
ILResult with IL metrics |
Raises:
| Type | Description |
|---|---|
InvalidPriceError
|
If any price is zero or negative |
InvalidWeightError
|
If weights don't sum to 1.0 |
calculate_il_concentrated
¶
calculate_il_concentrated(
entry_price_a: Decimal,
entry_price_b: Decimal,
current_price_a: Decimal,
current_price_b: Decimal,
tick_lower: int,
tick_upper: int,
entry_value: Decimal | None = None,
) -> ILResult
Calculate IL for concentrated liquidity positions (Uniswap V3).
Concentrated liquidity amplifies both gains and IL compared to full-range positions. The IL is higher when price moves outside the range.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
entry_price_a
|
Decimal
|
Entry price of token A |
required |
entry_price_b
|
Decimal
|
Entry price of token B |
required |
current_price_a
|
Decimal
|
Current price of token A |
required |
current_price_b
|
Decimal
|
Current price of token B |
required |
tick_lower
|
int
|
Lower tick bound of the position |
required |
tick_upper
|
int
|
Upper tick bound of the position |
required |
entry_value
|
Decimal | None
|
Optional initial position value |
None
|
Returns:
| Type | Description |
|---|---|
ILResult
|
ILResult with IL metrics for concentrated position |
Raises:
| Type | Description |
|---|---|
InvalidPriceError
|
If any price is zero or negative |
project_il
¶
project_il(
price_change_pct: Decimal,
weight_a: Decimal = Decimal("0.5"),
weight_b: Decimal = Decimal("0.5"),
pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
) -> ProjectedILResult
Project impermanent loss for a given price change.
This method simulates what IL would be if token A's price changed by the specified percentage while token B remains constant (typical for ETH/stablecoin pairs).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
price_change_pct
|
Decimal
|
Price change percentage (e.g., 50 for +50%, -30 for -30%) |
required |
weight_a
|
Decimal
|
Weight of token A (default 0.5) |
Decimal('0.5')
|
weight_b
|
Decimal
|
Weight of token B (default 0.5) |
Decimal('0.5')
|
pool_type
|
PoolType
|
Type of AMM pool |
CONSTANT_PRODUCT
|
Returns:
| Type | Description |
|---|---|
ProjectedILResult
|
ProjectedILResult with projected IL metrics |
Raises:
| Type | Description |
|---|---|
InvalidWeightError
|
If weights don't sum to 1.0 |
InvalidPriceError
|
If price change would result in negative price |
add_position
¶
Add an LP position for tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
LPPosition
|
The LP position to track |
required |
remove_position
¶
Remove an LP position from tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
ID of the position to remove |
required |
Raises:
| Type | Description |
|---|---|
PositionNotFoundError
|
If position doesn't exist |
get_position
¶
Get a tracked LP position.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
ID of the position |
required |
Returns:
| Type | Description |
|---|---|
LPPosition
|
The LP position |
Raises:
| Type | Description |
|---|---|
PositionNotFoundError
|
If position doesn't exist |
get_all_positions
¶
Get all tracked LP positions.
Returns:
| Type | Description |
|---|---|
list[LPPosition]
|
List of all tracked positions |
calculate_il_exposure
¶
calculate_il_exposure(
position_id: str,
current_price_a: Decimal | None = None,
current_price_b: Decimal | None = None,
fees_earned: Decimal = Decimal("0"),
) -> ILExposure
Calculate IL exposure for a tracked position.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position_id
|
str
|
ID of the tracked position |
required |
current_price_a
|
Decimal | None
|
Current price of token A (uses mock if not provided) |
None
|
current_price_b
|
Decimal | None
|
Current price of token B (uses mock if not provided) |
None
|
fees_earned
|
Decimal
|
Fees earned by the position (optional) |
Decimal('0')
|
Returns:
| Type | Description |
|---|---|
ILExposure
|
ILExposure with full exposure details |
Raises:
| Type | Description |
|---|---|
PositionNotFoundError
|
If position doesn't exist |
ILExposureUnavailableError
|
If prices cannot be determined |
set_mock_prices
¶
Set mock prices for testing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prices
|
dict[str, Decimal]
|
Dictionary mapping token symbols to prices |
required |
Data Routing¶
DataRouter¶
almanak.framework.data.DataRouter
dataclass
¶
Routes data requests to providers with fallback and circuit-breaking.
For EXECUTION_GRADE data types, the router fails closed after the primary provider fails -- no fallback to degraded sources is attempted.
For INFORMATIONAL data types, the router tries the fallback chain in order, each with its own timeout, and returns the best available result with degraded confidence.
Attributes:
| Name | Type | Description |
|---|---|---|
config |
DataRoutingConfig
|
Routing configuration specifying provider assignments. |
register_provider
¶
Register a data provider for routing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
provider
|
DataProvider
|
A DataProvider implementation to register. |
required |
route
¶
route(
data_type: str,
*,
instrument: str = "",
strategy_config: dict | None = None,
**fetch_kwargs: object,
) -> DataEnvelope
Select a provider and fetch data with fallback logic.
Provider selection order
- Strategy-level override (if strategy_config provided)
- Global routing config
- Built-in defaults
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data_type
|
str
|
Data type key (e.g. "ohlcv", "pool_price"). |
required |
instrument
|
str
|
Instrument identifier for logging/error context. |
''
|
strategy_config
|
dict | None
|
Optional strategy config dict with data_overrides. |
None
|
**fetch_kwargs
|
object
|
Additional kwargs passed through to provider.fetch(). |
{}
|
Returns:
| Type | Description |
|---|---|
DataEnvelope
|
DataEnvelope from the selected provider. |
Raises:
| Type | Description |
|---|---|
DataUnavailableError
|
When all providers fail. |
get_metrics
¶
Return metrics for a specific provider or all providers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
provider_name
|
str | None
|
If provided, return metrics for this provider only. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dict of metric data. |
health
¶
Return health status for all registered providers.
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dict mapping provider name -> circuit breaker health dict. |
CircuitBreaker¶
almanak.framework.data.CircuitBreaker
¶
Thread-safe circuit breaker for a single data provider.
Opens after failure_threshold consecutive failures, then enters
a cooldown period. After cooldown, allows one test request (HALF_OPEN).
If the test succeeds the circuit closes; if it fails the circuit reopens.
Attributes:
| Name | Type | Description |
|---|---|---|
name |
Provider name this breaker guards. |
|
failure_threshold |
Consecutive failures before opening. |
|
cooldown_seconds |
Seconds to wait before half-open test. |
last_failure_time
property
¶
Monotonic timestamp of the last recorded failure, or None.
allow_request
¶
Check whether a request should be allowed through.
Returns:
| Type | Description |
|---|---|
bool
|
True if the circuit is CLOSED or transitioning to HALF_OPEN. |
bool
|
False if the circuit is OPEN (still within cooldown). |
record_success
¶
Record a successful request, resetting the breaker to CLOSED.
health
¶
Return health metrics for monitoring.
Returns:
| Type | Description |
|---|---|
dict[str, object]
|
Dict with state, failure_count, last_failure_time, and config. |
Exceptions¶
almanak.framework.data.DataUnavailableError
¶
Bases: DataSourceError
Raised when required data cannot be obtained from any provider.
Used by the DataRouter when all providers (primary + fallbacks) have failed or been circuit-broken for a given request.
Attributes:
| Name | Type | Description |
|---|---|---|
data_type |
What kind of data was requested (e.g. 'pool_price', 'ohlcv'). |
|
instrument |
Instrument identifier (symbol or address). |
|
reason |
Human-readable explanation. |
almanak.framework.data.MarketSnapshotError
¶
Bases: Exception
Base exception for MarketSnapshot errors.