Saltar a contenido

Data Layer

Price oracles, balance providers, OHLCV sources, and the indicator/analytics primitives consumed by MarketSnapshot.

Price Data

PriceOracle

almanak.framework.data.PriceOracle

Bases: Protocol

Protocol for price aggregation and oracle logic.

A PriceOracle wraps one or more BasePriceSource implementations and provides aggregation logic (median, weighted average, etc.) along with outlier detection and graceful degradation.

The oracle is the primary interface for strategies to get price data, abstracting away the complexity of managing multiple sources.

Key responsibilities: - Aggregate prices from multiple sources - Detect and filter outliers (>2% deviation from median) - Handle partial failures (some sources down) - Track source health metrics for routing decisions

Example implementation

class PriceAggregator: def init(self, sources: list[BasePriceSource]): self._sources = sources self._health_metrics: dict[str, SourceHealthMetrics] = {}

async def get_aggregated_price(
    self, token: str, quote: str = "USD", *, chain: str | None = None
) -> PriceResult:
    results = []
    errors = {}

    for source in self._sources:
        try:
            result = await source.get_price(token, quote)
            results.append(result)
        except Exception as e:
            errors[source.source_name] = str(e)

    if not results:
        raise AllDataSourcesFailed(errors)

    # Return median price with aggregated confidence
    median_price = self._calculate_median(results)
    confidence = self._calculate_confidence(results)
    return PriceResult(
        price=median_price,
        source="aggregated",
        timestamp=datetime.now(timezone.utc),
        confidence=confidence,
    )

get_aggregated_price async

get_aggregated_price(
    token: str,
    quote: str = "USD",
    *,
    chain: str | None = None,
) -> PriceResult

Get aggregated price from multiple sources.

Parameters:

Name Type Description Default
token str

Token symbol to get price for

required
quote str

Quote currency (default "USD")

'USD'
chain str | None

Optional chain context for chain-specific or address-based pricing. Single-chain callers may omit this and let the implementation use its default chain, but multi-chain callers should pass it explicitly.

None

Returns:

Type Description
PriceResult

PriceResult with aggregated price and confidence

Raises:

Type Description
AllDataSourcesFailed

If all sources fail to provide data

get_source_health

get_source_health(
    source_name: str,
) -> dict[str, Any] | None

Get health metrics for a specific source.

Returns metrics like success rate, average latency, last error time.

Parameters:

Name Type Description Default
source_name str

Name of the source to query

required

Returns:

Type Description
dict[str, Any] | None

Dictionary with health metrics, or None if source unknown

AggregatedPrice

almanak.framework.data.AggregatedPrice dataclass

AggregatedPrice(
    price: Decimal,
    sources: list[PoolContribution] = list(),
    block_range: tuple[int, int] = (0, 0),
    method: str = "lwap",
    window_seconds: int = 0,
    pool_count: int = 0,
)

Aggregated price from multiple pools or time periods.

Attributes:

Name Type Description
price Decimal

The aggregated price value.

sources list[PoolContribution]

List of pool contributions with individual prices and weights.

block_range tuple[int, int]

Tuple of (min_block, max_block) covered by the aggregation.

method str

Aggregation method used ("twap" or "lwap").

window_seconds int

Time window in seconds (for TWAP).

pool_count int

Number of pools used in aggregation.

PriceAggregator

almanak.framework.data.PriceAggregator

PriceAggregator(
    pool_registry: PoolReaderRegistry,
    rpc_call: RpcCallFn,
    min_liquidity_usd: Decimal = DEFAULT_MIN_LIQUIDITY_USD,
    reference_price_usd: Decimal | None = None,
    source_name: str = "alchemy_rpc",
)

Aggregates prices across pools using TWAP and LWAP methods.

TWAP uses Uniswap V3's built-in oracle (observe()) for time-weighted average prices. LWAP reads live prices from multiple pools and weights them by in-range liquidity.

All results are EXECUTION_GRADE: fail-closed with no off-chain fallback.

Parameters:

Name Type Description Default
pool_registry PoolReaderRegistry

PoolReaderRegistry for reading pool prices.

required
rpc_call RpcCallFn

RPC call function for direct contract reads (observe()).

required
min_liquidity_usd Decimal

Minimum liquidity in USD to include a pool (default $10k).

DEFAULT_MIN_LIQUIDITY_USD
reference_price_usd Decimal | None

Reference price for converting liquidity to USD. If None, liquidity filtering uses raw liquidity values and the threshold is treated as raw liquidity units.

None
source_name str

Provenance label stamped on the aggregated DataMeta.source (default "alchemy_rpc"). When every byte flows through the gateway eth_call proxy, callers MUST pass "gateway_rpc" so the envelope does not carry a false alchemy_rpc provenance (VIB-4924 F2/H3).

'alchemy_rpc'

twap

twap(
    pool_address: str,
    chain: str,
    window_seconds: int = 300,
    token0_decimals: int = 18,
    token1_decimals: int = 6,
    protocol: str = "uniswap_v3",
) -> DataEnvelope[AggregatedPrice]

Calculate TWAP using Uniswap V3 oracle observe().

Calls observe([window_seconds, 0]) on the pool contract to get tick cumulatives, then derives the arithmetic mean tick and converts to a price.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str

Chain name.

required
window_seconds int

Time window in seconds (default 300 = 5 min).

300
token0_decimals int

Decimals of token0 (default 18).

18
token1_decimals int

Decimals of token1 (default 6).

6
protocol str

Protocol name for source attribution.

'uniswap_v3'

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with TWAP price.

Raises:

Type Description
DataUnavailableError

If observe() call fails or returns invalid data.

lwap

lwap(
    token_a: str,
    token_b: str,
    chain: str,
    fee_tiers: list[int] | None = None,
    protocols: list[str] | None = None,
) -> DataEnvelope[AggregatedPrice]

Calculate liquidity-weighted average price across pools.

Reads live prices from all known pools for the given pair, filters out pools below the minimum liquidity threshold, then computes LWAP = sum(price_i * liquidity_i) / sum(liquidity_i).

Falls back to single-pool price if only one pool is available.

Parameters:

Name Type Description Default
token_a str

Token A symbol or address.

required
token_b str

Token B symbol or address.

required
chain str

Chain name.

required
fee_tiers list[int] | None

Fee tiers to search (default: [100, 500, 3000, 10000]).

None
protocols list[str] | None

Protocols to search (default: all registered for chain).

None

Returns:

Type Description
DataEnvelope[AggregatedPrice]

DataEnvelope[AggregatedPrice] with LWAP price.

Raises:

Type Description
DataUnavailableError

If no pools found for the pair (fail-closed).

Balance Data

BalanceProvider

almanak.framework.data.BalanceProvider

Bases: Protocol

Protocol for on-chain balance queries.

A BalanceProvider abstracts the complexity of querying ERC-20 and native token balances from the blockchain, handling decimal conversion, caching, and RPC error recovery.

Key responsibilities: - Query ERC-20 balances via balanceOf - Query native ETH balance via eth_getBalance - Handle token decimal conversion correctly - Cache balances to reduce RPC load - Invalidate cache after transaction execution

Example implementation

class Web3BalanceProvider: def init(self, web3: Web3, wallet_address: str): self._web3 = web3 self._wallet = wallet_address self._cache: dict[str, BalanceResult] = {} self._cache_ttl = 5 # 5 second cache

async def get_balance(self, token: str) -> BalanceResult:
    if token == "ETH":
        return await self._get_native_balance()

    token_info = self._get_token_info(token)
    contract = self._web3.eth.contract(
        address=token_info.address,
        abi=ERC20_ABI,
    )
    raw_balance = contract.functions.balanceOf(self._wallet).call()
    balance = Decimal(raw_balance) / Decimal(10 ** token_info.decimals)

    return BalanceResult(
        balance=balance,
        token=token,
        address=token_info.address,
        decimals=token_info.decimals,
        raw_balance=raw_balance,
    )

def invalidate_cache(self, token: Optional[str] = None) -> None:
    if token:
        self._cache.pop(token, None)
    else:
        self._cache.clear()

get_balance async

get_balance(token: str) -> BalanceResult

Get the balance of a token for the configured wallet.

Parameters:

Name Type Description Default
token str

Token symbol (e.g., "WETH", "USDC") or "ETH" for native

required

Returns:

Type Description
BalanceResult

BalanceResult with balance in human-readable units

Raises:

Type Description
DataSourceError

If balance cannot be fetched

get_native_balance async

get_native_balance() -> BalanceResult

Get the native token balance (ETH, MATIC, etc.).

Convenience method for getting the chain's native token balance.

Returns:

Type Description
BalanceResult

BalanceResult for native token

invalidate_cache

invalidate_cache(token: str | None = None) -> None

Invalidate cached balances.

Should be called after transaction execution to ensure fresh data.

Parameters:

Name Type Description Default
token str | None

Specific token to invalidate, or None to clear all

None

OHLCV Data

OHLCVProvider

almanak.framework.data.OHLCVProvider

Bases: Protocol

Protocol for OHLCV (candlestick) data providers.

Used by indicators like RSI that need historical price data.

Implementations must: - Support multiple timeframes (1m, 5m, 15m, 1h, 4h, 1d) - Return properly typed OHLCVCandle objects - Handle caching to avoid repeated API calls - Gracefully degrade if full history unavailable

The supported_timeframes property should return the subset of VALID_TIMEFRAMES that this provider can supply data for.

supported_timeframes property

supported_timeframes: list[str]

Return the list of timeframes this provider supports.

Returns:

Type Description
list[str]

List of supported timeframe strings (e.g., ["1h", "4h", "1d"])

get_ohlcv async

get_ohlcv(
    token: str,
    quote: str = "USD",
    timeframe: str = "1h",
    limit: int = 100,
) -> list[OHLCVCandle]

Get OHLCV data for a token.

Parameters:

Name Type Description Default
token str

Token symbol

required
quote str

Quote currency

'USD'
timeframe str

Candle timeframe (must be in supported_timeframes)

'1h'
limit int

Number of candles to fetch

100

Returns:

Type Description
list[OHLCVCandle]

List of OHLCVCandle objects sorted by timestamp ascending

Raises:

Type Description
DataSourceError

If data cannot be fetched

InsufficientDataError

If requested limit exceeds available data

ValueError

If timeframe is not supported

OHLCVData

almanak.framework.data.OHLCVData dataclass

OHLCVData(
    timestamp: datetime,
    open: Decimal,
    high: Decimal,
    low: Decimal,
    close: Decimal,
    volume: Decimal | None = None,
)

OHLCV candlestick data point.

Attributes:

Name Type Description
timestamp datetime

Candle open time

open Decimal

Opening price

high Decimal

Highest price

low Decimal

Lowest price

close Decimal

Closing price

volume Decimal | None

Trading volume (optional, CoinGecko OHLC doesn't include volume)

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Pool Analytics

PoolAnalytics

almanak.framework.data.PoolAnalytics dataclass

PoolAnalytics(
    pool_address: str,
    chain: str,
    protocol: str,
    tvl_usd: Decimal,
    volume_24h_usd: Decimal,
    volume_7d_usd: Decimal,
    fee_apr: float,
    fee_apy: float,
    utilization_rate: float | None = None,
    token0_weight: float = 0.5,
    token1_weight: float = 0.5,
    unmeasured_fields: frozenset[str] = frozenset(),
)

Analytics for a single pool.

Attributes:

Name Type Description
pool_address str

Pool contract address.

chain str

Chain name.

protocol str

Protocol name (e.g. "uniswap_v3").

tvl_usd Decimal

Total value locked in USD. For backwards compatibility with the pre-VIB-4727 callers, tvl_usd is non-Optional and an unmeasured TVL surfaces as Decimal("0"). To distinguish measured-zero from unmeasured, inspect unmeasured_fields below (preferred — explicit) or DataMeta.confidence (decays from the baseline 0.85 by ~0.15 per unmeasured field).

volume_24h_usd Decimal

24-hour trading volume in USD.

volume_7d_usd Decimal

7-day trading volume in USD.

fee_apr float

Annualized fee return as a percentage.

fee_apy float

Compounded annual fee return as a percentage.

utilization_rate float | None

Utilization for lending pools (0.0-1.0), None for DEX.

token0_weight float

Fraction of TVL in token0 (0.0-1.0). 0.5 is the balanced default for unmeasured weights; 0.0 from a 0/100 pool is preserved (Empty != Zero).

token1_weight float

See token0_weight.

unmeasured_fields frozenset[str]

Names of money-critical fields the upstream provider did NOT measure (i.e. came back as empty-string on the wire). A field appearing here means the corresponding Decimal("0") / 0.0 value is a placeholder, not a measurement. Used by callers that need to skip pools with unknown TVL rather than treating them as $0 TVL. Money- critical fields tracked: tvl_usd, volume_24h_usd, volume_7d_usd, fee_apr, fee_apy.

PoolAnalyticsReader

VIB-4727: this reader is a thin gRPC client over the gateway's PoolAnalyticsService. It owns no HTTP egress; all upstream provider calls (DefiLlama / CoinGecko Onchain) happen inside the gateway sidecar. See the Market Snapshot HOLD contract for the propagation rule strategy authors must follow.

almanak.framework.data.pools.analytics.PoolAnalyticsReader

PoolAnalyticsReader(
    gateway_client: GatewayClient,
    *,
    timeout_seconds: float = 15.0,
)

Thin gRPC client over the gateway's PoolAnalyticsService.

This class no longer owns any HTTP egress. All upstream provider fetching (DefiLlama, GeckoTerminal) happens inside the gateway sidecar. The constructor REQUIRES a connected GatewayClient — constructing one without it deliberately raises TypeError so any stale PoolAnalyticsReader() call from before VIB-4727 fails loudly.

Parameters:

Name Type Description Default
gateway_client GatewayClient

The connected gateway client.

required
timeout_seconds float

gRPC call timeout (default 15s, matching the gateway's upstream HTTP timeout).

15.0

get_pool_analytics

get_pool_analytics(
    pool_address: str,
    chain: str,
    protocol: str | None = None,
) -> DataEnvelope[PoolAnalytics]

Get real-time analytics for a pool via the gateway.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str

Chain name (e.g. "arbitrum").

required
protocol str | None

Optional protocol hint (e.g. "uniswap_v3").

None

Returns:

Type Description
DataEnvelope[PoolAnalytics]

DataEnvelope[PoolAnalytics] with INFORMATIONAL classification.

Raises:

Type Description
DataSourceUnavailable

When the gateway returns a non-OK status or when both upstream providers fail. Callers that catch this exception MUST either re-raise or return Intent.hold(...) so the runner's HOLD inference still fires.

best_pool

best_pool(
    token_a: str,
    token_b: str,
    chain: str,
    metric: str = "fee_apr",
    protocols: list[str] | None = None,
) -> DataEnvelope[PoolAnalyticsResult]

Deferred to a follow-up gateway RPC.

The pre-VIB-4727 implementation enumerated all DefiLlama pools and filtered locally — that egress path can't ship as-is from the strategy container. A second gateway RPC (SearchPools) is required. Tracking ticket: VIB-4729.

Until the RPC lands this raises DataSourceUnavailable rather than NotImplementedError so that the wrap-and-reraise in MarketSnapshot.best_pool(...) produces a PoolAnalyticsUnavailableError whose __cause__ chain classifies as DATA_UNAVAILABLE — the runner treats that as HOLD-worthy, the same contract a real provider outage uses. A bare NotImplementedError would have crashed the iteration loop instead of producing a HOLD.

health

health() -> dict[str, dict[str, int]]

Provider health is now owned by the gateway servicer.

The legacy class exposed this for the old aiohttp-using providers. Strategy-container code that wants provider stats should call the gateway's metrics endpoint instead. Returning an empty dict here keeps the attribute non-throwing for any callers that still poll it during the cut-over.

NullPoolAnalyticsReader

Backtest factories (MarketSnapshotBuilder.for_pnl_backtest_state, for_paper_fork) inject this stub. It always raises DataSourceUnavailable("backtest"), forcing strategies inside a backtest to take a deterministic code path (static fee, fixture data, or HOLD).

almanak.framework.data.pools.analytics.NullPoolAnalyticsReader

Always-raises stub used by backtest factories (VIB-4727).

Live gateway HTTP at backtest time = nondeterministic results across runs — strategies that "work in backtest" then silently change behavior in production. The agreed contract (per the VIB-4727 design discussion) is: backtest factories inject this null reader; strategies that depend on pool_analytics(...) must take a deterministic code path inside backtests (a static fee assumption, a fixture-backed analytics, or HOLD).

Any call raises DataSourceUnavailable("backtest") so the runner's HOLD inference path is exercised identically to a real gateway outage.

Pool History

PoolHistoryReader

VIB-4728: thin gRPC client over the gateway's PoolHistoryService. The framework reader owns NO HTTP / GraphQL egress — all upstream provider calls (The Graph subgraphs → DefiLlama → CoinGecko Onchain) happen inside the gateway sidecar; the strategy container holds zero API keys. Returns a DataEnvelope[list[PoolSnapshot]] covering the requested window, with chain-aware canonical address normalization, unmeasured_fields-tagged Empty != Zero semantics on every snapshot, and explicit TruncationReason enum carrying soft-cap / page-cap / provider-retention semantics. See the Market Snapshot HOLD contract for the DataSourceUnavailable propagation rule strategy authors must follow.

almanak.framework.data.pools.history.PoolHistoryReader

PoolHistoryReader(
    gateway_client: GatewayClient,
    *,
    timeout_seconds: float = 30.0,
)

Thin gRPC client over the gateway's PoolHistoryService.

This class no longer owns any HTTP / GraphQL egress. All upstream provider fetching (The Graph, DefiLlama, GeckoTerminal) happens inside the gateway sidecar. The constructor REQUIRES a connected GatewayClient — constructing one without it deliberately raises TypeError so any stale PoolHistoryReader() call from before VIB-4728 fails loudly.

protocol on get_pool_history is keyword-only required (no default) — closes the silent cross-protocol surface flagged by Phase 0b Round-4 of docs/internal/uat-cards/VIB-4755.md §D-2.

Parameters:

Name Type Description Default
gateway_client GatewayClient

The connected gateway client. REQUIRED — a None value raises TypeError.

required
timeout_seconds float

gRPC call timeout (default 30s — longer than PoolAnalyticsReader's 15s because pool history can paginate across multiple upstream calls server-side).

30.0

get_pool_history

get_pool_history(
    pool_address: str,
    chain: str,
    start_date: datetime,
    end_date: datetime | None = None,
    resolution: str = "1h",
    *,
    protocol: str,
) -> DataEnvelope[list[PoolSnapshot]]

Fetch historical pool snapshots via the gateway.

Parameters:

Name Type Description Default
pool_address str

Pool contract address.

required
chain str

Chain name (e.g. "arbitrum", "base").

required
start_date datetime

Start of the history window (UTC).

required
end_date datetime | None

End of the history window. None resolves to datetime.now(UTC) AT THE FRAMEWORK BOUNDARY (per D-2 / D1.S2): a captured request.end_ts shows the resolved Unix-seconds value, NOT 0, so the cache key is stable. MarketSnapshot.pool_history() callers resolve to the snapshot's frozen timestamp instead (deterministic-replay path) — that resolution happens in the accessor before delegating to this reader.

None
resolution str

One of "1h", "4h", "1d".

'1h'
protocol str

REQUIRED keyword-only. Protocol slug — e.g. "uniswap_v3", "aerodrome". The framework reader passes this straight through to the gateway; the validator + dispatcher rejects "" or unknown values with INVALID_ARGUMENT. No default — closes the silent cross-protocol surface (VIB-4755 D-2).

required

Returns:

Type Description
DataEnvelope[list[PoolSnapshot]]

DataEnvelope[list[PoolSnapshot]] with INFORMATIONAL

DataEnvelope[list[PoolSnapshot]]

classification. Money fields are Decimal | None

DataEnvelope[list[PoolSnapshot]]

(None => unmeasured by the serving provider, listed

DataEnvelope[list[PoolSnapshot]]

in snap.unmeasured_fields).

Raises:

Type Description
DataSourceUnavailable

When the gateway returns a non-OK status, the gateway client is not connected, OR the gateway returns success=False. Callers that catch this exception MUST either re-raise or return Intent.hold(...) so the runner's HOLD inference still fires via classify_failure walking __cause__.

health

health() -> dict[str, dict[str, int]]

Provider health is now owned by the gateway servicer.

The legacy class exposed this for the old direct-HTTP providers. Strategy-container code that wants provider stats should call the gateway's metrics endpoint instead. Returning an empty dict keeps the attribute non-throwing for any callers that still poll it during the cut-over. Mirrors the VIB-4727 PoolAnalyticsReader.health() compat shim.

PoolSnapshot

The DTO returned per row by PoolHistoryReader.get_pool_history(...). Money fields (tvl, volume_24h, fee_revenue_24h, token0_reserve, token1_reserve) are typed Decimal | None per the Empty != Zero contract — a None field is named in unmeasured_fields.

almanak.framework.data.pools.history.PoolSnapshot dataclass

PoolSnapshot(
    timestamp: datetime,
    tvl: Decimal | None,
    volume_24h: Decimal | None,
    fee_revenue_24h: Decimal | None,
    token0_reserve: Decimal | None,
    token1_reserve: Decimal | None,
    unmeasured_fields: frozenset[str] = frozenset(),
)

Historical pool state at a specific point in time.

All money / reserve fields are typed Decimal | NoneNone means the upstream provider did NOT measure the field for this row (Empty != Zero). The unmeasured field names are also listed in unmeasured_fields for grep-friendly inspection (if "tvl" in snap.unmeasured_fields:).

The pre-VIB-4728 dataclass shape (non-Optional Decimal with Decimal("0") substituted for unmeasured) was an Empty == Zero bug; VIB-4728 evolves the boundary to honour CLAUDE.md §Accounting "Empty != Zero".

Attributes:

Name Type Description
timestamp datetime

UTC datetime of the snapshot (timezone-aware).

tvl Decimal | None

Total value locked in USD. None if unmeasured by the serving provider for this row.

volume_24h Decimal | None

24-hour trading volume in USD. None if unmeasured.

fee_revenue_24h Decimal | None

24-hour fee revenue in USD. None if unmeasured.

token0_reserve Decimal | None

token0 reserve in human-readable units. None if unmeasured (GeckoTerminal does not report reserves).

token1_reserve Decimal | None

token1 reserve in human-readable units. None if unmeasured.

unmeasured_fields frozenset[str]

Names of fields that are None on this row. Invariant: unmeasured_fields == frozenset(name for name in <money fields> if getattr(self, name) is None).

NullPoolHistoryReader

VIB-4728: the backtest-deterministic stub. MarketSnapshotBuilder.for_pnl_backtest_state and for_paper_fork inject this reader, which always raises DataSourceUnavailable("backtest") so a strategy run inside a backtest cannot make a history-driven decision implicitly. Verified — via three armed monkeypatches on socket.socket.connect, aiohttp.ClientSession, and grpc.aio.{insecure,secure}_channel — to construct ZERO network primitives across the four-class enumeration (in-process network, high-level child-spawn, low-level spawn syscalls, FFI).

almanak.framework.data.null_readers.NullPoolHistoryReader

Always-raises stub used by backtest factories (VIB-4755).

Live gateway HTTP at backtest time = nondeterministic results across runs — strategies that "work in backtest" then silently change behaviour in production. The agreed contract is: backtest factories inject this null reader; strategies that depend on pool_history(...) must take a deterministic code path inside backtests.

Any call raises DataSourceUnavailable("backtest") so the runner's HOLD inference path is exercised identically to a real gateway outage. The class is intentionally a thin shell — NO primitives are constructed in __init__ or anywhere else. This is verified by the D2.M6 38-primitive monkeypatch determinism test (tests/framework/market/test_backtest_pool_history_determinism.py ::test_null_reader_constructs_no_network_primitives).

health

health() -> dict[str, dict[str, int]]

Compat shim — mirrors the live reader's health() shape.

The live reader's health() returns {} (per-provider stats are now server-side). The null reader returns the same empty dict so any caller that polls .health() during the cut-over gets the same non-throwing response in both backtest and live paths.

LiquidityDepth

almanak.framework.data.LiquidityDepth dataclass

LiquidityDepth(
    ticks: list[TickData],
    total_liquidity: int,
    current_tick: int,
    current_price: Decimal,
    pool_address: str = "",
    token0_decimals: int = 18,
    token1_decimals: int = 6,
    tick_spacing: int = 60,
)

Tick-level liquidity distribution for a pool.

Attributes:

Name Type Description
ticks list[TickData]

Initialized ticks sorted by tick_index (ascending).

total_liquidity int

Current in-range liquidity (L) from the pool.

current_tick int

Current active tick from slot0.

current_price Decimal

Current price from slot0.

pool_address str

Pool contract address.

token0_decimals int

Decimals of token0.

token1_decimals int

Decimals of token1.

tick_spacing int

Tick spacing for this pool.

Volatility and Risk

RealizedVolatilityCalculator

almanak.framework.data.RealizedVolatilityCalculator

Computes realized volatility from OHLCV candle data.

Supports two estimators: - close_to_close: Standard deviation of log returns. Simple and widely used, but only uses closing prices. - parkinson: High-low range estimator. More efficient (lower variance) for the same sample size because it uses intra-period range information.

All volatilities are annualized using sqrt(periods_per_year).

realized_vol

realized_vol(
    candles: list[OHLCVCandle],
    window_days: int = 30,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
) -> VolatilityResult

Calculate realized volatility over a lookback window.

Parameters:

Name Type Description Default
candles list[OHLCVCandle]

OHLCV candles sorted ascending by timestamp. Should cover at least window_days of data at the given timeframe.

required
window_days int

Lookback window in calendar days.

30
timeframe str

Candle timeframe (1m, 5m, 15m, 1h, 4h, 1d).

'1h'
estimator str

"close_to_close" (default) or "parkinson".

'close_to_close'

Returns:

Type Description
VolatilityResult

VolatilityResult with annualized, daily, and hourly vol.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations in the window.

ValueError

If timeframe is unsupported or estimator unknown.

vol_cone

vol_cone(
    candles: list[OHLCVCandle],
    windows: list[int] | None = None,
    timeframe: str = "1h",
    estimator: str = "close_to_close",
    token: str = "",
) -> VolConeResult

Compute volatility cone: current vol vs historical percentile.

For each window length, calculates the current realized vol and compares it to the distribution of rolling volatilities computed over the full candle history.

Parameters:

Name Type Description Default
candles list[OHLCVCandle]

Full OHLCV history sorted ascending. Should be significantly longer than the largest window for meaningful percentile estimation.

required
windows list[int] | None

Lookback windows in days. Default [7, 14, 30, 90].

None
timeframe str

Candle timeframe.

'1h'
estimator str

"close_to_close" or "parkinson".

'close_to_close'
token str

Token symbol for labeling.

''

Returns:

Type Description
VolConeResult

VolConeResult with one VolConeEntry per window.

Raises:

Type Description
InsufficientDataError

If not enough data for the smallest window.

ValueError

If timeframe is unsupported.

PortfolioRiskCalculator

almanak.framework.data.PortfolioRiskCalculator

Computes portfolio risk metrics from PnL or return series.

All calculations use explicit conventions (return interval, risk-free rate, annualization) to ensure results are unambiguous and comparable.

Supports three VaR methods: - parametric: Assumes normal distribution. VaR = -z * sigma * value. - historical: Percentile-based from empirical return distribution. - cornish_fisher: Adjusts z-score for skewness and kurtosis.

portfolio_risk

portfolio_risk(
    pnl_series: list[float],
    total_value_usd: Decimal,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    var_method: VaRMethod = VaRMethod.PARAMETRIC,
    timestamps: list[datetime] | None = None,
    benchmark_eth_returns: list[float] | None = None,
    benchmark_btc_returns: list[float] | None = None,
) -> PortfolioRisk

Calculate portfolio risk metrics from a PnL series.

The pnl_series should contain periodic returns as fractions (e.g. 0.01 = 1% gain, -0.02 = 2% loss).

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns (fractions, not percentages).

required
total_value_usd Decimal

Current portfolio value in USD.

required
return_interval str

Periodicity of the returns (1d, 1h, etc.).

'1d'
risk_free_rate Decimal

Risk-free rate per period as a decimal.

Decimal('0')
var_method VaRMethod

VaR calculation method.

PARAMETRIC
timestamps list[datetime] | None

Optional timestamps for each return (for conventions).

None
benchmark_eth_returns list[float] | None

Optional ETH returns for beta calculation.

None
benchmark_btc_returns list[float] | None

Optional BTC returns for beta calculation.

None

Returns:

Type Description
PortfolioRisk

PortfolioRisk with all metrics and explicit conventions.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations.

ValueError

If return_interval is unsupported.

rolling_sharpe

rolling_sharpe(
    pnl_series: list[float],
    window_days: int = 30,
    return_interval: str = "1d",
    risk_free_rate: Decimal = Decimal("0"),
    timestamps: list[datetime] | None = None,
) -> RollingSharpeResult

Compute rolling Sharpe ratio over the PnL series.

Parameters:

Name Type Description Default
pnl_series list[float]

List of periodic returns.

required
window_days int

Rolling window in days.

30
return_interval str

Periodicity of the returns.

'1d'
risk_free_rate Decimal

Risk-free rate per period.

Decimal('0')
timestamps list[datetime] | None

Optional timestamps aligned with pnl_series.

None

Returns:

Type Description
RollingSharpeResult

RollingSharpeResult with time series of Sharpe ratios.

Raises:

Type Description
InsufficientDataError

If fewer than 30 observations total.

ValueError

If return_interval is unsupported.

Yield and Rates

YieldAggregator

almanak.framework.data.YieldAggregator

YieldAggregator(
    cache_ttl: int = 900, request_timeout: float = 15.0
)

Cross-protocol yield comparison using DeFi Llama.

Fetches yield data from DeFi Llama yields API, filtering by token, chain, TVL, and protocol. Results are cached for 15 minutes.

Parameters:

Name Type Description Default
cache_ttl int

In-memory cache TTL in seconds. Default 900 (15 minutes).

900
request_timeout float

HTTP request timeout in seconds. Default 15.

15.0

get_yield_opportunities

get_yield_opportunities(
    token: str,
    chains: list[str] | None = None,
    min_tvl: float = 100000,
    sort_by: str = "apy",
) -> DataEnvelope[list[YieldOpportunity]]

Find yield opportunities for a token across protocols and chains.

Parameters:

Name Type Description Default
token str

Token symbol (e.g. "USDC", "WETH").

required
chains list[str] | None

Optional list of chains to filter (e.g. ["arbitrum", "base"]). None means all supported chains.

None
min_tvl float

Minimum TVL in USD. Default $100k.

100000
sort_by str

Sort field: "apy", "tvl", "risk_score". Default "apy".

'apy'

Returns:

Type Description
DataEnvelope[list[YieldOpportunity]]

DataEnvelope[list[YieldOpportunity]] sorted by the chosen metric.

Raises:

Type Description
DataSourceUnavailable

If DeFi Llama API is unavailable.

health

health() -> dict[str, int]

Return health metrics.

Lending rates

Strategies read live lending rates through MarketSnapshot.lending_rate(...) / MarketSnapshot.best_lending_rate(...) — the canonical, gateway-backed accessors. The underlying RateMonitor is a framework-internal gRPC client of the gateway RateHistoryService and is no longer a public strategy API (deprecated for direct use as of VIB-4859 / VIB-4869).

GatewayFundingRateProvider

almanak.framework.data.GatewayFundingRateProvider

GatewayFundingRateProvider(
    gateway_client: GatewayClient,
    chain: str = DEFAULT_CHAIN,
    cache_ttl_seconds: float = DEFAULT_CACHE_TTL_SECONDS,
)

Funding rate provider that delegates to the gateway sidecar.

The gateway owns all network egress (Hyperliquid HTTP, GMX V2 RPC), credential storage, SSL configuration, and rate limiting. Strategy code calls this provider over the in-cluster gRPC channel only.

get_funding_rate async

get_funding_rate(
    venue: Venue | str, market: str
) -> FundingRate

Get the current funding rate for venue/market.

Raises:

Type Description
VenueNotSupportedError

venue is not in SUPPORTED_VENUES.

MarketNotSupportedError

market is not in SUPPORTED_MARKETS[venue].

FundingRateUnavailableError

the gateway returned an error.

get_funding_rate_spread async

get_funding_rate_spread(
    market: str, venue_a: Venue | str, venue_b: Venue | str
) -> FundingRateSpread

Get the funding rate spread between venue_a and venue_b.

Issues a single GetFundingRateSpread RPC so the gateway can fetch both rates concurrently. The signed spread_8h is computed locally from venue_a_rate.rate_hourly - venue_b_rate.rate_hourly because the wire spread_hourly field is absolute by historical convention and we need sign for recommended_direction.

get_rates_for_market async

get_rates_for_market(
    market: str, venues: list[Venue | str] | None = None
) -> dict[str, FundingRate]

Fetch market funding rates across multiple venues concurrently.

Impermanent Loss

ILCalculator

almanak.framework.data.ILCalculator

ILCalculator(
    positions: dict[str, LPPosition] | None = None,
    mock_prices: dict[str, Decimal] | None = None,
)

Calculator for impermanent loss across various AMM pool types.

This class provides methods to calculate impermanent loss for liquidity positions, including: - Standard constant product AMM pools (Uniswap V2, SushiSwap) - Concentrated liquidity pools (Uniswap V3) - Weighted pools (Balancer) - Stable pools (Curve)

The calculator also supports projecting IL for simulated price changes and tracking IL exposure for active positions.

Example

calc = ILCalculator()

Calculate IL for a 50/50 pool

result = calc.calculate_il( entry_price_a=Decimal("2000"), entry_price_b=Decimal("1"), current_price_a=Decimal("2500"), current_price_b=Decimal("1"), ) print(f"IL: {result.il_percent:.4f}%")

Project IL for various price changes

for pct in [10, 25, 50, 100]: proj = calc.project_il(price_change_pct=Decimal(pct)) print(f"Price +{pct}%: IL = {proj.il_percent:.4f}%")

Initialize the IL calculator.

Parameters:

Name Type Description Default
positions dict[str, LPPosition] | None

Optional dictionary of tracked LP positions

None
mock_prices dict[str, Decimal] | None

Optional mock prices for testing (token -> price)

None

calculate_il

calculate_il(
    entry_price_a: Decimal,
    entry_price_b: Decimal,
    current_price_a: Decimal,
    current_price_b: Decimal,
    weight_a: Decimal = Decimal("0.5"),
    weight_b: Decimal = Decimal("0.5"),
    pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
    entry_value: Decimal | None = None,
) -> ILResult

Calculate impermanent loss given entry and current prices.

For constant product AMMs (Uniswap V2 style), IL is calculated as: IL = 2 * sqrt(price_ratio) / (1 + price_ratio) - 1

For weighted pools (Balancer style), IL is calculated using: IL = (price_ratio ^ weight_a) / ((weight_a * price_ratio) + weight_b) - 1

Parameters:

Name Type Description Default
entry_price_a Decimal

Entry price of token A (in quote currency, e.g., USD)

required
entry_price_b Decimal

Entry price of token B (in quote currency)

required
current_price_a Decimal

Current price of token A

required
current_price_b Decimal

Current price of token B

required
weight_a Decimal

Weight of token A (default 0.5 for equal weight)

Decimal('0.5')
weight_b Decimal

Weight of token B (default 0.5 for equal weight)

Decimal('0.5')
pool_type PoolType

Type of AMM pool

CONSTANT_PRODUCT
entry_value Decimal | None

Optional initial position value (for absolute loss calc)

None

Returns:

Type Description
ILResult

ILResult with IL metrics

Raises:

Type Description
InvalidPriceError

If any price is zero or negative

InvalidWeightError

If weights don't sum to 1.0

calculate_il_concentrated

calculate_il_concentrated(
    entry_price_a: Decimal,
    entry_price_b: Decimal,
    current_price_a: Decimal,
    current_price_b: Decimal,
    tick_lower: int,
    tick_upper: int,
    entry_value: Decimal | None = None,
) -> ILResult

Calculate IL for concentrated liquidity positions (Uniswap V3).

Concentrated liquidity amplifies both gains and IL compared to full-range positions. The IL is higher when price moves outside the range.

Parameters:

Name Type Description Default
entry_price_a Decimal

Entry price of token A

required
entry_price_b Decimal

Entry price of token B

required
current_price_a Decimal

Current price of token A

required
current_price_b Decimal

Current price of token B

required
tick_lower int

Lower tick bound of the position

required
tick_upper int

Upper tick bound of the position

required
entry_value Decimal | None

Optional initial position value

None

Returns:

Type Description
ILResult

ILResult with IL metrics for concentrated position

Raises:

Type Description
InvalidPriceError

If any price is zero or negative

project_il

project_il(
    price_change_pct: Decimal,
    weight_a: Decimal = Decimal("0.5"),
    weight_b: Decimal = Decimal("0.5"),
    pool_type: PoolType = PoolType.CONSTANT_PRODUCT,
) -> ProjectedILResult

Project impermanent loss for a given price change.

This method simulates what IL would be if token A's price changed by the specified percentage while token B remains constant (typical for ETH/stablecoin pairs).

Parameters:

Name Type Description Default
price_change_pct Decimal

Price change percentage (e.g., 50 for +50%, -30 for -30%)

required
weight_a Decimal

Weight of token A (default 0.5)

Decimal('0.5')
weight_b Decimal

Weight of token B (default 0.5)

Decimal('0.5')
pool_type PoolType

Type of AMM pool

CONSTANT_PRODUCT

Returns:

Type Description
ProjectedILResult

ProjectedILResult with projected IL metrics

Raises:

Type Description
InvalidWeightError

If weights don't sum to 1.0

InvalidPriceError

If price change would result in negative price

add_position

add_position(position: LPPosition) -> None

Add an LP position for tracking.

Parameters:

Name Type Description Default
position LPPosition

The LP position to track

required

remove_position

remove_position(position_id: str) -> None

Remove an LP position from tracking.

Parameters:

Name Type Description Default
position_id str

ID of the position to remove

required

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

get_position

get_position(position_id: str) -> LPPosition

Get a tracked LP position.

Parameters:

Name Type Description Default
position_id str

ID of the position

required

Returns:

Type Description
LPPosition

The LP position

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

get_all_positions

get_all_positions() -> list[LPPosition]

Get all tracked LP positions.

Returns:

Type Description
list[LPPosition]

List of all tracked positions

calculate_il_exposure

calculate_il_exposure(
    position_id: str,
    current_price_a: Decimal | None = None,
    current_price_b: Decimal | None = None,
    fees_earned: Decimal = Decimal("0"),
) -> ILExposure

Calculate IL exposure for a tracked position.

Parameters:

Name Type Description Default
position_id str

ID of the tracked position

required
current_price_a Decimal | None

Current price of token A (uses mock if not provided)

None
current_price_b Decimal | None

Current price of token B (uses mock if not provided)

None
fees_earned Decimal

Fees earned by the position (optional)

Decimal('0')

Returns:

Type Description
ILExposure

ILExposure with full exposure details

Raises:

Type Description
PositionNotFoundError

If position doesn't exist

ILExposureUnavailableError

If prices cannot be determined

set_mock_prices

set_mock_prices(prices: dict[str, Decimal]) -> None

Set mock prices for testing.

Parameters:

Name Type Description Default
prices dict[str, Decimal]

Dictionary mapping token symbols to prices

required

clear_mock_prices

clear_mock_prices() -> None

Clear mock prices.

Data Routing

DataRouter

almanak.framework.data.DataRouter dataclass

DataRouter(config: DataRoutingConfig = DataRoutingConfig())

Routes data requests to providers with fallback and circuit-breaking.

For EXECUTION_GRADE data types, the router fails closed after the primary provider fails -- no fallback to degraded sources is attempted.

For INFORMATIONAL data types, the router tries the fallback chain in order, each with its own timeout, and returns the best available result with degraded confidence.

Attributes:

Name Type Description
config DataRoutingConfig

Routing configuration specifying provider assignments.

register_provider

register_provider(provider: DataProvider) -> None

Register a data provider for routing.

Parameters:

Name Type Description Default
provider DataProvider

A DataProvider implementation to register.

required

route

route(
    data_type: str,
    *,
    instrument: str = "",
    strategy_config: dict | None = None,
    **fetch_kwargs: object,
) -> DataEnvelope

Select a provider and fetch data with fallback logic.

Provider selection order
  1. Strategy-level override (if strategy_config provided)
  2. Global routing config
  3. Built-in defaults

Parameters:

Name Type Description Default
data_type str

Data type key (e.g. "ohlcv", "pool_price").

required
instrument str

Instrument identifier for logging/error context.

''
strategy_config dict | None

Optional strategy config dict with data_overrides.

None
**fetch_kwargs object

Additional kwargs passed through to provider.fetch().

{}

Returns:

Type Description
DataEnvelope

DataEnvelope from the selected provider.

Raises:

Type Description
DataUnavailableError

When all providers fail.

get_metrics

get_metrics(
    provider_name: str | None = None,
) -> dict[str, object]

Return metrics for a specific provider or all providers.

Parameters:

Name Type Description Default
provider_name str | None

If provided, return metrics for this provider only.

None

Returns:

Type Description
dict[str, object]

Dict of metric data.

health

health() -> dict[str, object]

Return health status for all registered providers.

Returns:

Type Description
dict[str, object]

Dict mapping provider name -> circuit breaker health dict.

CircuitBreaker

almanak.framework.data.CircuitBreaker

CircuitBreaker(
    name: str,
    failure_threshold: int = 5,
    cooldown_seconds: float = 60.0,
)

Thread-safe circuit breaker for a single data provider.

Opens after failure_threshold consecutive failures, then enters a cooldown period. After cooldown, allows one test request (HALF_OPEN). If the test succeeds the circuit closes; if it fails the circuit reopens.

Attributes:

Name Type Description
name

Provider name this breaker guards.

failure_threshold

Consecutive failures before opening.

cooldown_seconds

Seconds to wait before half-open test.

state property

state: CircuitState

Current circuit state, accounting for cooldown expiry.

failure_count property

failure_count: int

Number of consecutive failures.

last_failure_time property

last_failure_time: float | None

Monotonic timestamp of the last recorded failure, or None.

allow_request

allow_request() -> bool

Check whether a request should be allowed through.

Returns:

Type Description
bool

True if the circuit is CLOSED or transitioning to HALF_OPEN.

bool

False if the circuit is OPEN (still within cooldown).

record_success

record_success() -> None

Record a successful request, resetting the breaker to CLOSED.

record_failure

record_failure() -> None

Record a failed request, potentially opening the circuit.

reset

reset() -> None

Manually reset the breaker to CLOSED state.

health

health() -> dict[str, object]

Return health metrics for monitoring.

Returns:

Type Description
dict[str, object]

Dict with state, failure_count, last_failure_time, and config.

Exceptions

almanak.framework.data.DataUnavailableError

DataUnavailableError(
    data_type: str, instrument: str, reason: str
)

Bases: DataSourceError

Raised when required data cannot be obtained from any provider.

Used by the DataRouter when all providers (primary + fallbacks) have failed or been circuit-broken for a given request.

Attributes:

Name Type Description
data_type

What kind of data was requested (e.g. 'pool_price', 'ohlcv').

instrument

Instrument identifier (symbol or address).

reason

Human-readable explanation.

almanak.framework.data.MarketSnapshotError

MarketSnapshotError(
    *args: Any, reason: str = "", **fields: Any
)

Bases: Exception

Base class for typed snapshot errors.

Subclasses carry structured fields. Stringifying gives a stable shape for log lines: "<ClassName>(<key>=<value>, …): <reason>".

For backward compat with the legacy data-layer error idiom, positional arguments are accepted as (chain, reason) or just (reason,); callers using kwargs (the canonical post-VIB-4062 form) continue to work.