Data Layer
Price oracles, balance providers, OHLCV sources, and the indicator/analytics primitives consumed by MarketSnapshot.
Price Data
PriceOracle
almanak.framework.data.PriceOracle
Bases: Protocol
Protocol for price aggregation and oracle logic.
A PriceOracle wraps one or more BasePriceSource implementations and provides aggregation logic (median, weighted average, etc.) along with outlier detection and graceful degradation.
The oracle is the primary interface for strategies to get price data, abstracting away the complexity of managing multiple sources.
Key responsibilities:
- Aggregate prices from multiple sources
- Detect and filter outliers (>2% deviation from median)
- Handle partial failures (some sources down)
- Track source health metrics for routing decisions
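Example implementation:
```python
from datetime import datetime, timezone

# BasePriceSource, SourceHealthMetrics, PriceResult and AllDataSourcesFailed
# are framework types; their import paths are not shown in this reference.


class PriceAggregator:
    def __init__(self, sources: list[BasePriceSource]):
        self._sources = sources
        self._health_metrics: dict[str, SourceHealthMetrics] = {}

    async def get_aggregated_price(
        self, token: str, quote: str = "USD", *, chain: str | None = None
    ) -> PriceResult:
        # chain is part of the protocol signature; this example does not forward it
        results: list[PriceResult] = []
        errors: dict[str, str] = {}
        for source in self._sources:
            try:
                result = await source.get_price(token, quote)
                results.append(result)
            except Exception as e:
                # Graceful degradation: record the failure and try the next source
                errors[source.source_name] = str(e)

        if not results:
            raise AllDataSourcesFailed(errors)

        # Return median price with aggregated confidence
        # (_calculate_median / _calculate_confidence are helpers not shown here)
        median_price = self._calculate_median(results)
        confidence = self._calculate_confidence(results)
        return PriceResult(
            price=median_price,
            source="aggregated",
            timestamp=datetime.now(timezone.utc),
            confidence=confidence,
        )
```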
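The 2% outlier rule from the responsibilities list can be sketched on its own. This is a minimal sketch, assuming prices have already been collected as Decimal values and that the oracle takes the median of the prices that survive the filter; filter_outliers and aggregate_price are hypothetical names, not framework APIs.

```python
from decimal import Decimal
from statistics import median


def filter_outliers(
    prices: list[Decimal], max_deviation: Decimal = Decimal("0.02")
) -> list[Decimal]:
    """Drop prices more than max_deviation (a fraction) away from the median."""
    if not prices:
        raise ValueError("no prices to aggregate")
    mid = median(prices)
    if mid == 0:
        raise ValueError("median price is zero; relative deviation is undefined")
    return [p for p in prices if abs(p - mid) / mid <= max_deviation]


def aggregate_price(prices: list[Decimal]) -> Decimal:
    """Median of the prices that survive the outlier filter."""
    return median(filter_outliers(prices))
```

Filtering against the median rather than the mean keeps a single wild source from dragging the reference point toward itself.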
get_aggregated_price
async
Get aggregated price from multiple sources.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol to get price for | required |
| quote | str | Quote currency (default "USD") | 'USD' |
| chain | str \| None | Optional chain context for chain-specific or address-based pricing. Single-chain callers may omit this and let the implementation use its default chain, but multi-chain callers should pass it explicitly. | None |

Returns:
| Type | Description |
|---|---|
| PriceResult | PriceResult with aggregated price and confidence |

Raises:
| Type | Description |
|---|---|
| AllDataSourcesFailed | If all sources fail to provide data |
get_source_health
Get health metrics for a specific source.
Returns metrics like success rate, average latency, and last error time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_name | str | Name of the source to query | required |

Returns:
| Type | Description |
|---|---|
| dict[str, Any] \| None | Dictionary with health metrics, or None if the source is unknown |
AggregatedPrice
almanak.framework.data.AggregatedPrice
dataclass
AggregatedPrice(
price: Decimal,
sources: list[PoolContribution] = list(),
block_range: tuple[int, int] = (0, 0),
method: str = "lwap",
window_seconds: int = 0,
pool_count: int = 0,
)
Aggregated price from multiple pools or time periods.
Attributes:
| Name | Type | Description |
|---|---|---|
| price | Decimal | The aggregated price value. |
| sources | list[PoolContribution] | List of pool contributions with individual prices and weights. |
| block_range | tuple[int, int] | Tuple of (min_block, max_block) covered by the aggregation. |
| method | str | Aggregation method used ("twap" or "lwap"). |
| window_seconds | int | Time window in seconds (for TWAP). |
| pool_count | int | Number of pools used in aggregation. |
PriceAggregator
almanak.framework.data.PriceAggregator
PriceAggregator(
pool_registry: PoolReaderRegistry,
rpc_call: RpcCallFn,
min_liquidity_usd: Decimal = DEFAULT_MIN_LIQUIDITY_USD,
reference_price_usd: Decimal | None = None,
source_name: str = "alchemy_rpc",
)
Aggregates prices across pools using TWAP and LWAP methods.
TWAP uses Uniswap V3's built-in oracle (observe()) for time-weighted average prices. LWAP reads live prices from multiple pools and weights them by in-range liquidity.
All results are EXECUTION_GRADE: fail-closed with no off-chain fallback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_registry | PoolReaderRegistry | PoolReaderRegistry for reading pool prices. | required |
| rpc_call | RpcCallFn | RPC call function for direct contract reads (observe()). | required |
| min_liquidity_usd | Decimal | Minimum liquidity in USD to include a pool (default $10k). | DEFAULT_MIN_LIQUIDITY_USD |
| reference_price_usd | Decimal \| None | Reference price for converting liquidity to USD. If None, liquidity filtering uses raw liquidity values and the threshold is treated as raw liquidity units. | None |
| source_name | str | Provenance label stamped on the aggregated | 'alchemy_rpc' |
twap
twap(
pool_address: str,
chain: str,
window_seconds: int = 300,
token0_decimals: int = 18,
token1_decimals: int = 6,
protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]
Calculate TWAP using Uniswap V3 oracle observe().
Calls observe([window_seconds, 0]) on the pool contract to get tick cumulatives, then derives the arithmetic mean tick and converts to a price.
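The tick-to-price arithmetic described above can be sketched as follows. This follows the Uniswap V3 convention that a tick t encodes a raw price of 1.0001^t (token1 per token0), with a decimals adjustment of 10^(token0_decimals - token1_decimals). It assumes that orientation; this reference does not state which orientation PriceAggregator.twap returns, and both function names are hypothetical.

```python
from decimal import Decimal


def arithmetic_mean_tick(tick_cumulatives: tuple[int, int], window_seconds: int) -> int:
    """Mean tick from the output of observe([window_seconds, 0]).

    tick_cumulatives[0] is the cumulative at (now - window_seconds),
    tick_cumulatives[1] is the cumulative at now.
    """
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    delta = tick_cumulatives[1] - tick_cumulatives[0]
    # Python's // floors toward -inf, matching Uniswap's OracleLibrary
    # (truncate, then subtract 1 for negative, non-exact divisions).
    return delta // window_seconds


def tick_to_price(tick: int, token0_decimals: int, token1_decimals: int) -> Decimal:
    """Price of one token0 in units of token1, adjusted for decimals."""
    raw = Decimal("1.0001") ** tick
    return raw * Decimal(10) ** (token0_decimals - token1_decimals)
```

With the method's default decimals (token0 = 18, token1 = 6), tick_to_price(arithmetic_mean_tick(cumulatives, 300), 18, 6) gives a 5-minute TWAP in token1 per token0.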
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_address | str | Pool contract address. | required |
| chain | str | Chain name. | required |
| window_seconds | int | Time window in seconds (default 300 = 5 min). | 300 |
| token0_decimals | int | Decimals of token0 (default 18). | 18 |
| token1_decimals | int | Decimals of token1 (default 6). | 6 |
| protocol | str | Protocol name for source attribution. | 'uniswap_v3' |

Returns:
| Type | Description |
|---|---|
| DataEnvelope[AggregatedPrice] | DataEnvelope[AggregatedPrice] with TWAP price. |

Raises:
| Type | Description |
|---|---|
| DataUnavailableError | If the observe() call fails or returns invalid data. |
lwap
lwap(
token_a: str,
token_b: str,
chain: str,
fee_tiers: list[int] | None = None,
protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]
Calculate liquidity-weighted average price across pools.
Reads live prices from all known pools for the given pair, filters out pools below the minimum liquidity threshold, then computes LWAP = sum(price_i * liquidity_i) / sum(liquidity_i).
Falls back to single-pool price if only one pool is available.
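The LWAP formula and its single-pool fallback can be sketched over plain (price, liquidity) pairs. This is a minimal sketch: it assumes liquidity values are already in comparable units (USD, or raw units when no reference price is set), and it raises LookupError as a stand-in for the framework's DataUnavailableError. The function name lwap is illustrative, not the PriceAggregator method.

```python
from decimal import Decimal


def lwap(pools: list[tuple[Decimal, Decimal]], min_liquidity: Decimal) -> Decimal:
    """Liquidity-weighted average price: sum(price_i * liq_i) / sum(liq_i).

    Pools below min_liquidity are dropped. With a single eligible pool the
    result is simply that pool's price.
    """
    eligible = [(price, liq) for price, liq in pools if liq >= min_liquidity]
    if not eligible:
        # Fail closed instead of guessing a price
        raise LookupError("no pool meets the liquidity threshold")
    total_liquidity = sum(liq for _, liq in eligible)
    if total_liquidity == 0:
        raise LookupError("eligible pools report zero liquidity")
    return sum(price * liq for price, liq in eligible) / total_liquidity
```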
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token_a | str | Token A symbol or address. | required |
| token_b | str | Token B symbol or address. | required |
| chain | str | Chain name. | required |
| fee_tiers | list[int] \| None | Fee tiers to search (default: [100, 500, 3000, 10000]). | None |
| protocols | list[str] \| None | Protocols to search (default: all registered for chain). | None |

Returns:
| Type | Description |
|---|---|
| DataEnvelope[AggregatedPrice] | DataEnvelope[AggregatedPrice] with LWAP price. |

Raises:
| Type | Description |
|---|---|
| DataUnavailableError | If no pools are found for the pair (fail-closed). |
Balance Data
BalanceProvider
almanak.framework.data.BalanceProvider
Bases: Protocol
Protocol for on-chain balance queries.
A BalanceProvider abstracts the complexity of querying ERC-20 and native token balances from the blockchain, handling decimal conversion, caching, and RPC error recovery.
Key responsibilities:
- Query ERC-20 balances via balanceOf
- Query native ETH balance via eth_getBalance
- Handle token decimal conversion correctly
- Cache balances to reduce RPC load
- Invalidate cache after transaction execution
Example implementation:
```python
from decimal import Decimal

from web3 import Web3

# BalanceResult, ERC20_ABI and the _get_native_balance / _get_token_info
# helpers are defined elsewhere and are not shown in this reference.


class Web3BalanceProvider:
    def __init__(self, web3: Web3, wallet_address: str):
        self._web3 = web3
        self._wallet = wallet_address
        self._cache: dict[str, BalanceResult] = {}
        self._cache_ttl = 5  # 5 second cache

    async def get_balance(self, token: str) -> BalanceResult:
        if token == "ETH":
            return await self._get_native_balance()

        token_info = self._get_token_info(token)
        contract = self._web3.eth.contract(
            address=token_info.address,
            abi=ERC20_ABI,
        )
        # Note: a blocking web3 call inside an async method
        raw_balance = contract.functions.balanceOf(self._wallet).call()
        # Convert from the token's smallest unit to human-readable units
        balance = Decimal(raw_balance) / Decimal(10**token_info.decimals)

        return BalanceResult(
            balance=balance,
            token=token,
            address=token_info.address,
            decimals=token_info.decimals,
            raw_balance=raw_balance,
        )

    def invalidate_cache(self, token: str | None = None) -> None:
        if token:
            self._cache.pop(token, None)
        else:
            self._cache.clear()
```
get_balance
async
Get the balance of a token for the configured wallet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol (e.g., "WETH", "USDC") or "ETH" for native | required |

Returns:
| Type | Description |
|---|---|
| BalanceResult | BalanceResult with balance in human-readable units |

Raises:
| Type | Description |
|---|---|
| DataSourceError | If balance cannot be fetched |
get_native_balance
async
Get the native token balance (ETH, MATIC, etc.).
Convenience method for getting the chain's native token balance.
Returns:
| Type | Description |
|---|---|
| BalanceResult | BalanceResult for native token |
invalidate_cache
Invalidate cached balances.
Should be called after transaction execution to ensure fresh data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str \| None | Specific token to invalidate, or None to clear all | None |
OHLCV Data
OHLCVProvider
almanak.framework.data.OHLCVProvider
Bases: Protocol
Protocol for OHLCV (candlestick) data providers.
Used by indicators like RSI that need historical price data.
Implementations must:
- Support multiple timeframes (1m, 5m, 15m, 1h, 4h, 1d)
- Return properly typed OHLCVCandle objects
- Handle caching to avoid repeated API calls
- Gracefully degrade if full history is unavailable
The supported_timeframes property should return the subset of VALID_TIMEFRAMES that this provider can supply data for.
supported_timeframes
property
Return the list of timeframes this provider supports.
Returns:
| Type | Description |
|---|---|
| list[str] | List of supported timeframe strings (e.g., ["1h", "4h", "1d"]) |
get_ohlcv
async
get_ohlcv(
token: str,
quote: str = "USD",
timeframe: str = "1h",
limit: int = 100,
) -> list[OHLCVCandle]
Get OHLCV data for a token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol | required |
| quote | str | Quote currency | 'USD' |
| timeframe | str | Candle timeframe (must be in supported_timeframes) | '1h' |
| limit | int | Number of candles to fetch | 100 |

Returns:
| Type | Description |
|---|---|
| list[OHLCVCandle] | List of OHLCVCandle objects sorted by timestamp ascending |

Raises:
| Type | Description |
|---|---|
| DataSourceError | If data cannot be fetched |
| InsufficientDataError | If requested limit exceeds available data |
| ValueError | If timeframe is not supported |
OHLCVData
almanak.framework.data.OHLCVData
dataclass
OHLCVData(
timestamp: datetime,
open: Decimal,
high: Decimal,
low: Decimal,
close: Decimal,
volume: Decimal | None = None,
)
OHLCV candlestick data point.
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp | datetime | Candle open time |
| open | Decimal | Opening price |
| high | Decimal | Highest price |
| low | Decimal | Lowest price |
| close | Decimal | Closing price |
| volume | Decimal \| None | Trading volume (optional, CoinGecko OHLC doesn't include volume) |
Pool Analytics
PoolAnalytics
almanak.framework.data.PoolAnalytics
dataclass
PoolAnalytics(
pool_address: str,
chain: str,
protocol: str,
tvl_usd: Decimal,
volume_24h_usd: Decimal,
volume_7d_usd: Decimal,
fee_apr: float,
fee_apy: float,
utilization_rate: float | None = None,
token0_weight: float = 0.5,
token1_weight: float = 0.5,
unmeasured_fields: frozenset[str] = frozenset(),
)
Analytics for a single pool.
Attributes:
| Name | Type | Description |
|---|---|---|
| pool_address | str | Pool contract address. |
| chain | str | Chain name. |
| protocol | str | Protocol name (e.g. |
| tvl_usd | Decimal | Total value locked in USD. For backwards compatibility with the pre-VIB-4727 callers, |
| volume_24h_usd | Decimal | 24-hour trading volume in USD. |
| volume_7d_usd | Decimal | 7-day trading volume in USD. |
| fee_apr | float | Annualized fee return as a percentage. |
| fee_apy | float | Compounded annual fee return as a percentage. |
| utilization_rate | float \| None | Utilization for lending pools (0.0-1.0), None for DEX. |
| token0_weight | float | Fraction of TVL in token0 (0.0-1.0). |
| token1_weight | float | See |
| unmeasured_fields | frozenset[str] | Names of money-critical fields the upstream provider did NOT measure (i.e. came back as empty-string on the wire). A field appearing here means the corresponding |
PoolAnalyticsReader
VIB-4727: this reader is a thin gRPC client over the gateway's
PoolAnalyticsService. It owns no HTTP egress; all upstream provider
calls (DefiLlama / CoinGecko Onchain) happen inside the gateway sidecar.
See the Market Snapshot HOLD contract
for the propagation rule strategy authors must follow.
almanak.framework.data.pools.analytics.PoolAnalyticsReader
Thin gRPC client over the gateway's PoolAnalyticsService.
This class no longer owns any HTTP egress. All upstream provider
fetching (DefiLlama, GeckoTerminal) happens inside the gateway
sidecar. The constructor REQUIRES a connected GatewayClient —
constructing one without it deliberately raises TypeError so any
stale PoolAnalyticsReader() call from before VIB-4727 fails loudly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gateway_client | GatewayClient | The connected gateway client. | required |
| timeout_seconds | float | gRPC call timeout (default 15s, matching the gateway's upstream HTTP timeout). | 15.0 |
get_pool_analytics
get_pool_analytics(
pool_address: str,
chain: str,
protocol: str | None = None,
) -> DataEnvelope[PoolAnalytics]
Get real-time analytics for a pool via the gateway.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_address | str | Pool contract address. | required |
| chain | str | Chain name (e.g. | required |
| protocol | str \| None | Optional protocol hint (e.g. | None |

Returns:
| Type | Description |
|---|---|
| DataEnvelope[PoolAnalytics] | |

Raises:
| Type | Description |
|---|---|
| DataSourceUnavailable | When the gateway returns a non-OK status or when both upstream providers fail. Callers that catch this exception MUST either re-raise or return |
best_pool
best_pool(
token_a: str,
token_b: str,
chain: str,
metric: str = "fee_apr",
protocols: list[str] | None = None,
) -> DataEnvelope[PoolAnalyticsResult]
Deferred to a follow-up gateway RPC.
The pre-VIB-4727 implementation enumerated all DefiLlama pools and
filtered locally — that egress path can't ship as-is from the
strategy container. A second gateway RPC (SearchPools) is
required. Tracking ticket: VIB-4729.
Until the RPC lands this raises DataSourceUnavailable rather
than NotImplementedError so that the wrap-and-reraise in
MarketSnapshot.best_pool(...) produces a
PoolAnalyticsUnavailableError whose __cause__ chain
classifies as DATA_UNAVAILABLE — the runner treats that as
HOLD-worthy, the same contract a real provider outage uses. A
bare NotImplementedError would have crashed the iteration loop
instead of producing a HOLD.
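The wrap-and-reraise behaviour described above relies on Python exception chaining: `raise New(...) from exc` stores the original exception in `__cause__`. A minimal sketch, assuming the runner classifies an error by walking its `__cause__` chain; classify, best_pool_wrapper, the stand-in exception classes, and the DATA_UNAVAILABLE / UNCLASSIFIED labels are illustrative and are not the runner's actual code.

```python
class DataSourceUnavailable(Exception):
    """Stand-in for the framework's provider-outage exception."""


class PoolAnalyticsUnavailableError(Exception):
    """Stand-in for the wrapper raised by MarketSnapshot.best_pool(...)."""


def classify(exc: BaseException) -> str:
    """Walk the __cause__ chain; any DataSourceUnavailable link is HOLD-worthy."""
    seen: set[int] = set()
    current: BaseException | None = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        if isinstance(current, DataSourceUnavailable):
            return "DATA_UNAVAILABLE"
        current = current.__cause__
    return "UNCLASSIFIED"


def best_pool_wrapper(reader_call):
    try:
        return reader_call()
    except DataSourceUnavailable as exc:
        # `raise ... from exc` sets __cause__, which classify() follows
        raise PoolAnalyticsUnavailableError("best_pool unavailable") from exc
```

A bare NotImplementedError would not match the isinstance check anywhere in its chain, which is why it would surface as an unclassified crash rather than a HOLD.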
health
Provider health is now owned by the gateway servicer.
The legacy class exposed this for the old aiohttp-using providers. Strategy-container code that wants provider stats should call the gateway's metrics endpoint instead. Returning an empty dict here keeps the attribute non-throwing for any callers that still poll it during the cut-over.
NullPoolAnalyticsReader
Backtest factories (MarketSnapshotBuilder.for_pnl_backtest_state,
for_paper_fork) inject this stub. It always raises
DataSourceUnavailable("backtest"), forcing strategies inside a
backtest to take a deterministic code path (static fee, fixture data,
or HOLD).
almanak.framework.data.pools.analytics.NullPoolAnalyticsReader
Always-raises stub used by backtest factories (VIB-4727).
Calling the live gateway over HTTP at backtest time would make results nondeterministic
across runs, so strategies that "work in backtest" could silently change behavior
in production. The agreed contract (per the VIB-4727 design discussion)
is: backtest factories inject this null reader; strategies that depend
on pool_analytics(...) must take a deterministic code path inside
backtests (a static fee assumption, a fixture-backed analytics, or HOLD).
Any call raises DataSourceUnavailable("backtest") so the runner's
HOLD inference path is exercised identically to a real gateway outage.
Pool History
PoolHistoryReader
VIB-4728: thin gRPC client over the gateway's PoolHistoryService. The
framework reader owns NO HTTP / GraphQL egress — all upstream provider
calls (The Graph subgraphs → DefiLlama → CoinGecko Onchain) happen inside
the gateway sidecar; the strategy container holds zero API keys.
Returns a DataEnvelope[list[PoolSnapshot]] covering the requested
window, with chain-aware canonical address normalization,
unmeasured_fields-tagged Empty != Zero semantics on every snapshot,
and explicit TruncationReason enum carrying soft-cap / page-cap /
provider-retention semantics. See the Market Snapshot HOLD
contract for the
DataSourceUnavailable propagation rule strategy authors must follow.
almanak.framework.data.pools.history.PoolHistoryReader
Thin gRPC client over the gateway's PoolHistoryService.
This class no longer owns any HTTP / GraphQL egress. All upstream
provider fetching (The Graph, DefiLlama, GeckoTerminal) happens
inside the gateway sidecar. The constructor REQUIRES a connected
GatewayClient — constructing one without it deliberately raises
TypeError so any stale PoolHistoryReader() call from before
VIB-4728 fails loudly.
The protocol argument of get_pool_history is keyword-only and required
(no default). This closes the silent cross-protocol surface flagged by
Phase 0b Round-4 of docs/internal/uat-cards/VIB-4755.md §D-2.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gateway_client | GatewayClient | The connected gateway client. REQUIRED: a | required |
| timeout_seconds | float | gRPC call timeout (default 30s, longer than | 30.0 |
get_pool_history
get_pool_history(
pool_address: str,
chain: str,
start_date: datetime,
end_date: datetime | None = None,
resolution: str = "1h",
*,
protocol: str,
) -> DataEnvelope[list[PoolSnapshot]]
Fetch historical pool snapshots via the gateway.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pool_address | str | Pool contract address. | required |
| chain | str | Chain name (e.g. | required |
| start_date | datetime | Start of the history window (UTC). | required |
| end_date | datetime \| None | End of the history window. | None |
| resolution | str | One of | '1h' |
| protocol | str | REQUIRED keyword-only. Protocol slug, e.g. | required |

Returns:
| Type | Description |
|---|---|
| DataEnvelope[list[PoolSnapshot]] | |
| DataEnvelope[list[PoolSnapshot]] | classification. Money fields are |
| DataEnvelope[list[PoolSnapshot]] | ( |
| DataEnvelope[list[PoolSnapshot]] | in |

Raises:
| Type | Description |
|---|---|
| DataSourceUnavailable | When the gateway returns a non-OK status, the gateway client is not connected, OR the gateway returns |
health
Provider health is now owned by the gateway servicer.
The legacy class exposed this for the old direct-HTTP
providers. Strategy-container code that wants provider stats
should call the gateway's metrics endpoint instead. Returning
an empty dict keeps the attribute non-throwing for any callers
that still poll it during the cut-over. Mirrors the VIB-4727
PoolAnalyticsReader.health() compat shim.
PoolSnapshot
The DTO returned per row by PoolHistoryReader.get_pool_history(...).
Money fields (tvl, volume_24h, fee_revenue_24h, token0_reserve,
token1_reserve) are typed Decimal | None per the Empty != Zero
contract — a None field is named in unmeasured_fields.
almanak.framework.data.pools.history.PoolSnapshot
dataclass
PoolSnapshot(
timestamp: datetime,
tvl: Decimal | None,
volume_24h: Decimal | None,
fee_revenue_24h: Decimal | None,
token0_reserve: Decimal | None,
token1_reserve: Decimal | None,
unmeasured_fields: frozenset[str] = frozenset(),
)
Historical pool state at a specific point in time.
All money / reserve fields are typed Decimal | None — None
means the upstream provider did NOT measure the field for this row
(Empty != Zero). The unmeasured field names are also listed in
unmeasured_fields for grep-friendly inspection
(if "tvl" in snap.unmeasured_fields:).
The pre-VIB-4728 dataclass shape (non-Optional Decimal with
Decimal("0") substituted for unmeasured) was an Empty == Zero
bug; VIB-4728 evolves the boundary to honour CLAUDE.md §Accounting
"Empty != Zero".
Attributes:
| Name | Type | Description |
|---|---|---|
| timestamp | datetime | UTC datetime of the snapshot (timezone-aware). |
| tvl | Decimal \| None | Total value locked in USD. |
| volume_24h | Decimal \| None | 24-hour trading volume in USD. |
| fee_revenue_24h | Decimal \| None | 24-hour fee revenue in USD. |
| token0_reserve | Decimal \| None | token0 reserve in human-readable units. |
| token1_reserve | Decimal \| None | token1 reserve in human-readable units. |
| unmeasured_fields | frozenset[str] | Names of fields that are |
NullPoolHistoryReader
VIB-4728: the backtest-deterministic stub. MarketSnapshotBuilder.for_pnl_backtest_state
and for_paper_fork inject this reader, which always raises
DataSourceUnavailable("backtest") so a strategy run inside a backtest
cannot make a history-driven decision implicitly. Verified — via three
armed monkeypatches on socket.socket.connect, aiohttp.ClientSession,
and grpc.aio.{insecure,secure}_channel — to construct ZERO network
primitives across the four-class enumeration (in-process network,
high-level child-spawn, low-level spawn syscalls, FFI).
almanak.framework.data.null_readers.NullPoolHistoryReader
Always-raises stub used by backtest factories (VIB-4755).
Calling the live gateway over HTTP at backtest time would make results
nondeterministic across runs, so strategies that "work in backtest" could
silently change behaviour in production. The agreed contract is: backtest
factories inject this null reader; strategies that depend on
pool_history(...) must take a deterministic code path inside
backtests.
Any call raises DataSourceUnavailable("backtest") so the
runner's HOLD inference path is exercised identically to a real
gateway outage. The class is intentionally a thin shell — NO
primitives are constructed in __init__ or anywhere else.
This is verified by the D2.M6 38-primitive monkeypatch
determinism test (tests/framework/market/test_backtest_pool_history_determinism.py
::test_null_reader_constructs_no_network_primitives).
health
Compat shim — mirrors the live reader's health() shape.
The live reader's health() returns {} (per-provider
stats are now server-side). The null reader returns the same
empty dict so any caller that polls .health() during the
cut-over gets the same non-throwing response in both backtest
and live paths.
LiquidityDepth
almanak.framework.data.LiquidityDepth
dataclass
LiquidityDepth(
ticks: list[TickData],
total_liquidity: int,
current_tick: int,
current_price: Decimal,
pool_address: str = "",
token0_decimals: int = 18,
token1_decimals: int = 6,
tick_spacing: int = 60,
)
Tick-level liquidity distribution for a pool.
Attributes:
| Name | Type | Description |
|---|---|---|
| ticks | list[TickData] | Initialized ticks sorted by tick_index (ascending). |
| total_liquidity | int | Current in-range liquidity (L) from the pool. |
| current_tick | int | Current active tick from slot0. |
| current_price | Decimal | Current price from slot0. |
| pool_address | str | Pool contract address. |
| token0_decimals | int | Decimals of token0. |
| token1_decimals | int | Decimals of token1. |
| tick_spacing | int | Tick spacing for this pool. |
Volatility and Risk
RealizedVolatilityCalculator
almanak.framework.data.RealizedVolatilityCalculator
Computes realized volatility from OHLCV candle data.
Supports two estimators:
- close_to_close: Standard deviation of log returns. Simple and widely used, but only uses closing prices.
- parkinson: High-low range estimator. More efficient (lower variance) for the same sample size because it uses intra-period range information.
All volatilities are annualized using sqrt(periods_per_year).
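The two estimators can be sketched with the standard library. Assumptions: a 365-day, 24/7 year for periods_per_year, the sample (n - 1) standard deviation for close-to-close, and the classic Parkinson scaling 1/(4 ln 2). The framework's exact constants are not documented here, and the function names are hypothetical.

```python
import math
from statistics import stdev

PERIODS_PER_YEAR = {  # assumes 24/7 markets and a 365-day year
    "1m": 365 * 24 * 60,
    "5m": 365 * 24 * 12,
    "15m": 365 * 24 * 4,
    "1h": 365 * 24,
    "4h": 365 * 6,
    "1d": 365,
}


def close_to_close_vol(closes: list[float], timeframe: str) -> float:
    """Annualized sample standard deviation of log returns."""
    log_returns = [math.log(b / a) for a, b in zip(closes, closes[1:])]
    return stdev(log_returns) * math.sqrt(PERIODS_PER_YEAR[timeframe])


def parkinson_vol(highs: list[float], lows: list[float], timeframe: str) -> float:
    """Annualized Parkinson high-low range estimator."""
    n = len(highs)
    sum_sq = sum(math.log(h / l) ** 2 for h, l in zip(highs, lows))
    per_period_var = sum_sq / (4 * n * math.log(2))
    return math.sqrt(per_period_var) * math.sqrt(PERIODS_PER_YEAR[timeframe])
```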
realized_vol
realized_vol(
candles: list[OHLCVCandle],
window_days: int = 30,
timeframe: str = "1h",
estimator: str = "close_to_close",
) -> VolatilityResult
Calculate realized volatility over a lookback window.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| candles | list[OHLCVCandle] | OHLCV candles sorted ascending by timestamp. Should cover at least | required |
| window_days | int | Lookback window in calendar days. | 30 |
| timeframe | str | Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d). | '1h' |
| estimator | str | "close_to_close" (default) or "parkinson". | 'close_to_close' |

Returns:
| Type | Description |
|---|---|
| VolatilityResult | VolatilityResult with annualized, daily, and hourly vol. |

Raises:
| Type | Description |
|---|---|
| InsufficientDataError | If there are fewer than 30 observations in the window. |
| ValueError | If the timeframe is unsupported or the estimator is unknown. |
vol_cone
vol_cone(
candles: list[OHLCVCandle],
windows: list[int] | None = None,
timeframe: str = "1h",
estimator: str = "close_to_close",
token: str = "",
) -> VolConeResult
Compute volatility cone: current vol vs historical percentile.
For each window length, calculates the current realized vol and compares it to the distribution of rolling volatilities computed over the full candle history.
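One common way to express where current vol sits in the cone is a percentile rank against the history of rolling vols for the same window. A sketch under that assumption (the framework's percentile definition is not documented here); percentile_rank and rolling_windows are hypothetical helpers that would sit on top of a per-window vol estimator.

```python
from bisect import bisect_right


def rolling_windows(values: list[float], window: int) -> list[list[float]]:
    """All contiguous windows of length `window`, oldest first."""
    return [values[i : i + window] for i in range(len(values) - window + 1)]


def percentile_rank(current: float, history: list[float]) -> float:
    """Share (0-100) of historical rolling vols at or below the current vol."""
    if not history:
        raise ValueError("empty history")
    ordered = sorted(history)
    return 100.0 * bisect_right(ordered, current) / len(ordered)
```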
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| candles | list[OHLCVCandle] | Full OHLCV history sorted ascending. Should be significantly longer than the largest window for meaningful percentile estimation. | required |
| windows | list[int] \| None | Lookback windows in days. Default [7, 14, 30, 90]. | None |
| timeframe | str | Candle timeframe. | '1h' |
| estimator | str | "close_to_close" or "parkinson". | 'close_to_close' |
| token | str | Token symbol for labeling. | '' |

Returns:
| Type | Description |
|---|---|
| VolConeResult | VolConeResult with one VolConeEntry per window. |

Raises:
| Type | Description |
|---|---|
| InsufficientDataError | If there is not enough data for the smallest window. |
| ValueError | If the timeframe is unsupported. |
PortfolioRiskCalculator
almanak.framework.data.PortfolioRiskCalculator
Computes portfolio risk metrics from PnL or return series.
All calculations use explicit conventions (return interval, risk-free rate, annualization) to ensure results are unambiguous and comparable.
Supports three VaR methods:
- parametric: Assumes a normal distribution. VaR = -z * sigma * value.
- historical: Percentile-based, from the empirical return distribution.
- cornish_fisher: Adjusts the z-score for skewness and kurtosis.
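The three VaR methods can be sketched with the standard library. The parametric form follows the formula above, with z the lower-tail normal quantile (negative), so VaR comes out positive. The confidence level, the sample-vs-population moment choices, and the nearest-rank quantile convention are assumptions, since the framework's exact choices are not documented here; all function names are hypothetical.

```python
import math
from statistics import NormalDist, fmean, pstdev, stdev


def parametric_var(returns: list[float], value: float, confidence: float = 0.95) -> float:
    """VaR = -z * sigma * value, with z the lower-tail normal quantile."""
    z = NormalDist().inv_cdf(1 - confidence)  # about -1.645 at 95%
    return -z * stdev(returns) * value


def historical_var(returns: list[float], value: float, confidence: float = 0.95) -> float:
    """Nearest-rank empirical lower-tail quantile (other rank conventions exist)."""
    ordered = sorted(returns)
    # round() strips float noise in (1 - confidence) * n before taking the ceiling
    rank = max(1, math.ceil(round((1 - confidence) * len(ordered), 9)))
    return -ordered[rank - 1] * value


def cornish_fisher_z(z: float, skew: float, excess_kurt: float) -> float:
    """Cornish-Fisher expansion of a normal quantile for skewness and excess kurtosis."""
    return (
        z
        + (z**2 - 1) * skew / 6
        + (z**3 - 3 * z) * excess_kurt / 24
        - (2 * z**3 - 5 * z) * skew**2 / 36
    )


def cornish_fisher_var(returns: list[float], value: float, confidence: float = 0.95) -> float:
    mu, sd = fmean(returns), pstdev(returns)
    if sd == 0:
        return 0.0
    # Population moments for skew and excess kurtosis; sample stdev for scale
    skew = fmean([((r - mu) / sd) ** 3 for r in returns])
    excess_kurt = fmean([((r - mu) / sd) ** 4 for r in returns]) - 3
    z = cornish_fisher_z(NormalDist().inv_cdf(1 - confidence), skew, excess_kurt)
    return -z * stdev(returns) * value
```

With zero skew and zero excess kurtosis the Cornish-Fisher quantile collapses to the normal one, so the two methods only diverge when returns are visibly non-normal.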
portfolio_risk
portfolio_risk(
pnl_series: list[float],
total_value_usd: Decimal,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
var_method: VaRMethod = VaRMethod.PARAMETRIC,
timestamps: list[datetime] | None = None,
benchmark_eth_returns: list[float] | None = None,
benchmark_btc_returns: list[float] | None = None,
) -> PortfolioRisk
Calculate portfolio risk metrics from a PnL series.
The pnl_series should contain periodic returns as fractions (e.g. 0.01 = 1% gain, -0.02 = 2% loss).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pnl_series | list[float] | List of periodic returns (fractions, not percentages). | required |
| total_value_usd | Decimal | Current portfolio value in USD. | required |
| return_interval | str | Periodicity of the returns (1d, 1h, etc.). | '1d' |
| risk_free_rate | Decimal | Risk-free rate per period as a decimal. | Decimal('0') |
| var_method | VaRMethod | VaR calculation method. | PARAMETRIC |
| timestamps | list[datetime] \| None | Optional timestamps for each return (for conventions). | None |
| benchmark_eth_returns | list[float] \| None | Optional ETH returns for beta calculation. | None |
| benchmark_btc_returns | list[float] \| None | Optional BTC returns for beta calculation. | None |

Returns:
| Type | Description |
|---|---|
| PortfolioRisk | PortfolioRisk with all metrics and explicit conventions. |

Raises:
| Type | Description |
|---|---|
| InsufficientDataError | If there are fewer than 30 observations. |
| ValueError | If return_interval is unsupported. |
rolling_sharpe
rolling_sharpe(
pnl_series: list[float],
window_days: int = 30,
return_interval: str = "1d",
risk_free_rate: Decimal = Decimal("0"),
timestamps: list[datetime] | None = None,
) -> RollingSharpeResult
Compute rolling Sharpe ratio over the PnL series.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pnl_series | list[float] | List of periodic returns. | required |
| window_days | int | Rolling window in days. | 30 |
| return_interval | str | Periodicity of the returns. | '1d' |
| risk_free_rate | Decimal | Risk-free rate per period. | Decimal('0') |
| timestamps | list[datetime] \| None | Optional timestamps aligned with pnl_series. | None |

Returns:
| Type | Description |
|---|---|
| RollingSharpeResult | RollingSharpeResult with time series of Sharpe ratios. |

Raises:
| Type | Description |
|---|---|
| InsufficientDataError | If there are fewer than 30 observations in total. |
| ValueError | If return_interval is unsupported. |
Yield and Rates
YieldAggregator
almanak.framework.data.YieldAggregator
Cross-protocol yield comparison using DeFi Llama.
Fetches yield data from DeFi Llama yields API, filtering by token, chain, TVL, and protocol. Results are cached for 15 minutes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cache_ttl | int | In-memory cache TTL in seconds. Default 900 (15 minutes). | 900 |
| request_timeout | float | HTTP request timeout in seconds. Default 15. | 15.0 |
get_yield_opportunities
get_yield_opportunities(
token: str,
chains: list[str] | None = None,
min_tvl: float = 100000,
sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]
Find yield opportunities for a token across protocols and chains.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| token | str | Token symbol (e.g. "USDC", "WETH"). | required |
| chains | list[str] \| None | Optional list of chains to filter (e.g. ["arbitrum", "base"]). None means all supported chains. | None |
| min_tvl | float | Minimum TVL in USD. Default $100k. | 100000 |
| sort_by | str | Sort field: "apy", "tvl", "risk_score". Default "apy". | 'apy' |

Returns:
| Type | Description |
|---|---|
| DataEnvelope[list[YieldOpportunity]] | DataEnvelope[list[YieldOpportunity]] sorted by the chosen metric. |

Raises:
| Type | Description |
|---|---|
| DataSourceUnavailable | If the DeFi Llama API is unavailable. |
Lending rates
Strategies read live lending rates through
MarketSnapshot.lending_rate(...) /
MarketSnapshot.best_lending_rate(...) — the canonical, gateway-backed
accessors. The underlying RateMonitor is a framework-internal gRPC client
of the gateway RateHistoryService and is no longer a public strategy API
(deprecated for direct use as of VIB-4859 / VIB-4869).
GatewayFundingRateProvider
almanak.framework.data.GatewayFundingRateProvider
GatewayFundingRateProvider(
gateway_client: GatewayClient,
chain: str = DEFAULT_CHAIN,
cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
)
Funding rate provider that delegates to the gateway sidecar.
The gateway owns all network egress (Hyperliquid HTTP, GMX V2 RPC), credential storage, SSL configuration, and rate limiting. Strategy code calls this provider over the in-cluster gRPC channel only.
get_funding_rate
async
Get the current funding rate for venue/market.
Raises:
| Type | Description |
|---|---|
| VenueNotSupportedError | |
| MarketNotSupportedError | |
| FundingRateUnavailableError | the gateway returned an error. |
get_funding_rate_spread
async
get_funding_rate_spread(
market: str, venue_a: Venue | str, venue_b: Venue | str
) -> FundingRateSpread
Get the funding rate spread between venue_a and venue_b.
Issues a single GetFundingRateSpread RPC so the gateway can
fetch both rates concurrently. The signed spread_8h is computed
locally from venue_a_rate.rate_hourly - venue_b_rate.rate_hourly,
because by historical convention the wire spread_hourly field is
absolute, and the sign is needed to derive recommended_direction.
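The following sketch shows why the sign has to be computed locally. It makes three assumptions. First, spread_8h is taken to be the hourly difference scaled by 8. Second, a positive spread favours shorting venue_a and going long venue_b, since longs pay shorts when funding is positive. Third, the direction labels and helper names are hypothetical and are not the provider's API.

```python
from decimal import Decimal

HOURS_PER_WINDOW = 8  # assumption: spread_8h = hourly spread scaled to an 8-hour window


def signed_spread_8h(rate_a_hourly: Decimal, rate_b_hourly: Decimal) -> Decimal:
    # Keep the sign: positive means venue_a charges more funding than venue_b.
    return (rate_a_hourly - rate_b_hourly) * HOURS_PER_WINDOW


def recommended_direction(spread: Decimal) -> str:
    # Hypothetical labels. With positive funding, longs pay shorts, so short the
    # venue with the higher rate and go long on the other one.
    if spread > 0:
        return "short_a_long_b"
    if spread < 0:
        return "long_a_short_b"
    return "flat"


a, b = Decimal("0.00003"), Decimal("0.0001")
spread = signed_spread_8h(a, b)
direction = recommended_direction(spread)
# An absolute spread (the wire convention) has the same magnitude for (a, b)
# and (b, a), so on its own it cannot tell the two directions apart.
```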
get_rates_for_market
async
get_rates_for_market(
market: str, venues: list[Venue | str] | None = None
) -> dict[str, FundingRate]
Fetch market funding rates across multiple venues concurrently.
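The sketch below shows a concurrent fan-out using asyncio.gather. The fetch_rate coroutine and its rates are hypothetical stand-ins for per-venue gateway calls. Dropping failed venues from the result is a choice made for this sketch; it is not documented behaviour of get_rates_for_market.

```python
import asyncio
from decimal import Decimal

# Hypothetical hourly rates standing in for gateway responses.
_FAKE_RATES = {"hyperliquid": Decimal("0.0001"), "gmx_v2": Decimal("0.00004")}


async def fetch_rate(venue: str, market: str) -> Decimal:
    # Hypothetical stand-in for one per-venue call to the gateway.
    await asyncio.sleep(0)
    if venue not in _FAKE_RATES:
        raise LookupError(f"{venue} does not list {market}")
    return _FAKE_RATES[venue]


async def rates_for_market(market: str, venues: list[str]) -> dict[str, Decimal]:
    # Start every venue request at once. return_exceptions=True returns failures
    # as values instead of raising the first one, so successes are kept.
    results = await asyncio.gather(
        *(fetch_rate(venue, market) for venue in venues), return_exceptions=True
    )
    return {
        venue: result
        for venue, result in zip(venues, results)
        if not isinstance(result, BaseException)
    }


rates = asyncio.run(rates_for_market("ETH-USD", ["hyperliquid", "gmx_v2", "unknown_dex"]))
```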
Impermanent Loss¶
ILCalculator¶
almanak.framework.data.ILCalculator
ILCalculator(
positions: dict[str, LPPosition] | None = None,
mock_prices: dict[str, Decimal] | None = None,
)
Calculator for impermanent loss across various AMM pool types.
This class provides methods to calculate impermanent loss for liquidity positions, including:
- Standard constant product AMM pools (Uniswap V2, SushiSwap)
- Concentrated liquidity pools (Uniswap V3)
- Weighted pools (Balancer)
- Stable pools (Curve)
The calculator also supports projecting IL for simulated price changes and tracking IL exposure for active positions.
Example
```python
from decimal import Decimal

from almanak.framework.data import ILCalculator

calc = ILCalculator()

# Calculate IL for a 50/50 pool
result = calc.calculate_il(
    entry_price_a=Decimal("2000"),
    entry_price_b=Decimal("1"),
    current_price_a=Decimal("2500"),
    current_price_b=Decimal("1"),
)
print(f"IL: {result.il_percent:.4f}%")

# Project IL for various price changes
for pct in [10, 25, 50, 100]:
    proj = calc.project_il(price_change_pct=Decimal(pct))
    print(f"Price +{pct}%: IL = {proj.il_percent:.4f}%")
```
Initialize the IL calculator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| positions | dict[str, LPPosition] \| None | Optional dictionary of tracked LP positions | None |
| mock_prices | dict[str, Decimal] \| None | Optional mock prices for testing (token -> price) | None |
calculate_il
calculate_il(
entry_price_a: Decimal,
entry_price_b: Decimal,
current_price_a: Decimal,
current_price_b: Decimal,
weight_a: Decimal = Decimal("0.5"),
weight_b: Decimal = Decimal("0.5"),
pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
entry_value: Decimal | None = None,
) -> ILResult
Calculate impermanent loss given entry and current prices.
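For constant product AMMs (Uniswap V2 style), IL is calculated as: IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1
For weighted pools (Balancer style), IL is calculated as: IL = (price_ratio ^ weight_a) / ((weight_a * price_ratio) + weight_b) - 1
Here price_ratio = (current_price_a / current_price_b) / (entry_price_a / entry_price_b), the change in token A's price relative to token B since entry. With weight_a = weight_b = 0.5, the weighted formula reduces to the constant product one.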
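Both closed-form expressions can be checked numerically. The sketch below uses Decimal and takes price_ratio to be token A's price change measured in token B, (current_a / current_b) / (entry_a / entry_b). The helper functions are illustrative and are not ILCalculator's internals.

```python
from decimal import Decimal


def price_ratio(entry_a: Decimal, entry_b: Decimal, current_a: Decimal, current_b: Decimal) -> Decimal:
    # Change in token A's price, measured in token B, since entry.
    return (current_a / current_b) / (entry_a / entry_b)


def il_constant_product(ratio: Decimal) -> Decimal:
    return 2 * ratio.sqrt() / (1 + ratio) - 1


def il_weighted(ratio: Decimal, weight_a: Decimal, weight_b: Decimal) -> Decimal:
    if weight_a + weight_b != 1:
        raise ValueError("weights must sum to 1")
    return ratio**weight_a / (weight_a * ratio + weight_b) - 1


ratio = price_ratio(Decimal("2000"), Decimal("1"), Decimal("2500"), Decimal("1"))
cp = il_constant_product(ratio)  # about -0.619%
balancer_80_20 = il_weighted(ratio, Decimal("0.8"), Decimal("0.2"))  # smaller loss
```

A 50/50 weighted pool reproduces the constant product result. Skewing weight toward the token that moved reduces the loss.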
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry_price_a | Decimal | Entry price of token A (in quote currency, e.g., USD) | required |
| entry_price_b | Decimal | Entry price of token B (in quote currency) | required |
| current_price_a | Decimal | Current price of token A | required |
| current_price_b | Decimal | Current price of token B | required |
| weight_a | Decimal | Weight of token A (default 0.5 for equal weight) | Decimal('0.5') |
| weight_b | Decimal | Weight of token B (default 0.5 for equal weight) | Decimal('0.5') |
| pool_type | PoolType | Type of AMM pool | CONSTANT_PRODUCT |
| entry_value | Decimal \| None | Optional initial position value (for absolute loss calc) | None |
Returns:
| Type | Description |
|---|---|
| ILResult | ILResult with IL metrics |
Raises:
| Type | Description |
|---|---|
| InvalidPriceError | If any price is zero or negative |
| InvalidWeightError | If weights don't sum to 1.0 |
calculate_il_concentrated
calculate_il_concentrated(
entry_price_a: Decimal,
entry_price_b: Decimal,
current_price_a: Decimal,
current_price_b: Decimal,
tick_lower: int,
tick_upper: int,
entry_value: Decimal | None = None,
) -> ILResult
Calculate IL for concentrated liquidity positions (Uniswap V3).
Concentrated liquidity amplifies both gains and IL compared to full-range positions. The IL is higher when price moves outside the range.
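The amplification effect can be shown with standard Uniswap V3 liquidity math. This sketch rests on several assumptions:
- The price is token A in token B, i.e. current_price_a / current_price_b.
- Ticks map to prices as 1.0001 ** tick, with token decimals ignored.
- IL is measured against holding the entry amounts.

The helpers are hypothetical and are not ILCalculator's implementation.

```python
import math


def tick_to_price(tick: int) -> float:
    # Uniswap V3 convention: raw price of token A (token0) in token B (token1).
    return 1.0001**tick


def amounts_for_liquidity(
    liquidity: float, price: float, price_lower: float, price_upper: float
) -> tuple[float, float]:
    sqrt_a, sqrt_b = math.sqrt(price_lower), math.sqrt(price_upper)
    # Outside the range the position holds only one token, so clamp sqrt(price).
    sqrt_p = min(max(math.sqrt(price), sqrt_a), sqrt_b)
    amount_a = liquidity * (1 / sqrt_p - 1 / sqrt_b)
    amount_b = liquidity * (sqrt_p - sqrt_a)
    return amount_a, amount_b


def il_concentrated(entry_price: float, current_price: float, tick_lower: int, tick_upper: int) -> float:
    price_lower, price_upper = tick_to_price(tick_lower), tick_to_price(tick_upper)
    # Liquidity cancels out of the ratio, so any positive value works.
    a0, b0 = amounts_for_liquidity(1.0, entry_price, price_lower, price_upper)
    a1, b1 = amounts_for_liquidity(1.0, current_price, price_lower, price_upper)
    value_lp = a1 * current_price + b1  # LP position value, in token B
    value_hold = a0 * current_price + b0  # value of simply holding the entry amounts
    return value_lp / value_hold - 1


full_range = il_concentrated(1.0, 1.25, -887220, 887220)
narrow = il_concentrated(1.0, 1.25, -2232, 2232)  # roughly the 0.8 to 1.25 price range
```

The full-range position matches the constant product formula. The narrow range loses roughly ten times as much for the same 25% move.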
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| entry_price_a | Decimal | Entry price of token A | required |
| entry_price_b | Decimal | Entry price of token B | required |
| current_price_a | Decimal | Current price of token A | required |
| current_price_b | Decimal | Current price of token B | required |
| tick_lower | int | Lower tick bound of the position | required |
| tick_upper | int | Upper tick bound of the position | required |
| entry_value | Decimal \| None | Optional initial position value | None |
Returns:
| Type | Description |
|---|---|
| ILResult | ILResult with IL metrics for concentrated position |
Raises:
| Type | Description |
|---|---|
| InvalidPriceError | If any price is zero or negative |
project_il
project_il(
price_change_pct: Decimal,
weight_a: Decimal = Decimal("0.5"),
weight_b: Decimal = Decimal("0.5"),
pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
) -> ProjectedILResult
Project impermanent loss for a given price change.
This method simulates what IL would be if token A's price changed by the specified percentage while token B remains constant (typical for ETH/stablecoin pairs).
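The sketch below illustrates the projection under two assumptions. First, a move of X% maps to price_ratio = 1 + X/100 with token B held fixed, and the constant product formula is then applied. Second, rejecting ratios at or below zero mirrors the InvalidPriceError condition. The helper name is hypothetical.

```python
from decimal import Decimal


def projected_il_constant_product(price_change_pct: Decimal) -> Decimal:
    # Token B is held constant, so the price ratio is just token A's move.
    ratio = 1 + price_change_pct / 100
    if ratio <= 0:
        # A -100% (or larger) move would imply a zero or negative price.
        raise ValueError(f"price change {price_change_pct}% gives a non-positive price")
    return 2 * ratio.sqrt() / (1 + ratio) - 1


for pct in (Decimal(-30), Decimal(50), Decimal(100)):
    print(f"Price change {pct}%: IL = {projected_il_constant_product(pct) * 100:.4f}%")
```

For a 50/50 pool, IL depends only on the size of the ratio move, not its direction. Doubling (+100%) and halving (-50%) give the same loss of about 5.72%.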
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| price_change_pct | Decimal | Price change percentage (e.g., 50 for +50%, -30 for -30%) | required |
| weight_a | Decimal | Weight of token A (default 0.5) | Decimal('0.5') |
| weight_b | Decimal | Weight of token B (default 0.5) | Decimal('0.5') |
| pool_type | PoolType | Type of AMM pool | CONSTANT_PRODUCT |
Returns:
| Type | Description |
|---|---|
| ProjectedILResult | ProjectedILResult with projected IL metrics |
Raises:
| Type | Description |
|---|---|
| InvalidWeightError | If weights don't sum to 1.0 |
| InvalidPriceError | If price change would result in negative price |
add_position
Add an LP position for tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| position | LPPosition | The LP position to track | required |
remove_position
Remove an LP position from tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| position_id | str | ID of the position to remove | required |
Raises:
| Type | Description |
|---|---|
| PositionNotFoundError | If position doesn't exist |
get_position
Get a tracked LP position.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| position_id | str | ID of the position | required |
Returns:
| Type | Description |
|---|---|
| LPPosition | The LP position |
Raises:
| Type | Description |
|---|---|
| PositionNotFoundError | If position doesn't exist |
get_all_positions
Get all tracked LP positions.
Returns:
| Type | Description |
|---|---|
| list[LPPosition] | List of all tracked positions |
calculate_il_exposure
calculate_il_exposure(
position_id: str,
current_price_a: Decimal | None = None,
current_price_b: Decimal | None = None,
fees_earned: Decimal = Decimal("0"),
) -> ILExposure
Calculate IL exposure for a tracked position.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| position_id | str | ID of the tracked position | required |
| current_price_a | Decimal \| None | Current price of token A (uses mock if not provided) | None |
| current_price_b | Decimal \| None | Current price of token B (uses mock if not provided) | None |
| fees_earned | Decimal | Fees earned by the position (optional) | Decimal('0') |
Returns:
| Type | Description |
|---|---|
| ILExposure | ILExposure with full exposure details |
Raises:
| Type | Description |
|---|---|
| PositionNotFoundError | If position doesn't exist |
| ILExposureUnavailableError | If prices cannot be determined |
set_mock_prices
Set mock prices for testing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prices | dict[str, Decimal] | Dictionary mapping token symbols to prices | required |
Data Routing¶
DataRouter¶
almanak.framework.data.DataRouter
dataclass
Routes data requests to providers with fallback and circuit-breaking.
For EXECUTION_GRADE data types, the router fails closed after the primary provider fails -- no fallback to degraded sources is attempted.
For INFORMATIONAL data types, the router tries the fallback chain in order, each with its own timeout, and returns the best available result with degraded confidence.
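The sketch below shows the two routing policies side by side. All names are hypothetical, including Grade, AllProvidersFailed, RoutedResult and route, and per-provider timeouts are omitted. The sketch only shows that EXECUTION_GRADE stops after the primary fails, while INFORMATIONAL walks the fallback chain and flags the answer as degraded.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class Grade(Enum):
    EXECUTION_GRADE = "execution_grade"
    INFORMATIONAL = "informational"


class AllProvidersFailed(Exception):
    """Hypothetical stand-in for DataUnavailableError in this sketch."""


@dataclass
class RoutedResult:
    value: object
    provider: str
    degraded: bool  # True when a fallback, not the primary, answered


def route(grade: Grade, providers: list[tuple[str, Callable[[], object]]]) -> RoutedResult:
    errors: dict[str, str] = {}
    for index, (name, fetch) in enumerate(providers):
        try:
            return RoutedResult(fetch(), name, degraded=index > 0)
        except Exception as exc:  # per-provider timeouts omitted for brevity
            errors[name] = repr(exc)
            if grade is Grade.EXECUTION_GRADE:
                break  # fail closed: never fall back to a degraded source
    raise AllProvidersFailed(errors)


def failing_primary() -> object:
    raise TimeoutError("primary timed out")


def healthy_fallback() -> object:
    return {"price": 2500}


chain = [("primary", failing_primary), ("fallback", healthy_fallback)]
info = route(Grade.INFORMATIONAL, chain)
try:
    route(Grade.EXECUTION_GRADE, chain)
    execution_failed_closed = False
except AllProvidersFailed:
    execution_failed_closed = True
```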
Attributes:
| Name | Type | Description |
|---|---|---|
| config | DataRoutingConfig | Routing configuration specifying provider assignments. |
register_provider
Register a data provider for routing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| provider | DataProvider | A DataProvider implementation to register. | required |
route
route(
data_type: str,
*,
instrument: str = "",
strategy_config: dict | None = None,
**fetch_kwargs: object,
) -> DataEnvelope
Select a provider and fetch data with fallback logic.
Provider selection order
- Strategy-level override (if strategy_config provided)
- Global routing config
- Built-in defaults
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_type | str | Data type key (e.g. "ohlcv", "pool_price"). | required |
| instrument | str | Instrument identifier for logging/error context. | '' |
| strategy_config | dict \| None | Optional strategy config dict with data_overrides. | None |
| **fetch_kwargs | object | Additional kwargs passed through to provider.fetch(). | {} |
Returns:
| Type | Description |
|---|---|
| DataEnvelope | DataEnvelope from the selected provider. |
Raises:
| Type | Description |
|---|---|
| DataUnavailableError | When all providers fail. |
get_metrics
Return metrics for a specific provider or all providers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| provider_name | str \| None | If provided, return metrics for this provider only. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dict of metric data. |
health
Return health status for all registered providers.
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dict mapping provider name -> circuit breaker health dict. |
CircuitBreaker¶
almanak.framework.data.CircuitBreaker
Thread-safe circuit breaker for a single data provider.
Opens after failure_threshold consecutive failures, then enters
a cooldown period. After cooldown, allows one test request (HALF_OPEN).
If the test succeeds the circuit closes; if it fails the circuit reopens.
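The CLOSED / OPEN / HALF_OPEN cycle described above can be sketched as a small lock-protected state machine. BreakerSketch and its record_failure method are hypothetical, since this page documents only allow_request, record_success and health. An injectable clock stands in for time.monotonic so the cooldown can be exercised without sleeping.

```python
import threading
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerSketch:
    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._state = State.CLOSED
        self._failures = 0
        self._opened_at: float | None = None

    def allow_request(self) -> bool:
        with self._lock:
            if self._state is State.CLOSED:
                return True
            if self._state is State.OPEN and self._clock() - self._opened_at >= self.cooldown_seconds:
                self._state = State.HALF_OPEN  # let exactly one test request through
                return True
            return False  # still cooling down, or a test request is already in flight

    def record_success(self) -> None:
        with self._lock:
            self._state, self._failures, self._opened_at = State.CLOSED, 0, None

    def record_failure(self) -> None:  # hypothetical counterpart to record_success
        with self._lock:
            self._failures += 1
            if self._state is State.HALF_OPEN or self._failures >= self.failure_threshold:
                self._state = State.OPEN
                self._opened_at = self._clock()


now = [0.0]
breaker = BreakerSketch(failure_threshold=2, cooldown_seconds=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()
blocked_during_cooldown = not breaker.allow_request()
now[0] = 10.0
probe_allowed = breaker.allow_request()
second_request_blocked = not breaker.allow_request()
breaker.record_success()
closed_again = breaker.allow_request()
```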
Attributes:
| Name | Type | Description |
|---|---|---|
| name | | Provider name this breaker guards. |
| failure_threshold | | Consecutive failures before opening. |
| cooldown_seconds | | Seconds to wait before half-open test. |
last_failure_time
property
Monotonic timestamp of the last recorded failure, or None.
allow_request
Check whether a request should be allowed through.
Returns:
| Type | Description |
|---|---|
| bool | True if the circuit is CLOSED or transitioning to HALF_OPEN. |
| bool | False if the circuit is OPEN (still within cooldown). |
record_success
Record a successful request, resetting the breaker to CLOSED.
health
Return health metrics for monitoring.
Returns:
| Type | Description |
|---|---|
| dict[str, object] | Dict with state, failure_count, last_failure_time, and config. |
Exceptions¶
almanak.framework.data.DataUnavailableError
Bases: DataSourceError
Raised when required data cannot be obtained from any provider.
Used by the DataRouter when all providers (primary + fallbacks) have failed or been circuit-broken for a given request.
Attributes:
| Name | Type | Description |
|---|---|---|
| data_type | | What kind of data was requested (e.g. 'pool_price', 'ohlcv'). |
| instrument | | Instrument identifier (symbol or address). |
| reason | | Human-readable explanation. |
almanak.framework.data.MarketSnapshotError
Bases: Exception
Base class for typed snapshot errors.
Subclasses carry structured fields. Stringifying gives a stable shape
for log lines: "<ClassName>(<key>=<value>, …): <reason>".
For backward compatibility with the legacy data-layer error idiom, positional
arguments are accepted as (chain, reason) or just (reason,);
callers using keyword arguments (the canonical post-VIB-4062 form) continue to work.