Execution¶
The execution pipeline compiles intents, signs transactions, simulates, and submits them on-chain.
GatewayExecutionOrchestrator¶
The primary orchestrator used when running with the gateway sidecar.
almanak.framework.execution.GatewayExecutionOrchestrator
¶
GatewayExecutionOrchestrator(
client: GatewayClient,
chain: str = DEFAULT_CHAIN,
wallet_address: str | None = None,
timeout: float | None = None,
execute_timeout: float | None = None,
max_gas_price_gwei: int = 0,
)
ExecutionOrchestrator that executes through the gateway.
This implementation routes all execution requests to the gateway sidecar, which has access to private keys and can sign/submit transactions.
The interface is intentionally simpler than the full ExecutionOrchestrator since the complex signing and submission logic lives in the gateway.
Example
from almanak.framework.gateway_client import GatewayClient
from almanak.framework.execution.gateway_orchestrator import GatewayExecutionOrchestrator

with GatewayClient() as client:
    orchestrator = GatewayExecutionOrchestrator(
        client=client,
        chain="arbitrum",
        wallet_address="0x1234...",
    )
    result = await orchestrator.execute(action_bundle)
    print(f"Execution success: {result.success}")
Initialize gateway-backed execution orchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client | GatewayClient | Connected GatewayClient instance | required |
| chain | str | Chain name for execution | DEFAULT_CHAIN |
| wallet_address | str \| None | Wallet address for signing | None |
| timeout | float \| None | gRPC timeout for CompileIntent/GetTransactionStatus calls in seconds. If None, uses chain-specific TX confirmation timeout (300s for Ethereum L1, 120s for L2s). | None |
| execute_timeout | float \| None | gRPC timeout for the Execute call in seconds. Must be larger than | None |
| max_gas_price_gwei | int | Gas price cap in gwei (0 = use gateway default). Passed to the gateway so the ExecutionOrchestrator enforces the cap. | 0 |
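The fallback described for timeout can be pictured with a small helper. This is a minimal sketch, not the framework's code: resolve_timeout and the two constants are hypothetical names, and treating every chain other than "ethereum" as an L2 is an assumption made only for illustration.

```python
from __future__ import annotations

# Defaults quoted in the parameter description above.
L1_TIMEOUT_SECONDS = 300.0
L2_TIMEOUT_SECONDS = 120.0


def resolve_timeout(chain: str, timeout: float | None = None) -> float:
    """Return the explicit timeout, or a chain-specific default when it is None."""
    if timeout is not None:
        return timeout
    # Assumption: only "ethereum" counts as L1; every other chain gets the L2 default.
    if chain.lower() == "ethereum":
        return L1_TIMEOUT_SECONDS
    return L2_TIMEOUT_SECONDS
```

An explicit value always wins, so a caller can still pass a shorter timeout for a slow chain.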
compile_intent
async
¶
compile_intent(
intent: Any,
wallet_address: str | None = None,
price_map: dict[str, str] | None = None,
) -> dict[str, Any]
Compile an intent into an action bundle through gateway.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| intent | Any | Intent object to compile | required |
| wallet_address | str \| None | Override wallet address | None |
| price_map | dict[str, str] \| None | Token symbol -> USD price string for real pricing. If provided, the gateway compiler uses these instead of placeholder prices. Values are strings to preserve Decimal precision. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Action bundle as dictionary |
Raises:
| Type | Description |
|---|---|
| CompilationError | If compilation fails |
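Because price_map values are strings, a caller holding Decimal prices can convert them without losing digits. The sketch below assumes a hypothetical helper name, build_price_map, and made-up prices; only the dict[str, str] shape comes from the signature above.

```python
from decimal import Decimal


def build_price_map(prices: dict[str, Decimal]) -> dict[str, str]:
    """Serialize Decimal prices to strings so no precision is lost in transit."""
    return {symbol: str(price) for symbol, price in prices.items()}


# Illustrative values only.
price_map = build_price_map(
    {"ETH": Decimal("3210.123456789012345678"), "USDC": Decimal("1")}
)
```

Passing floats instead would round the ETH price to roughly 17 significant digits, which is the loss the string encoding avoids.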
execute
async
¶
execute(
action_bundle: Any,
context: Any | None = None,
deployment_id: str = "",
intent_id: str = "",
dry_run: bool = False,
simulation_enabled: bool = True,
wallet_address: str | None = None,
) -> GatewayExecutionResult
Execute an action bundle through gateway.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action_bundle | Any | Action bundle to execute (object or dict) | required |
| context | Any \| None | Optional ExecutionContext for interface compatibility with ExecutionOrchestrator. If provided, extracts deployment_id, intent_id, dry_run, simulation_enabled, and wallet_address from it. | None |
| deployment_id | str | Deployment identifier for tracking | '' |
| intent_id | str | Intent identifier for tracking | '' |
| dry_run | bool | If True, simulate only without submitting | False |
| simulation_enabled | bool | If True, run simulation before execution | True |
| wallet_address | str \| None | Override wallet address | None |
Returns:
| Type | Description |
|---|---|
| GatewayExecutionResult | GatewayExecutionResult with tx hashes and receipts |
Raises:
| Type | Description |
|---|---|
| ExecutionError | If execution fails |
get_transaction_status
async
¶
Get transaction status from gateway.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tx_hash | str | Transaction hash to check | required |
| chain | str \| None | Chain to query (defaults to orchestrator chain) | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Status dictionary with status, confirmations, block_number |
ExecutionOrchestrator¶
almanak.framework.execution.ExecutionOrchestrator
¶
ExecutionOrchestrator(
signer: Signer,
submitter: Submitter,
simulator: Simulator,
chain: str = DEFAULT_CHAIN,
rpc_url: str | None = None,
risk_guard: RiskGuard | None = None,
event_callback: EventCallback | None = None,
gas_buffer_multiplier: float | None = None,
tx_timeout_seconds: float | None = None,
session_store: ExecutionSessionStore | None = None,
tx_risk_config: TransactionRiskConfig | None = None,
registry_preflight: RegistryPreflightCheck | None = None,
)
Orchestrates the full transaction execution flow.
The ExecutionOrchestrator coordinates:
- RiskGuard validation
- Transaction simulation (optional)
- Nonce assignment
- Transaction signing
- Transaction submission
- Receipt polling and parsing
Events are emitted at each step for observability.
Example
orchestrator = ExecutionOrchestrator(
    signer=signer,
    submitter=submitter,
    simulator=simulator,
    chain="arbitrum",
)

result = await orchestrator.execute(action_bundle)
if result.success:
    print(f"All transactions confirmed: {result.transaction_results}")
else:
    print(f"Execution failed at {result.error_phase}: {result.error}")
Initialize the orchestrator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| signer | Signer | Signer implementation for transaction signing | required |
| submitter | Submitter | Submitter implementation for transaction submission | required |
| simulator | Simulator | Simulator implementation for pre-execution simulation | required |
| chain | str | Target blockchain network | DEFAULT_CHAIN |
| rpc_url | str \| None | RPC URL for nonce queries (optional if the submitter provides one) | None |
| risk_guard | RiskGuard \| None | RiskGuard for validation (uses default if not provided) | None |
| event_callback | EventCallback \| None | Optional callback for execution events | None |
| gas_buffer_multiplier | float \| None | Gas buffer multiplier (uses chain default if not provided) | None |
| tx_timeout_seconds | float \| None | Timeout for transaction confirmation. If None, uses chain-specific default (300s for Ethereum L1, 120s for L2s). | None |
| session_store | ExecutionSessionStore \| None | Optional ExecutionSessionStore for crash recovery checkpoints | None |
| tx_risk_config | TransactionRiskConfig \| None | Transaction risk configuration (uses default if not provided) | None |
| registry_preflight | RegistryPreflightCheck \| None | Optional async callback (VIB-4614) that inspects an ActionBundle BEFORE on-chain submission and returns a rejection reason string when an open auto-mode position-registry row would collide with this open (preventing an orphan NFT mint), or | None |
reset_nonce_cache
¶
Reset the local nonce cache, forcing fresh on-chain query on next execution.
Call this after a nonce-related error to recover from nonce drift.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| wallet_address | str \| None | Specific address to reset. If None, clears all cached nonces. | None |
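To illustrate why resetting helps after nonce drift, here is a minimal sketch of a local nonce cache. NonceCache and its methods are hypothetical stand-ins, and the on_chain_nonce argument replaces the real on-chain query; none of this is the orchestrator's actual implementation.

```python
from __future__ import annotations


class NonceCache:
    """Hypothetical local cache: hands out sequential nonces per address."""

    def __init__(self) -> None:
        self._next: dict[str, int] = {}

    def next_nonce(self, address: str, on_chain_nonce: int) -> int:
        # The on-chain value is only consulted when nothing is cached yet.
        key = address.lower()
        nonce = self._next.get(key, on_chain_nonce)
        self._next[key] = nonce + 1
        return nonce

    def reset(self, wallet_address: str | None = None) -> None:
        # None clears every cached nonce; otherwise only the given address.
        if wallet_address is None:
            self._next.clear()
        else:
            self._next.pop(wallet_address.lower(), None)
```

After reset, the next call falls back to the fresh on-chain value, which is how a stale local counter gets corrected.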
execute
async
¶
Execute an ActionBundle through the full execution pipeline.
This method:
1. Validates transactions via RiskGuard
2. Simulates transactions (if enabled)
3. Assigns sequential nonces
4. Signs all transactions
5. Submits transactions
6. Polls for and parses receipts
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| action_bundle | ActionBundle | The ActionBundle to execute | required |
| context | ExecutionContext \| None | Optional execution context | None |
Returns:
| Type | Description |
|---|---|
| ExecutionResult | ExecutionResult with complete execution details |
get_current_nonce
async
¶
Get the current nonce for an address.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| address | str \| None | Address to query (defaults to signer address) | None |
Returns:
| Type | Description |
|---|---|
| int | Current nonce for the address |
get_gas_price
async
¶
Get current gas prices from the network.
Returns:
| Type | Description |
|---|---|
| dict[str, int] | Dict with max_fee_per_gas and max_priority_fee_per_gas |
set_event_callback
¶
Set the event callback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | EventCallback \| None | Callback function for execution events | required |
ExecutionResult¶
almanak.framework.execution.ExecutionResult
dataclass
¶
ExecutionResult(
success: bool,
phase: ExecutionPhase,
transaction_results: list[TransactionResult] = list(),
simulation_result: SimulationResult | None = None,
total_gas_used: int = 0,
total_gas_cost_wei: int = 0,
error: str | None = None,
error_phase: ExecutionPhase | None = None,
started_at: datetime = (lambda: datetime.now(UTC))(),
completed_at: datetime | None = None,
correlation_id: str = "",
gas_warnings: list[str] = list(),
position_id: int | str | None = None,
swap_amounts: SwapAmounts | None = None,
lp_close_data: LPCloseData | None = None,
prediction_fill: PredictionFill | None = None,
bridge_data: BridgeData | None = None,
bin_ids: list[int] | None = None,
protocol_fees: ProtocolFees | None = None,
primitive_money_legs: PrimitiveMoneyLegs | None = None,
extracted_data: dict[str, Any] = dict(),
extraction_warnings: list[str] = list(),
)
Complete result of an execution attempt.
Attributes:
| Name | Type | Description |
|---|---|---|
| success | bool | Whether all transactions succeeded |
| phase | ExecutionPhase | Phase where execution completed or failed |
| transaction_results | list[TransactionResult] | Results for each transaction |
| simulation_result | SimulationResult \| None | Simulation result (if simulation was run) |
| total_gas_used | int | Sum of gas used across all transactions |
| total_gas_cost_wei | int | Sum of gas costs across all transactions |
| error | str \| None | Error message if failed |
| error_phase | ExecutionPhase \| None | Phase where error occurred |
| started_at | datetime | When execution started |
| completed_at | datetime \| None | When execution completed |
| correlation_id | str | Unique identifier for this execution |
| position_id | int \| str \| None | LP position ID for LP_OPEN intents (NFT tokenId int, or pool address str for pool-based protocols like Curve), populated by ResultEnricher |
| swap_amounts | SwapAmounts \| None | Swap execution data for SWAP intents |
| lp_close_data | LPCloseData \| None | LP close data for LP_CLOSE intents |
| prediction_fill | PredictionFill \| None | Polymarket CLOB fill data for PREDICTION_BUY / PREDICTION_SELL intents. Populated by the runner's CLOB branch (VIB-3218). Strategy authors should read |
| bridge_data | BridgeData \| None | Typed source-chain deposit data for BRIDGE intents (VIB-3226). Populated by ResultEnricher via the bridge adapter's receipt parser. The destination-chain settlement is tracked asynchronously by EnsoStateProvider; |
| bin_ids | list[int] \| None | TraderJoe V2 bin IDs for LP positions |
| protocol_fees | ProtocolFees \| None | Typed ProtocolFees enrichment, when a receipt parser emits it (VIB-159). Top-level slot so strategy callbacks can read |
| primitive_money_legs | PrimitiveMoneyLegs \| None | Connector-declared |
| extracted_data | dict[str, Any] | Flexible dict for protocol-specific extracted data |
| extraction_warnings | list[str] | Non-fatal warnings from extraction process |
effective_price
property
¶
Convenience accessor for swap effective price.
slippage_bps
property
¶
Convenience accessor for swap slippage in basis points.
get_extracted
¶
Get extracted data with optional type checking.
Provides safe access to protocol-specific extracted data with optional type validation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Data key to retrieve from extracted_data | required |
| expected_type | type \| None | Optional type to validate against | None |
| default | Any | Default value if key not found or wrong type | None |
Returns:
| Type | Description |
|---|---|
| Any | Extracted value or default |
Example
tick_lower = result.get_extracted("tick_lower", int, 0)
liquidity = result.get_extracted("liquidity")
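The documented behaviour (return the default when the key is missing or the value has the wrong type) can be sketched as a free function over a plain dict. This is an illustration under that reading of the parameter table, not the method's actual source; the standalone function and sample data are hypothetical.

```python
from __future__ import annotations

from typing import Any


def get_extracted(
    extracted_data: dict[str, Any],
    key: str,
    expected_type: type | None = None,
    default: Any = None,
) -> Any:
    """Return extracted_data[key], or default if absent or of the wrong type."""
    if key not in extracted_data:
        return default
    value = extracted_data[key]
    if expected_type is not None and not isinstance(value, expected_type):
        return default
    return value


# Illustrative data: liquidity is deliberately stored as a string.
sample = {"tick_lower": -887220, "liquidity": "12345"}
```

With this data, asking for "liquidity" as an int yields the default rather than raising, so callers do not need their own try/except around lookups.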
to_outcome
¶
Convert to chain-agnostic ExecutionOutcome.
Returns:
| Type | Description |
|---|---|
| ExecutionOutcome | ExecutionOutcome with EVM-specific fields mapped to common shape. |
ExecutionContext¶
almanak.framework.execution.ExecutionContext
dataclass
¶
ExecutionContext(
deployment_id: str = "unknown",
intent_id: str = "",
chain: str = DEFAULT_CHAIN,
wallet_address: str = "",
correlation_id: str = "",
cycle_id: str = "",
session_id: str = "",
simulation_enabled: bool = False,
intent_description: str = "",
dry_run: bool = False,
protocol: str | None = None,
)
Context for the current execution.
Attributes:
| Name | Type | Description |
|---|---|---|
| deployment_id | str | Deployment identifier |
| intent_id | str | Intent identifier (for session tracking) |
| chain | str | Blockchain network |
| wallet_address | str | Address executing transactions |
| correlation_id | str | Unique identifier for this execution |
| session_id | str | Execution session identifier (for crash recovery) |
| simulation_enabled | bool | Whether to simulate before execution |
| dry_run | bool | If True, don't actually submit transactions |
Result Enrichment¶
After successful execution, ResultEnricher automatically extracts data from transaction receipts (position IDs, swap amounts, etc.) and attaches it to the result.
ResultEnricher¶
almanak.framework.execution.ResultEnricher
¶
ResultEnricher(
parser_registry: ReceiptParserRegistry | None = None,
*,
live_mode: bool = True,
pool_key_lookup: Any = None,
)
Enriches ExecutionResult with intent-specific extracted data.
This component implements the "Framework Orchestrates, Protocols Execute" pattern. It determines WHAT to extract based on intent type, and delegates HOW to extract to protocol-specific parsers.
Key Design Principles:
1. Fail-Closed (live): Parse errors raise CriticalAccountingError so the runner cannot proceed on a stale / ghost view of on-chain state. Paper / backtest callers opt into permissive mode via live_mode=False, which downgrades ExtractError to a structured warning + counter. "No event of this type" results (ExtractMissing) are benign in both modes and never raise.
2. Type-Safe: Core fields are strongly typed.
3. Extensible: New protocols can be added without framework changes.
4. Zero Cognitive Load: Data "just appears" on result.
5. Three-variant contract: migrated parsers return ExtractOk / ExtractMissing / ExtractError so "no event" and "parse error" are distinguishable. Legacy parsers keep working via backward-compat wrapping (see _legacy_warn / _invoke_extract).
Example
enricher = ResultEnricher()

# In StrategyRunner after execution:
result = await orchestrator.execute(bundle)
if result.success:
    result = enricher.enrich(result, intent, context)

# Strategy callback receives enriched result:
strategy.on_intent_executed(intent, success=True, result=result)

# Strategy can use result.position_id directly!
Initialize the ResultEnricher.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| parser_registry | ReceiptParserRegistry \| None | Registry for protocol parsers. If not provided, uses the default global registry. | None |
| live_mode | bool | When True (default), an ExtractError from a parser is converted into CriticalAccountingError and raised; accounting failures must not be silently treated as "no event". When False (paper / backtest), the error is logged and counted on result.extraction_warnings but does not halt execution. Default True is a deliberate fail-closed choice: paper trading entry points must opt into permissive mode. | True |
| pool_key_lookup | Any | VIB-4477 (T08). Sync | None |
enrich
¶
enrich(
result: ExecutionResult,
intent: Any,
context: ExecutionContext,
*,
bundle_metadata: dict[str, Any] | None = None,
) -> ExecutionResult
Enrich execution result with intent-specific extracted data.
This method extracts relevant data from transaction receipts based on the intent type and attaches it to the ExecutionResult.
IMPORTANT (VIB-3159): In live mode this method FAILS CLOSED. Parsers that return ExtractError — or raise — cause CriticalAccountingError to propagate. Paper / backtest callers must construct the enricher with live_mode=False to downgrade those errors to warnings + a counter. Benign "no event of this type" results (ExtractMissing) never raise in either mode.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| result | ExecutionResult | Raw execution result from orchestrator | required |
| intent | Any | The intent that was executed | required |
| context | ExecutionContext | Execution context with chain info | required |
| bundle_metadata | dict[str, Any] \| None | Optional ActionBundle.metadata dict from the compiler. Used to thread compiler-side quote data (e.g., | None |
Returns:
| Type | Description |
|---|---|
| ExecutionResult | Enriched ExecutionResult (same instance, mutated) |
Raises:
| Type | Description |
|---|---|
| CriticalAccountingError | When live_mode is True and a parser returns ExtractError (or raises). Inherits from Exception so the strategy runner's recovery path in run_iteration can catch it and return ACCOUNTING_FAILED (VIB-3180). |
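The three-variant contract and the live_mode switch can be sketched as follows. The class names match those mentioned above, but their fields, the apply_extract helper, and the local CriticalAccountingError are hypothetical stand-ins written for illustration; the real types may be shaped differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExtractOk:
    value: object


@dataclass(frozen=True)
class ExtractMissing:
    pass


@dataclass(frozen=True)
class ExtractError:
    message: str


class CriticalAccountingError(Exception):
    """Stand-in for the framework exception of the same name."""


def apply_extract(outcome, warnings: list, *, live_mode: bool = True):
    """Map a parser outcome to a value, a benign None, a warning, or a raise."""
    if isinstance(outcome, ExtractOk):
        return outcome.value
    if isinstance(outcome, ExtractMissing):
        return None  # "no event of this type" is benign in both modes
    if live_mode:
        raise CriticalAccountingError(outcome.message)  # fail closed
    warnings.append(outcome.message)  # permissive: record and continue
    return None
```

Keeping ExtractMissing separate from ExtractError is what lets live mode fail closed without treating every absent event as an accounting failure.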
SwapAmounts¶
almanak.framework.execution.SwapAmounts
dataclass
¶
SwapAmounts(
amount_in: int,
amount_out: int,
amount_in_decimal: Decimal | None,
amount_out_decimal: Decimal | None,
effective_price: Decimal | None = None,
slippage_bps: int | None = None,
expected_out_decimal: Decimal | None = None,
token_in: str | None = None,
token_out: str | None = None,
amount_in_decimal_resolved: bool = True,
amount_out_decimal_resolved: bool = True,
slippage_source: SlippageSource = SlippageSource.NONE,
)
Extracted swap execution data.
Represents the token amounts exchanged in a swap transaction. All fields are immutable (frozen=True) for safety.
Attributes:
| Name | Type | Description |
|---|---|---|
| amount_in | int | Raw input amount (in token's smallest unit) |
| amount_out | int | Raw output amount (in token's smallest unit) |
| amount_in_decimal | Decimal \| None | Human-readable input amount, or |
| amount_out_decimal | Decimal \| None | Human-readable output amount, or |
| effective_price | Decimal \| None | Actual execution price (out/in), or |
| slippage_bps | int \| None | Actual slippage in basis points (None if unknown) |
| expected_out_decimal | Decimal \| None | Pre-slippage-discount expected output in human units, sourced from the compiler's ActionBundle metadata (VIB-3203). Persisting this alongside |
| token_in | str \| None | Input token address or symbol |
| token_out | str \| None | Output token address or symbol |
| amount_in_decimal_resolved | bool | |
| amount_out_decimal_resolved | bool | Analogous flag for |
Example
if result.swap_amounts:
    price = result.swap_amounts.effective_price
    slippage = result.swap_amounts.slippage_bps
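The relationship between raw amounts, human-readable amounts, and effective_price (out/in) can be worked through with Decimal. The helper names and token decimals below are hypothetical, and the slippage formula (shortfall against the expected output, in basis points) is an assumption for illustration rather than the framework's documented definition.

```python
from __future__ import annotations

from decimal import Decimal


def to_decimal(raw: int, decimals: int) -> Decimal:
    """Convert a raw smallest-unit amount to human units."""
    return Decimal(raw) / (Decimal(10) ** decimals)


def effective_price(amount_in: Decimal, amount_out: Decimal) -> Decimal | None:
    """Execution price as out/in; None when the input amount is zero."""
    if amount_in == 0:
        return None
    return amount_out / amount_in


def slippage_bps(expected_out: Decimal, actual_out: Decimal) -> int | None:
    """Assumed formula: shortfall relative to expected output, in basis points."""
    if expected_out == 0:
        return None
    return int((expected_out - actual_out) / expected_out * 10_000)


# Illustrative swap: 1 token with 18 decimals in, 2994 tokens with 6 decimals out.
amount_in = to_decimal(1_000_000_000_000_000_000, 18)
amount_out = to_decimal(2_994_000_000, 6)
```

With an expected output of 3000, the shortfall of 6 corresponds to 20 basis points under this formula.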
LPCloseData¶
almanak.framework.execution.LPCloseData
dataclass
¶
LPCloseData(
amount0_collected: int | None = None,
amount1_collected: int | None = None,
fees0: int | None = None,
fees1: int | None = None,
liquidity_removed: int | None = None,
additional_amounts: dict[int, int] | None = None,
additional_fees: dict[int, int] | None = None,
current_tick: int | None = None,
pool_address: str = "",
source: str | None = None,
currency0: str | None = None,
currency1: str | None = None,
position_id: str | None = None,
fee_separation_method: str = "UNKNOWN",
fee_confidence: str = "UNKNOWN",
)
Extracted LP close execution data.
Represents the amounts collected when closing an LP position, including principal and fees.
Attributes:
| Name | Type | Description |
|---|---|---|
| amount0_collected | int \| None | Total amount of token0 collected (principal + fees). |
| amount1_collected | int \| None | Total amount of token1 collected (principal + fees). |
| fees0 | int \| None | Fees earned in token0 (if separately tracked) |
| fees1 | int \| None | Fees earned in token1 (if separately tracked) |
| liquidity_removed | int \| None | Amount of liquidity removed (if available) |
| additional_amounts | dict[int, int] \| None | Amounts for coins beyond token0/token1 (e.g., Curve 3/4-coin pools). Maps coin index to raw amount: {2: 50000000, 3: 91000000000000000000}. |
| additional_fees | dict[int, int] \| None | Fees for coins beyond token0/token1. Maps coin index to fee amount: {2: 100000, 3: 0}. |
| current_tick | int \| None | Pool's current tick at the moment of close (VIB-3940). Mirrors |
| pool_address | str | V3 pool address that emitted the Burn event (VIB-3940). Required input for the framework's slot0 fallback. Empty when the parser couldn't identify the pool. Mirrors |
| source | str \| None | Provenance tag for the on-chain event the close amounts were decoded from. |
Example
if result.lp_close_data:
    total_0 = result.lp_close_data.amount0_collected
    fees_0 = result.lp_close_data.fees0
    # For 4-coin pools (e.g., Curve NG):
    all_amounts = result.lp_close_data.all_amounts  # [amt0, amt1, amt2, amt3]
all_amounts
property
¶
Return all coin amounts as a list, including additional coins.
Per Empty ≠ Zero (VIB-5117 / VIB-5121): None slots stand for
"unmeasured by this parser" — e.g. a native principal leg the receipt
could not observe (filled later from the V4 pre-burn position-state read
or the Fluid balance bracket). A numeric 0 is a measured zero.
Mirrors all_fees.
all_fees
property
¶
Return all fee amounts as a list, including additional coins.
Per Empty ≠ Zero: None slots stand for "unmeasured by this parser".
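One way to picture how the list is assembled from token0/token1 plus the index-keyed extras, while keeping None distinct from 0, is the sketch below. The free function merge_amounts is hypothetical, and filling index gaps with None is an assumption about the behaviour, not something the text above confirms.

```python
from __future__ import annotations


def merge_amounts(
    amount0: int | None,
    amount1: int | None,
    additional: dict[int, int] | None = None,
) -> list[int | None]:
    """Flatten token0/token1 and index-keyed extra coins into one list."""
    amounts: list[int | None] = [amount0, amount1]
    if additional:
        # Assumption: extra coins start at index 2; missing indices stay None
        # ("unmeasured") rather than being coerced to 0 ("measured zero").
        for index in range(2, max(additional) + 1):
            amounts.append(additional.get(index))
    return amounts
```

The same shape would apply to fees, with additional_fees supplying the extra slots.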
Signers¶
LocalKeySigner¶
almanak.framework.execution.LocalKeySigner
¶
Bases: Signer
Signs transactions using a local private key.
This signer uses the eth-account library to sign transactions locally. The private key is stored in memory and derived to an Ethereum account at initialization time.
SECURITY CONTRACT
- Private keys are NEVER logged, printed, or included in error messages
- Private keys are NEVER exposed through any method or property
- Transaction fields are validated before signing
- The wallet address is derived from the key at initialization
Supported transaction types
- EIP-1559 (Type 2): Modern fee market transactions
- Legacy (Type 0): Pre-EIP-1559 transactions
Attributes:
| Name | Type | Description |
|---|---|---|
| address | str | The checksummed Ethereum address derived from the private key |
Example
# Initialize with private key
signer = LocalKeySigner(private_key="0x...")

# Create unsigned transaction
tx = UnsignedTransaction(
    to="0x1234...",
    value=0,
    data="0xa9059cbb...",
    chain_id=42161,
    gas_limit=100000,
    nonce=5,
    max_fee_per_gas=100_000_000,
    max_priority_fee_per_gas=1_000_000,
)

# Sign transaction
signed = await signer.sign(tx, chain="arbitrum")
print(f"Signed tx hash: {signed.tx_hash}")
Initialize the signer with a private key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| private_key | str | Hex-encoded private key (with or without 0x prefix) | required |
Raises:
| Type | Description |
|---|---|
| SigningError | If the private key is invalid |
Example
signer = LocalKeySigner(private_key="0xabc123...")
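A format check consistent with the security contract (accept the key with or without the 0x prefix, and never echo it in an error) might look like the sketch below. normalize_private_key and the local SigningError are hypothetical stand-ins; the check covers only the 32-byte hex shape and does not verify that the value is a valid secp256k1 scalar.

```python
import re


class SigningError(Exception):
    """Stand-in for the framework exception of the same name."""


_HEX_KEY = re.compile(r"[0-9a-fA-F]{64}")  # 32 bytes, hex-encoded


def normalize_private_key(private_key: str) -> str:
    """Return the key as lowercase 0x-prefixed hex, or raise without leaking it."""
    body = private_key[2:] if private_key.lower().startswith("0x") else private_key
    if _HEX_KEY.fullmatch(body) is None:
        # The message deliberately omits the offending input.
        raise SigningError("Invalid private key format")
    return "0x" + body.lower()
```

Keeping the error message constant is the simplest way to guarantee that key material never reaches logs through an exception.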
address
property
¶
Return the wallet address associated with this signer.
The address is derived from the private key at initialization and is cached for efficiency.
Returns:
| Type | Description |
|---|---|
| str | Checksummed Ethereum address (0x-prefixed, 42 characters) |
Example
signer = LocalKeySigner(private_key="0x...")
print(signer.address)  # 0x71C7656EC7ab88b098defB751B7401B5f6d8976F
sign
async
¶
Sign a transaction with the local private key.
This method validates the transaction fields, builds the appropriate transaction dictionary based on transaction type (EIP-1559 or legacy), and signs it using the eth-account library.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tx | UnsignedTransaction | Unsigned transaction to sign | required |
| chain | str | Chain name (e.g., "arbitrum", "ethereum") for logging/validation | required |
Returns:
| Type | Description |
|---|---|
| SignedTransaction | SignedTransaction containing the raw signed bytes and transaction hash |
Raises:
| Type | Description |
|---|---|
| SigningError | If signing fails due to invalid fields or key issues |
| ValueError | If transaction fields are malformed |
Example
tx = UnsignedTransaction(
    to="0x1234...",
    value=1_000_000_000_000_000_000,  # 1 ETH
    data="0x",
    chain_id=1,
    gas_limit=21000,
    nonce=0,
    max_fee_per_gas=30_000_000_000,
    max_priority_fee_per_gas=1_000_000_000,
)
signed = await signer.sign(tx, chain="ethereum")
Simulators¶
DirectSimulator¶
almanak.framework.execution.DirectSimulator
¶
Bases: Simulator
Pass-through simulator that skips actual simulation.
This simulator returns a successful SimulationResult without performing any actual simulation. It is designed for environments where simulation is not needed, such as local fork testing or trusted execution contexts.
The key differentiator from other simulators:
- Returns simulated=False to indicate no simulation was performed
- Always returns success=True (assumes transactions are valid)
- Logs that simulation was skipped for observability
This is NOT a stub or shortcut - it is a legitimate implementation for production use cases where pre-execution simulation adds no value or where the latency cost is unacceptable.
Future Alternatives:
- TenderlySimulator: Full simulation via Tenderly API
- LocalSimulator: Simulation via local node eth_call
- FlashbotsSimulator: Simulation via Flashbots bundle simulation
Attributes:
| Name | Type | Description |
|---|---|---|
| name | str | Identifier for this simulator (for logging and metrics) |
Example
simulator = DirectSimulator()

# Simulate a single transaction
result = await simulator.simulate([tx], chain="arbitrum")

if result.success:
    # Proceed to signing and submission
    ...
else:
    # Handle simulation failure (won't happen with DirectSimulator)
    ...
Initialize the DirectSimulator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Identifier for this simulator instance (default: "direct") | 'direct' |
simulate
async
¶
simulate(
txs: list[UnsignedTransaction],
chain: str,
state_overrides: dict[str, Any] | None = None,
) -> SimulationResult
Return a pass-through simulation result without actual simulation.
This method logs that simulation was skipped and returns a successful
SimulationResult with simulated=False to indicate no actual simulation
was performed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| txs | list[UnsignedTransaction] | List of unsigned transactions to "simulate" | required |
| chain | str | Chain name (logged but not used for simulation) | required |
| state_overrides | dict[str, Any] \| None | Ignored - DirectSimulator doesn't perform simulation | None |
Returns:
| Type | Description |
|---|---|
| SimulationResult | Pass-through SimulationResult with success=True and simulated=False |
Note
This method never raises exceptions for valid input. Infrastructure failures are not possible since no external calls are made.
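The pass-through behaviour can be sketched in a few lines. SketchSimulationResult and PassThroughSimulator are hypothetical stand-ins carrying only the two fields discussed above; the real SimulationResult and DirectSimulator may hold more.

```python
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger("direct_simulator_sketch")


@dataclass
class SketchSimulationResult:
    success: bool
    simulated: bool


class PassThroughSimulator:
    """Stand-in simulator that reports success without simulating anything."""

    def __init__(self, name: str = "direct") -> None:
        self.name = name

    async def simulate(
        self,
        txs: list[Any],
        chain: str,
        state_overrides: dict[str, Any] | None = None,
    ) -> SketchSimulationResult:
        # Log the skip for observability; state_overrides is accepted but ignored.
        logger.info("Simulation skipped by %s for %d tx(s) on %s", self.name, len(txs), chain)
        return SketchSimulationResult(success=True, simulated=False)


sketch_result = asyncio.run(
    PassThroughSimulator().simulate([{"to": "0x1234"}], chain="arbitrum")
)
```

Callers that care whether a check really happened should read simulated rather than success.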
TenderlySimulator¶
almanak.framework.execution.TenderlySimulator
¶
TenderlySimulator(
account_slug: str,
project_slug: str,
access_key: str,
timeout_seconds: float = 10.0,
name: str = "tenderly",
ssl_context: SSLContext | None = None,
)
Bases: Simulator
Transaction simulation via Tenderly REST API.
This simulator uses Tenderly's simulate-bundle endpoint to simulate transactions before submission. It provides accurate gas estimates, pre-flight validation, and supports SAFE wallet simulations via state overrides.
Key Advantages over Alchemy
- No transaction bundle limit (Alchemy limited to 3)
- State override support for SAFE wallet ETH balance
- Support for more chains (9+ vs 4 for Alchemy)
- Detailed simulation dashboard URLs
Attributes:
| Name | Type | Description |
|---|---|---|
| account_slug | | Tenderly account identifier |
| project_slug | | Tenderly project identifier |
| timeout_seconds | | Request timeout |
Example
simulator = TenderlySimulator(
    account_slug="my-account",
    project_slug="my-project",
    access_key="xxx",
)

# Basic simulation
result = await simulator.simulate([tx], chain="arbitrum")

# SAFE wallet simulation with ETH balance override
result = await simulator.simulate(
    [tx],
    chain="arbitrum",
    state_overrides={"0xSafeAddress": {"balance": hex(10 * 10**18)}},
)
Initialize the TenderlySimulator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| account_slug | str | Tenderly account slug | required |
| project_slug | str | Tenderly project slug | required |
| access_key | str | Tenderly API access key | required |
| timeout_seconds | float | Request timeout (default 10s) | 10.0 |
| name | str | Simulator name for logging (default "tenderly") | 'tenderly' |
| ssl_context | SSLContext \| None | Optional SSL context for outbound HTTPS. When provided (e.g. the gateway threads in its certifi-backed context), the aiohttp session uses a | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If any required parameter is missing |
supports_chain
¶
Check if this simulator supports a given chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| chain | str | Chain name (lowercase) | required |
Returns:
| Type | Description |
|---|---|
| bool | True if Tenderly supports this chain |
simulate
async
¶
simulate(
txs: list[UnsignedTransaction],
chain: str,
state_overrides: dict[str, Any] | None = None,
) -> SimulationResult
Simulate transactions via Tenderly API.
This method simulates the execution of one or more transactions using Tenderly's bundle simulation endpoint. It returns gas estimates and validates that transactions will succeed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| txs | list[UnsignedTransaction] | List of unsigned transactions to simulate | required |
| chain | str | Chain name (e.g., "arbitrum") | required |
| state_overrides | dict[str, Any] \| None | Optional state overrides for SAFE wallets. Format: {"0xAddress": {"balance": "0xHexWei"}} | None |
Returns:
| Type | Description |
|---|---|
| SimulationResult | SimulationResult with gas estimates and success status |
Raises:
| Type | Description |
|---|---|
| SimulationError | For infrastructure failures (not tx failures) |
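Building the state_overrides mapping in the documented format is mostly a unit conversion from ETH to hex-encoded wei. The helper name balance_override is hypothetical; the output shape follows the Format note in the parameter table.

```python
from __future__ import annotations

from decimal import Decimal

WEI_PER_ETH = 10**18


def balance_override(address: str, eth: Decimal | int | str) -> dict[str, dict[str, str]]:
    """Return {"0xAddress": {"balance": "0xHexWei"}} for the given ETH amount."""
    wei = int(Decimal(str(eth)) * WEI_PER_ETH)
    return {address: {"balance": hex(wei)}}


# Illustrative override: give a placeholder SAFE address 10 ETH for simulation.
overrides = balance_override("0xSafeAddress", 10)
```

Going through Decimal keeps fractional amounts such as "0.5" exact before the conversion to an integer number of wei.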
Receipt Parsing¶
ReceiptParserRegistry¶
almanak.framework.execution.ReceiptParserRegistry
¶
Façade over STRATEGY_RECEIPT_PARSER_REGISTRY.
Adds the caching and custom-registration semantics that consumers rely on, layered on top of the strategy-side connector registry's classes_by_key() resolution:
- Per-protocol cache: get(protocol) (no kwargs) caches the instantiated parser. register(custom) evicts that entry.
- Custom registration: register(protocol, cls) injects a user-supplied parser class that takes precedence over the connector-provided one. Used by tests and by strategy-side consumers that need to substitute a parser at runtime.
- Alias normalisation: protocol is run through almanak.connectors._strategy_base.protocol_aliases.normalize_protocol before lookup, so chain-scoped renames (e.g. ("mantle", "uniswap_v3") → "agni_finance") resolve identically to the pre-W2 behaviour.
The class is intentionally instantiable so each consumer can hold its own cache scope (ResultEnricher historically constructs one per execution context). The module-level convenience functions (get_parser / list_parsers / …) share a single default instance.
get
¶
Resolve protocol to a parser instance.
See class docstring for cache / alias semantics. Raises ValueError if the protocol isn't known.
register
¶
Register a custom parser class. Evicts the cached instance.
unregister
¶
Remove a custom parser registration. Returns True if removed.
list_protocols
¶
Return every protocol key resolvable by this registry.
is_registered
¶
Whether protocol resolves to a parser (built-in or custom).
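The cache, override, and eviction semantics described for this class can be sketched with two dictionaries. SketchParserRegistry and the two parser classes are hypothetical; alias normalisation is left out, and evicting the cache on unregister is an assumption rather than documented behaviour.

```python
class SketchParserRegistry:
    """Hypothetical registry: built-in classes, custom overrides, instance cache."""

    def __init__(self, builtin: dict) -> None:
        self._builtin = dict(builtin)
        self._custom: dict = {}
        self._cache: dict = {}

    def get(self, protocol: str):
        if protocol not in self._cache:
            # Custom registrations take precedence over built-in ones.
            cls = self._custom.get(protocol) or self._builtin.get(protocol)
            if cls is None:
                raise ValueError(f"Unknown protocol: {protocol}")
            self._cache[protocol] = cls()
        return self._cache[protocol]

    def register(self, protocol: str, cls: type) -> None:
        self._custom[protocol] = cls
        self._cache.pop(protocol, None)  # evict the stale instance

    def unregister(self, protocol: str) -> bool:
        removed = self._custom.pop(protocol, None) is not None
        if removed:
            self._cache.pop(protocol, None)  # assumption: eviction on removal
        return removed

    def is_registered(self, protocol: str) -> bool:
        return protocol in self._custom or protocol in self._builtin


class BuiltinParser:
    pass


class CustomParser:
    pass
```

Because each instance owns its cache, two consumers holding separate registries never see each other's custom registrations.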
Exceptions¶
almanak.framework.execution.ExecutionError
¶
Bases: Exception
Base exception for execution layer errors.
All execution-related exceptions inherit from this class to allow broad exception handling when needed.
almanak.framework.execution.SimulationError
¶
Bases: ExecutionError
Raised when transaction simulation fails.
This is distinct from a transaction that simulates successfully but would revert - that returns SimulationResult with success=False.
This exception is for infrastructure failures:
- Simulation service unavailable
- Network timeout
- Invalid simulation parameters
Attributes:
| Name | Type | Description |
|---|---|---|
| reason | | Human-readable explanation of the failure |
| recoverable | | Whether the error is transient and can be retried |
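The recoverable flag suggests a retry loop that only repeats transient failures. The sketch below uses a local stand-in exception with the two documented attributes; simulate_with_retry, its parameters, and the constructor signature are hypothetical.

```python
import asyncio


class SketchSimulationError(Exception):
    """Stand-in carrying the documented reason and recoverable attributes."""

    def __init__(self, reason: str, recoverable: bool = False) -> None:
        super().__init__(reason)
        self.reason = reason
        self.recoverable = recoverable


async def simulate_with_retry(simulate, attempts: int = 3, delay_seconds: float = 0.0):
    """Call simulate(); retry only when the error is marked recoverable."""
    for attempt in range(1, attempts + 1):
        try:
            return await simulate()
        except SketchSimulationError as exc:
            # Give up on permanent errors or once the attempt budget is spent.
            if not exc.recoverable or attempt == attempts:
                raise
            await asyncio.sleep(delay_seconds)
```

A transaction that simulates but would revert is not retried here at all, since that case is reported through SimulationResult rather than an exception.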
almanak.framework.execution.SigningError
¶
Bases: ExecutionError
Raised when transaction signing fails.
This exception should be raised when:
- Private key is invalid or missing
- Transaction fields are malformed
- Signing algorithm encounters an error
Note: Never include sensitive key material in error messages.
Attributes:
| Name | Type | Description |
|---|---|---|
| reason | | Human-readable explanation of the failure |
| tx_hash | | Optional hash of the transaction that failed (if available) |