Saltar a contenido

Execution

The execution pipeline compiles intents, signs transactions, simulates, and submits them on-chain.

GatewayExecutionOrchestrator

The primary orchestrator used when running with the gateway sidecar.

almanak.framework.execution.GatewayExecutionOrchestrator

GatewayExecutionOrchestrator(
    client: GatewayClient,
    chain: str = DEFAULT_CHAIN,
    wallet_address: str | None = None,
    timeout: float | None = None,
    execute_timeout: float | None = None,
    max_gas_price_gwei: int = 0,
)

ExecutionOrchestrator that executes through the gateway.

This implementation routes all execution requests to the gateway sidecar, which has access to private keys and can sign/submit transactions.

The interface is intentionally simpler than the full ExecutionOrchestrator since the complex signing and submission logic lives in the gateway.

Example

from almanak.framework.gateway_client import GatewayClient from almanak.framework.execution.gateway_orchestrator import GatewayExecutionOrchestrator

with GatewayClient() as client: orchestrator = GatewayExecutionOrchestrator( client=client, chain="arbitrum", wallet_address="0x1234...", ) result = await orchestrator.execute(action_bundle) print(f"Execution success: {result.success}")

Initialize gateway-backed execution orchestrator.

Parameters:

Name Type Description Default
client GatewayClient

Connected GatewayClient instance

required
chain str

Chain name for execution

DEFAULT_CHAIN
wallet_address str | None

Wallet address for signing

None
timeout float | None

gRPC timeout for CompileIntent/GetTransactionStatus calls in seconds. If None, uses chain-specific TX confirmation timeout (300s for Ethereum L1, 120s for L2s).

None
execute_timeout float | None

gRPC timeout for the Execute call in seconds. Must be larger than timeout to account for gas estimation overhead before TX submission. If None, uses chain-specific execute timeout (600s for Ethereum, 300s for L2s). Complex intents like LP_CLOSE can take 100s+ to estimate gas on Anvil forks; this timeout must cover gas estimation + TX confirmation combined.

None
max_gas_price_gwei int

Gas price cap in gwei (0 = use gateway default). Passed to the gateway so the ExecutionOrchestrator enforces the cap.

0

chain property

chain: str

Get the chain name.

wallet_address property

wallet_address: str | None

Get the wallet address.

compile_intent async

compile_intent(
    intent: Any,
    wallet_address: str | None = None,
    price_map: dict[str, str] | None = None,
) -> dict[str, Any]

Compile an intent into an action bundle through gateway.

Parameters:

Name Type Description Default
intent Any

Intent object to compile

required
wallet_address str | None

Override wallet address

None
price_map dict[str, str] | None

Token symbol -> USD price string for real pricing. If provided, the gateway compiler uses these instead of placeholder prices. Values are strings to preserve Decimal precision.

None

Returns:

Type Description
dict[str, Any]

Action bundle as dictionary

Raises:

Type Description
CompilationError

If compilation fails

execute async

execute(
    action_bundle: Any,
    context: Any | None = None,
    deployment_id: str = "",
    intent_id: str = "",
    dry_run: bool = False,
    simulation_enabled: bool = True,
    wallet_address: str | None = None,
) -> GatewayExecutionResult

Execute an action bundle through gateway.

Parameters:

Name Type Description Default
action_bundle Any

Action bundle to execute (object or dict)

required
context Any | None

Optional ExecutionContext for interface compatibility with ExecutionOrchestrator. If provided, extracts deployment_id, intent_id, dry_run, simulation_enabled, and wallet_address from it.

None
deployment_id str

Deployment identifier for tracking

''
intent_id str

Intent identifier for tracking

''
dry_run bool

If True, simulate only without submitting

False
simulation_enabled bool

If True, run simulation before execution

True
wallet_address str | None

Override wallet address

None

Returns:

Type Description
GatewayExecutionResult

GatewayExecutionResult with tx hashes and receipts

Raises:

Type Description
ExecutionError

If execution fails

get_transaction_status async

get_transaction_status(
    tx_hash: str, chain: str | None = None
) -> dict[str, Any]

Get transaction status from gateway.

Parameters:

Name Type Description Default
tx_hash str

Transaction hash to check

required
chain str | None

Chain to query (defaults to orchestrator chain)

None

Returns:

Type Description
dict[str, Any]

Status dictionary with status, confirmations, block_number

ExecutionOrchestrator

almanak.framework.execution.ExecutionOrchestrator

ExecutionOrchestrator(
    signer: Signer,
    submitter: Submitter,
    simulator: Simulator,
    chain: str = DEFAULT_CHAIN,
    rpc_url: str | None = None,
    risk_guard: RiskGuard | None = None,
    event_callback: EventCallback | None = None,
    gas_buffer_multiplier: float | None = None,
    tx_timeout_seconds: float | None = None,
    session_store: ExecutionSessionStore | None = None,
    tx_risk_config: TransactionRiskConfig | None = None,
    registry_preflight: RegistryPreflightCheck
    | None = None,
)

Orchestrates the full transaction execution flow.

The ExecutionOrchestrator coordinates: - RiskGuard validation - Transaction simulation (optional) - Nonce assignment - Transaction signing - Transaction submission - Receipt polling and parsing

Events are emitted at each step for observability.

Example

orchestrator = ExecutionOrchestrator( signer=signer, submitter=submitter, simulator=simulator, chain="arbitrum", )

result = await orchestrator.execute(action_bundle) if result.success: print(f"All transactions confirmed: {result.transaction_results}") else: print(f"Execution failed at {result.error_phase}: {result.error}")

Initialize the orchestrator.

Parameters:

Name Type Description Default
signer Signer

Signer implementation for transaction signing

required
submitter Submitter

Submitter implementation for transaction submission

required
simulator Simulator

Simulator implementation for pre-execution simulation

required
chain str

Target blockchain network

DEFAULT_CHAIN
rpc_url str | None

RPC URL for nonce queries (optional if submitter provides)

None
risk_guard RiskGuard | None

RiskGuard for validation (uses default if not provided)

None
event_callback EventCallback | None

Optional callback for execution events

None
gas_buffer_multiplier float | None

Gas buffer multiplier (uses chain default if not provided)

None
tx_timeout_seconds float | None

Timeout for transaction confirmation. If None, uses chain-specific default (300s for Ethereum L1, 120s for L2s).

None
session_store ExecutionSessionStore | None

Optional ExecutionSessionStore for crash recovery checkpoints

None
tx_risk_config TransactionRiskConfig | None

Transaction risk configuration (uses default if not provided)

None
registry_preflight RegistryPreflightCheck | None

Optional async callback (VIB-4614) that inspects an ActionBundle BEFORE on-chain submission and returns a rejection reason string when an open auto-mode position-registry row would collide with this open (preventing an orphan NFT mint), or None to allow. The orchestrator holds no StateManager / DB handle; the runner injects this callback closed over the StateManager so the _phase_registry_preflight phase stays layering-clean. None disables the phase (no-op) — paper/backtest orchestrators and any caller without a registry-backed StateManager pass None.

None

reset_nonce_cache

reset_nonce_cache(
    wallet_address: str | None = None,
) -> None

Reset the local nonce cache, forcing fresh on-chain query on next execution.

Call this after a nonce-related error to recover from nonce drift.

Parameters:

Name Type Description Default
wallet_address str | None

Specific address to reset. If None, clears all cached nonces.

None

execute async

execute(
    action_bundle: ActionBundle,
    context: ExecutionContext | None = None,
) -> ExecutionResult

Execute an ActionBundle through the full execution pipeline.

This method: 1. Validates transactions via RiskGuard 2. Simulates transactions (if enabled) 3. Assigns sequential nonces 4. Signs all transactions 5. Submits transactions 6. Polls for and parses receipts

Parameters:

Name Type Description Default
action_bundle ActionBundle

The ActionBundle to execute

required
context ExecutionContext | None

Optional execution context

None

Returns:

Type Description
ExecutionResult

ExecutionResult with complete execution details

get_current_nonce async

get_current_nonce(address: str | None = None) -> int

Get the current nonce for an address.

Parameters:

Name Type Description Default
address str | None

Address to query (defaults to signer address)

None

Returns:

Type Description
int

Current nonce for the address

get_gas_price async

get_gas_price() -> dict[str, int]

Get current gas prices from the network.

Returns:

Type Description
dict[str, int]

Dict with max_fee_per_gas and max_priority_fee_per_gas

set_event_callback

set_event_callback(callback: EventCallback | None) -> None

Set the event callback.

Parameters:

Name Type Description Default
callback EventCallback | None

Callback function for execution events

required

ExecutionResult

almanak.framework.execution.ExecutionResult dataclass

ExecutionResult(
    success: bool,
    phase: ExecutionPhase,
    transaction_results: list[TransactionResult] = list(),
    simulation_result: SimulationResult | None = None,
    total_gas_used: int = 0,
    total_gas_cost_wei: int = 0,
    error: str | None = None,
    error_phase: ExecutionPhase | None = None,
    started_at: datetime = (lambda: datetime.now(UTC))(),
    completed_at: datetime | None = None,
    correlation_id: str = "",
    gas_warnings: list[str] = list(),
    position_id: int | str | None = None,
    swap_amounts: SwapAmounts | None = None,
    lp_close_data: LPCloseData | None = None,
    prediction_fill: PredictionFill | None = None,
    bridge_data: BridgeData | None = None,
    bin_ids: list[int] | None = None,
    protocol_fees: ProtocolFees | None = None,
    primitive_money_legs: PrimitiveMoneyLegs | None = None,
    extracted_data: dict[str, Any] = dict(),
    extraction_warnings: list[str] = list(),
)

Complete result of an execution attempt.

Attributes:

Name Type Description
success bool

Whether all transactions succeeded

phase ExecutionPhase

Phase where execution completed or failed

transaction_results list[TransactionResult]

Results for each transaction

simulation_result SimulationResult | None

Simulation result (if simulation was run)

total_gas_used int

Sum of gas used across all transactions

total_gas_cost_wei int

Sum of gas costs across all transactions

error str | None

Error message if failed

error_phase ExecutionPhase | None

Phase where error occurred

started_at datetime

When execution started

completed_at datetime | None

When execution completed

correlation_id str

Unique identifier for this execution

position_id int | str | None

LP position ID for LP_OPEN intents (NFT tokenId int, or pool address str for pool-based protocols like Curve), populated by ResultEnricher

swap_amounts SwapAmounts | None

Swap execution data for SWAP intents

lp_close_data LPCloseData | None

LP close data for LP_CLOSE intents

prediction_fill PredictionFill | None

Polymarket CLOB fill data for PREDICTION_BUY / PREDICTION_SELL intents. Populated by the runner's CLOB branch (VIB-3218). Strategy authors should read result.prediction_fill.filled_shares rather than assuming the intent's requested size filled.

bridge_data BridgeData | None

Typed source-chain deposit data for BRIDGE intents (VIB-3226). Populated by ResultEnricher via the bridge adapter's receipt parser. The destination-chain settlement is tracked asynchronously by EnsoStateProvider; bridge_data.destination_tx_hash is a forward-looking hook and will usually be None here.

bin_ids list[int] | None

TraderJoe V2 bin IDs for LP positions

protocol_fees ProtocolFees | None

Typed ProtocolFees enrichment, when a receipt parser emits it (VIB-159). Top-level slot so strategy callbacks can read result.protocol_fees rather than digging in extracted_data.

primitive_money_legs PrimitiveMoneyLegs | None

Connector-declared PrimitiveMoneyLegs (US-008/ US-009), when a migrated connector emits it (VIB-159). Top-level slot mirrors swap_amounts / lp_close_data; also kept in extracted_data for the legacy ledger dispatcher fallback.

extracted_data dict[str, Any]

Flexible dict for protocol-specific extracted data

extraction_warnings list[str]

Non-fatal warnings from extraction process

effective_price property

effective_price: Decimal | None

Convenience accessor for swap effective price.

slippage_bps property

slippage_bps: int | None

Convenience accessor for swap slippage in basis points.

__post_init__

__post_init__() -> None

Generate correlation_id if not provided.

get_extracted

get_extracted(
    key: str,
    expected_type: type | None = None,
    default: Any = None,
) -> Any

Get extracted data with optional type checking.

Provides safe access to protocol-specific extracted data with optional type validation.

Parameters:

Name Type Description Default
key str

Data key to retrieve from extracted_data

required
expected_type type | None

Optional type to validate against

None
default Any

Default value if key not found or wrong type

None

Returns:

Type Description
Any

Extracted value or default

Example

tick_lower = result.get_extracted("tick_lower", int, 0) liquidity = result.get_extracted("liquidity")

to_outcome

to_outcome() -> ExecutionOutcome

Convert to chain-agnostic ExecutionOutcome.

Returns:

Type Description
ExecutionOutcome

ExecutionOutcome with EVM-specific fields mapped to common shape.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

ExecutionContext

almanak.framework.execution.ExecutionContext dataclass

ExecutionContext(
    deployment_id: str = "unknown",
    intent_id: str = "",
    chain: str = DEFAULT_CHAIN,
    wallet_address: str = "",
    correlation_id: str = "",
    cycle_id: str = "",
    session_id: str = "",
    simulation_enabled: bool = False,
    intent_description: str = "",
    dry_run: bool = False,
    protocol: str | None = None,
)

Context for the current execution.

Attributes:

Name Type Description
deployment_id str

Deployment identifier

intent_id str

Intent identifier (for session tracking)

chain str

Blockchain network

wallet_address str

Address executing transactions

correlation_id str

Unique identifier for this execution

session_id str

Execution session identifier (for crash recovery)

simulation_enabled bool

Whether to simulate before execution

dry_run bool

If True, don't actually submit transactions

__post_init__

__post_init__() -> None

Generate correlation_id if not provided.

Result Enrichment

After successful execution, ResultEnricher automatically extracts data from transaction receipts (position IDs, swap amounts, etc.) and attaches it to the result.

ResultEnricher

almanak.framework.execution.ResultEnricher

ResultEnricher(
    parser_registry: ReceiptParserRegistry | None = None,
    *,
    live_mode: bool = True,
    pool_key_lookup: Any = None,
)

Enriches ExecutionResult with intent-specific extracted data.

This component implements the "Framework Orchestrates, Protocols Execute" pattern. It determines WHAT to extract based on intent type, and delegates HOW to extract to protocol-specific parsers.

Key Design Principles: 1. Fail-Closed (live): Parse errors raise CriticalAccountingError so the runner cannot proceed on a stale / ghost view of on-chain state. Paper / backtest callers opt into permissive mode via live_mode=False, which downgrades ExtractError to a structured warning + counter. "No event of this type" results (ExtractMissing) are benign in both modes and never raise. 2. Type-Safe: Core fields are strongly typed. 3. Extensible: New protocols can be added without framework changes. 4. Zero Cognitive Load: Data "just appears" on result. 5. Three-variant contract: migrated parsers return ExtractOk / ExtractMissing / ExtractError so "no event" and "parse error" are distinguishable. Legacy parsers keep working via backward-compat wrapping (see _legacy_warn / _invoke_extract).

Example

enricher = ResultEnricher()

In StrategyRunner after execution:

result = await orchestrator.execute(bundle) if result.success: result = enricher.enrich(result, intent, context)

Strategy callback receives enriched result:

strategy.on_intent_executed(intent, success=True, result=result)

Strategy can use result.position_id directly!

Initialize the ResultEnricher.

Parameters:

Name Type Description Default
parser_registry ReceiptParserRegistry | None

Registry for protocol parsers. If not provided, uses the default global registry.

None
live_mode bool

When True (default), an ExtractError from a parser is converted into CriticalAccountingError and raised — accounting failures must not be silently treated as "no event". When False (paper / backtest), the error is logged and counted on result.extraction_warnings but does not halt execution. Default True is a deliberate fail-closed choice — paper trading entry points must opt into permissive mode.

True
pool_key_lookup Any

VIB-4477 (T08). Sync (pool_id_hex, chain) -> PoolKey | None callable injected into the Uniswap V4 receipt parser so extract_lp_close_data can resolve V4 ModifyLiquidity.pool_id back to its canonical PoolKey via the gateway. None (default) skips the wiring — V4 LP_CLOSE events then drop with a structured missing_pool_key_lookup warning (Empty != Zero per blueprint 27, the parser fails loud rather than misattribute). The strategy runner builds this from connector-owned runner hooks bound to its GatewayClient.

None

enrich

enrich(
    result: ExecutionResult,
    intent: Any,
    context: ExecutionContext,
    *,
    bundle_metadata: dict[str, Any] | None = None,
) -> ExecutionResult

Enrich execution result with intent-specific extracted data.

This method extracts relevant data from transaction receipts based on the intent type and attaches it to the ExecutionResult.

IMPORTANT (VIB-3159): In live mode this method FAILS CLOSED. Parsers that return ExtractError — or raise — cause CriticalAccountingError to propagate. Paper / backtest callers must construct the enricher with live_mode=False to downgrade those errors to warnings + a counter. Benign "no event of this type" results (ExtractMissing) never raise in either mode.

Parameters:

Name Type Description Default
result ExecutionResult

Raw execution result from orchestrator

required
intent Any

The intent that was executed

required
context ExecutionContext

Execution context with chain info

required
bundle_metadata dict[str, Any] | None

Optional ActionBundle.metadata dict from the compiler. Used to thread compiler-side quote data (e.g., expected_output_human for VIB-3203 realized-slippage calculation) through to the extract_* methods.

None

Returns:

Type Description
ExecutionResult

Enriched ExecutionResult (same instance, mutated)

Raises:

Type Description
CriticalAccountingError

when live_mode is True and a parser returns ExtractError (or raises). Inherits from Exception so the strategy runner's recovery path in run_iteration can catch it and return ACCOUNTING_FAILED (VIB-3180).

Example

result = enricher.enrich(result, intent, context)

result.position_id is now populated (if LP_OPEN)

result.swap_amounts is now populated (if SWAP)

SwapAmounts

almanak.framework.execution.SwapAmounts dataclass

SwapAmounts(
    amount_in: int,
    amount_out: int,
    amount_in_decimal: Decimal | None,
    amount_out_decimal: Decimal | None,
    effective_price: Decimal | None = None,
    slippage_bps: int | None = None,
    expected_out_decimal: Decimal | None = None,
    token_in: str | None = None,
    token_out: str | None = None,
    amount_in_decimal_resolved: bool = True,
    amount_out_decimal_resolved: bool = True,
    slippage_source: SlippageSource = SlippageSource.NONE,
)

Extracted swap execution data.

Represents the token amounts exchanged in a swap transaction. All fields are immutable (frozen=True) for safety.

Attributes:

Name Type Description
amount_in int

Raw input amount (in token's smallest unit)

amount_out int

Raw output amount (in token's smallest unit)

amount_in_decimal Decimal | None

Human-readable input amount, or None when the parser could not resolve token_in decimals. Per the "Empty != zero" invariant in docs/internal/blueprints/27-accounting.md: Decimal(0) is a measured zero, None is unmeasured. Never substitute one for the other.

amount_out_decimal Decimal | None

Human-readable output amount, or None when unmeasured (parsers fail-close on output decimals, so this is populated for every successful parse today; typed as optional for consistency with amount_in_decimal).

effective_price Decimal | None

Actual execution price (out/in), or None when unmeasurable (e.g. unresolved input decimals). Per the "Empty != zero" invariant a literal Decimal(0) here would silently corrupt slippage / lot-pricing reconciliation.

slippage_bps int | None

Actual slippage in basis points (None if unknown)

expected_out_decimal Decimal | None

Pre-slippage-discount expected output in human units, sourced from the compiler's ActionBundle metadata (VIB-3203). Persisting this alongside slippage_bps gives downstream consumers the source-of-truth used to compute the realized slippage. None when the compile path did not supply a quote.

token_in str | None

Input token address or symbol

token_out str | None

Output token address or symbol

amount_in_decimal_resolved bool

True when amount_in_decimal was computed from a resolved decimals value on the token resolver. False means the parser could not resolve decimals for token_in; amount_in_decimal is then either None OR a legacy 18-decimal estimate (uniswap_v3 fallback, VIB-3164) — downstream consumers MUST gate on this flag rather than the value itself (issue #1778, "Empty != zero" invariant). Defaults to True so existing parsers that do not populate the flag continue to behave as before.

amount_out_decimal_resolved bool

Analogous flag for amount_out_decimal. See amount_in_decimal_resolved.

Example

if result.swap_amounts: price = result.swap_amounts.effective_price slippage = result.swap_amounts.slippage_bps

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

LPCloseData

almanak.framework.execution.LPCloseData dataclass

LPCloseData(
    amount0_collected: int | None = None,
    amount1_collected: int | None = None,
    fees0: int | None = None,
    fees1: int | None = None,
    liquidity_removed: int | None = None,
    additional_amounts: dict[int, int] | None = None,
    additional_fees: dict[int, int] | None = None,
    current_tick: int | None = None,
    pool_address: str = "",
    source: str | None = None,
    currency0: str | None = None,
    currency1: str | None = None,
    position_id: str | None = None,
    fee_separation_method: str = "UNKNOWN",
    fee_confidence: str = "UNKNOWN",
)

Extracted LP close execution data.

Represents the amounts collected when closing an LP position, including principal and fees.

Attributes:

Name Type Description
amount0_collected int | None

Total amount of token0 collected (principal + fees). None = UNMEASURED (Empty ≠ Zero): a NATIVE-ETH leg credited via an internal call emits no ERC-20 Transfer, so a log-based parser cannot measure it from the receipt and leaves it None for the runner's native-balance-bracket capture to fill at ledger-build time (VIB-5121). A numeric value — including 0 — is a measured leg.

amount1_collected int | None

Total amount of token1 collected (principal + fees). None = UNMEASURED native leg (see amount0_collected).

fees0 int | None

Fees earned in token0 (if separately tracked)

fees1 int | None

Fees earned in token1 (if separately tracked)

liquidity_removed int | None

Amount of liquidity removed (if available)

additional_amounts dict[int, int] | None

Amounts for coins beyond token0/token1 (e.g., Curve 3/4-coin pools). Maps coin index to raw amount: {2: 50000000, 3: 91000000000000000000}.

additional_fees dict[int, int] | None

Fees for coins beyond token0/token1. Maps coin index to fee amount: {2: 100000, 3: 0}.

current_tick int | None

Pool's current tick at the moment of close (VIB-3940). Mirrors LPOpenData.current_tick so the framework can derive in_range at close-time and stamp it on the LP_CLOSE event, closing the lane-symmetry gap with LP_OPEN. Sourced from a Swap event in the close receipt when present, with a slot0() RPC fallback in the runner. None when no Swap event is in the receipt and the slot0 fallback could not run.

pool_address str

V3 pool address that emitted the Burn event (VIB-3940). Required input for the framework's slot0 fallback. Empty when the parser couldn't identify the pool. Mirrors LPOpenData.pool_address.

source str | None

Provenance tag for the on-chain event the close amounts were decoded from. "collect" = sourced from a Collect event (principal + already-accrued fees, the truth on transfer); "decrease_liquidity" = sourced from a DecreaseLiquidity event (principal unlocked into tokensOwed but not yet transferred). For protocols whose close is a two-tx sequence (Aerodrome Slipstream: decreaseLiquidity then collect), ResultEnricher uses this tag to prefer the Collect-sourced extraction across receipts so accrued fees on chain are not silently dropped from the registry payload (VIB-4310). Optional for backward compatibility: single-tx parsers may leave it None; the enricher then falls back to first-match semantics.

Example

if result.lp_close_data: total_0 = result.lp_close_data.amount0_collected fees_0 = result.lp_close_data.fees0 # For 4-coin pools (e.g., Curve NG): all_amounts = result.lp_close_data.all_amounts # [amt0, amt1, amt2, amt3]

all_amounts property

all_amounts: list[int | None]

Return all coin amounts as a list, including additional coins.

Per Empty ≠ Zero (VIB-5117 / VIB-5121): None slots stand for "unmeasured by this parser" — e.g. a native principal leg the receipt could not observe (filled later from the V4 pre-burn position-state read or the Fluid balance bracket). A numeric 0 is a measured zero. Mirrors :attr:all_fees.

all_fees property

all_fees: list[int | None]

Return all fee amounts as a list, including additional coins.

Per Empty ≠ Zero: None slots stand for "unmeasured by this parser".

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

Signers

LocalKeySigner

almanak.framework.execution.LocalKeySigner

LocalKeySigner(private_key: str)

Bases: Signer

Signs transactions using a local private key.

This signer uses the eth-account library to sign transactions locally. The private key is stored in memory and derived to an Ethereum account at initialization time.

SECURITY CONTRACT
  • Private keys are NEVER logged, printed, or included in error messages
  • Private keys are NEVER exposed through any method or property
  • Transaction fields are validated before signing
  • The wallet address is derived from the key at initialization
Supported transaction types
  • EIP-1559 (Type 2): Modern fee market transactions
  • Legacy (Type 0): Pre-EIP-1559 transactions

Attributes:

Name Type Description
address str

The checksummed Ethereum address derived from the private key

Example

Initialize with private key

signer = LocalKeySigner(private_key="0x...")

Create unsigned transaction

tx = UnsignedTransaction( to="0x1234...", value=0, data="0xa9059cbb...", chain_id=42161, gas_limit=100000, nonce=5, max_fee_per_gas=100_000_000, max_priority_fee_per_gas=1_000_000, )

Sign transaction

signed = await signer.sign(tx, chain="arbitrum") print(f"Signed tx hash: {signed.tx_hash}")

Initialize the signer with a private key.

Parameters:

Name Type Description Default
private_key str

Hex-encoded private key (with or without 0x prefix)

required

Raises:

Type Description
SigningError

If the private key is invalid

Example

signer = LocalKeySigner(private_key="0xabc123...")

address property

address: str

Return the wallet address associated with this signer.

The address is derived from the private key at initialization and is cached for efficiency.

Returns:

Type Description
str

Checksummed Ethereum address (0x-prefixed, 42 characters)

Example

signer = LocalKeySigner(private_key="0x...") print(signer.address) # 0x71C7656EC7ab88b098defB751B7401B5f6d8976F

sign async

sign(
    tx: UnsignedTransaction, chain: str
) -> SignedTransaction

Sign a transaction with the local private key.

This method validates the transaction fields, builds the appropriate transaction dictionary based on transaction type (EIP-1559 or legacy), and signs it using the eth-account library.

Parameters:

Name Type Description Default
tx UnsignedTransaction

Unsigned transaction to sign

required
chain str

Chain name (e.g., "arbitrum", "ethereum") for logging/validation

required

Returns:

Type Description
SignedTransaction

SignedTransaction containing the raw signed bytes and transaction hash

Raises:

Type Description
SigningError

If signing fails due to invalid fields or key issues

ValueError

If transaction fields are malformed

Example

tx = UnsignedTransaction( to="0x1234...", value=1_000_000_000_000_000_000, # 1 ETH data="0x", chain_id=1, gas_limit=21000, nonce=0, max_fee_per_gas=30_000_000_000, max_priority_fee_per_gas=1_000_000_000, ) signed = await signer.sign(tx, chain="ethereum")

__repr__

__repr__() -> str

Return string representation without exposing private key.

__str__

__str__() -> str

Return string representation without exposing private key.

Simulators

DirectSimulator

almanak.framework.execution.DirectSimulator

DirectSimulator(name: str = 'direct')

Bases: Simulator

Pass-through simulator that skips actual simulation.

This simulator returns a successful SimulationResult without performing any actual simulation. It is designed for environments where simulation is not needed, such as local fork testing or trusted execution contexts.

The key differentiator from other simulators: - Returns simulated=False to indicate no simulation was performed - Always returns success=True (assumes transactions are valid) - Logs that simulation was skipped for observability

This is NOT a stub or shortcut - it is a legitimate implementation for production use cases where pre-execution simulation adds no value or where the latency cost is unacceptable.

Future Alternatives: - TenderlySimulator: Full simulation via Tenderly API - LocalSimulator: Simulation via local node eth_call - FlashbotsSimulator: Simulation via Flashbots bundle simulation

Attributes:

Name Type Description
name str

Identifier for this simulator (for logging and metrics)

Example

simulator = DirectSimulator()

Simulate a single transaction

result = await simulator.simulate([tx], chain="arbitrum")

if result.success: # Proceed to signing and submission ... else: # Handle simulation failure (won't happen with DirectSimulator) ...

Initialize the DirectSimulator.

Parameters:

Name Type Description Default
name str

Identifier for this simulator instance (default: "direct")

'direct'

name property

name: str

Return the simulator name.

simulate async

simulate(
    txs: list[UnsignedTransaction],
    chain: str,
    state_overrides: dict[str, Any] | None = None,
) -> SimulationResult

Return a pass-through simulation result without actual simulation.

This method logs that simulation was skipped and returns a successful SimulationResult with simulated=False to indicate no actual simulation was performed.

Parameters:

Name Type Description Default
txs list[UnsignedTransaction]

List of unsigned transactions to "simulate"

required
chain str

Chain name (logged but not used for simulation)

required
state_overrides dict[str, Any] | None

Ignored - DirectSimulator doesn't perform simulation

None

Returns:

Type Description
SimulationResult

SimulationResult with:

SimulationResult
  • success=True (assumes transactions are valid)
SimulationResult
  • simulated=False (indicates no simulation was performed)
SimulationResult
  • Empty gas_estimates, warnings, state_changes, logs
Note

This method never raises exceptions for valid input. Infrastructure failures are not possible since no external calls are made.

TenderlySimulator

almanak.framework.execution.TenderlySimulator

TenderlySimulator(
    account_slug: str,
    project_slug: str,
    access_key: str,
    timeout_seconds: float = 10.0,
    name: str = "tenderly",
    ssl_context: SSLContext | None = None,
)

Bases: Simulator

Transaction simulation via Tenderly REST API.

This simulator uses Tenderly's simulate-bundle endpoint to simulate transactions before submission. It provides accurate gas estimates, pre-flight validation, and supports SAFE wallet simulations via state overrides.

Key Advantages over Alchemy
  • No transaction bundle limit (Alchemy limited to 3)
  • State override support for SAFE wallet ETH balance
  • Support for more chains (9+ vs 4 for Alchemy)
  • Detailed simulation dashboard URLs

Attributes:

Name Type Description
account_slug

Tenderly account identifier

project_slug

Tenderly project identifier

timeout_seconds

Request timeout

Example

simulator = TenderlySimulator( account_slug="my-account", project_slug="my-project", access_key="xxx", )

Basic simulation

result = await simulator.simulate([tx], chain="arbitrum")

SAFE wallet simulation with ETH balance override

result = await simulator.simulate( [tx], chain="arbitrum", state_overrides={"0xSafeAddress": {"balance": hex(10 * 10**18)}}, )

Initialize the TenderlySimulator.

Parameters:

Name Type Description Default
account_slug str

Tenderly account slug

required
project_slug str

Tenderly project slug

required
access_key str

Tenderly API access key

required
timeout_seconds float

Request timeout (default 10s)

10.0
name str

Simulator name for logging (default "tenderly")

'tenderly'
ssl_context SSLContext | None

Optional SSL context for outbound HTTPS. When provided (e.g. the gateway threads in its certifi-backed context), the aiohttp session uses a TCPConnector(ssl=ssl_context). When None (operator CLI / other framework callers), a bare aiohttp.ClientSession() is used, preserving prior behaviour.

None

Raises:

Type Description
ValueError

If any required parameter is missing

name property

name: str

Return the simulator name.

supports_chain

supports_chain(chain: str) -> bool

Check if this simulator supports a given chain.

Parameters:

Name Type Description Default
chain str

Chain name (lowercase)

required

Returns:

Type Description
bool

True if Tenderly supports this chain

simulate async

simulate(
    txs: list[UnsignedTransaction],
    chain: str,
    state_overrides: dict[str, Any] | None = None,
) -> SimulationResult

Simulate transactions via Tenderly API.

This method simulates the execution of one or more transactions using Tenderly's bundle simulation endpoint. It returns gas estimates and validates that transactions will succeed.

Parameters:

Name Type Description Default
txs list[UnsignedTransaction]

List of unsigned transactions to simulate

required
chain str

Chain name (e.g., "arbitrum")

required
state_overrides dict[str, Any] | None

Optional state overrides for SAFE wallets Format: {"0xAddress": {"balance": "0xHexWei"}}

None

Returns:

Type Description
SimulationResult

SimulationResult with gas estimates and success status

Raises:

Type Description
SimulationError

For infrastructure failures (not tx failures)

Receipt Parsing

ReceiptParserRegistry

almanak.framework.execution.ReceiptParserRegistry

ReceiptParserRegistry()

Façade over STRATEGY_RECEIPT_PARSER_REGISTRY.

Adds the caching + custom-registration semantics consumers rely on on top of the strategy-side connector registry's classes_by_key() resolution:

  • Per-protocol cacheget(protocol) (no kwargs) caches the instantiated parser. register(custom) evicts that entry.
  • Custom registrationregister(protocol, cls) injects a user-supplied parser class that takes precedence over the connector-provided one. Used by tests and by strategy-side consumers that need to substitute a parser at runtime.
  • Alias normalisationprotocol is run through :func:almanak.connectors._strategy_base.protocol_aliases.normalize_protocol before lookup, so chain-scoped renames (e.g. ("mantle", "uniswap_v3")"agni_finance") resolve identically to the pre-W2 behaviour.

The class is intentionally instantiable so each consumer can hold its own cache scope (ResultEnricher historically constructs one per execution context). The module-level convenience functions (get_parser / list_parsers / …) share a single default instance.

get

get(protocol: str, **kwargs: Any) -> ReceiptParser

Resolve protocol to a parser instance.

See class docstring for cache / alias semantics. Raises :class:ValueError if the protocol isn't known.

register

register(
    protocol: str, parser_class: type[ReceiptParser]
) -> None

Register a custom parser class. Evicts the cached instance.

unregister

unregister(protocol: str) -> bool

Remove a custom parser registration. Returns True if removed.

clear_cache

clear_cache() -> None

Drop every cached parser instance (custom classes survive).

list_protocols

list_protocols() -> list[str]

Return every protocol key resolvable by this registry.

is_registered

is_registered(protocol: str) -> bool

Whether protocol resolves to a parser (built-in or custom).

Exceptions

almanak.framework.execution.ExecutionError

Bases: Exception

Base exception for execution layer errors.

All execution-related exceptions inherit from this class to allow broad exception handling when needed.

almanak.framework.execution.SimulationError

SimulationError(reason: str, recoverable: bool = True)

Bases: ExecutionError

Raised when transaction simulation fails.

This is distinct from a transaction that simulates successfully but would revert - that returns SimulationResult with success=False.

This exception is for infrastructure failures: - Simulation service unavailable - Network timeout - Invalid simulation parameters

Attributes:

Name Type Description
reason

Human-readable explanation of the failure

recoverable

Whether the error is transient and can be retried

almanak.framework.execution.SigningError

SigningError(reason: str, tx_hash: str | None = None)

Bases: ExecutionError

Raised when transaction signing fails.

This exception should be raised when: - Private key is invalid or missing - Transaction fields are malformed - Signing algorithm encounters an error

Note: Never include sensitive key material in error messages.

Attributes:

Name Type Description
reason

Human-readable explanation of the failure

tx_hash

Optional hash of the transaction that failed (if available)