Services¶
Operational services for strategy monitoring, stuck detection, and emergency management.
StuckDetector¶
Detects when a strategy is stuck and unable to make progress.
almanak.framework.services.StuckDetector
¶
StuckDetector(
stuck_threshold_seconds: int = DEFAULT_STUCK_THRESHOLD_SECONDS,
emit_events: bool = True,
)
Detects and classifies stuck strategies.
The detector analyzes various aspects of strategy state to determine: 1. Whether the strategy is stuck (in same state too long) 2. Why it's stuck (classification using StuckReason)
Detection is performed by checking: - Pending transactions and gas prices - Token balances and gas availability - Token allowances - Market conditions (slippage, liquidity) - Protocol status (oracle freshness, paused state) - RPC health - Risk guard and circuit breaker state
Initialize the stuck detector.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
stuck_threshold_seconds
|
int
|
Time in seconds before a strategy is considered stuck (default 10 minutes). |
DEFAULT_STUCK_THRESHOLD_SECONDS
|
emit_events
|
bool
|
Whether to emit timeline events when stuck is detected. |
True
|
detect_stuck
¶
Detect if a strategy is stuck and classify the reason.
This is the main entry point for stuck detection. It checks if the strategy has been in the same state longer than the threshold, and if so, attempts to classify the reason.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
snapshot
|
StrategySnapshot
|
Current strategy state snapshot |
required |
Returns:
| Type | Description |
|---|---|
StuckDetectionResult
|
StuckDetectionResult with stuck status and reason if applicable |
EmergencyManager¶
Handles emergency scenarios like position unwinding.
almanak.framework.services.EmergencyManager
¶
EmergencyManager(
alert_manager: AlertManager | None = None,
pause_callback: PauseStrategyCallback | None = None,
position_callback: GetPositionCallback | None = None,
dashboard_base_url: str | None = None,
)
Manages emergency stop procedures for strategies.
The EmergencyManager handles the full emergency stop workflow: 1. Immediately pause the strategy 2. Gather complete position summary 3. Generate EMERGENCY_STOP OperatorCard with suggested actions 4. Send CRITICAL alerts to configured channels 5. Return complete EmergencyResult
Attributes:
| Name | Type | Description |
|---|---|---|
alert_manager |
AlertManager for sending CRITICAL alerts |
|
pause_callback |
Callback to pause a strategy |
|
position_callback |
Callback to get position summary |
Initialize the EmergencyManager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
alert_manager
|
AlertManager | None
|
AlertManager instance for sending alerts |
None
|
pause_callback
|
PauseStrategyCallback | None
|
Callback function to pause a strategy |
None
|
position_callback
|
GetPositionCallback | None
|
Callback function to get position summary |
None
|
dashboard_base_url
|
str | None
|
Base URL for dashboard links |
None
|
emergency_stop
¶
emergency_stop(
strategy_id: str,
reason: str,
chain: str = "",
trigger_context: dict[str, Any] | None = None,
) -> EmergencyResult
Execute an emergency stop for a strategy.
This method: 1. Immediately pauses the strategy 2. Gets the full position summary (all tokens, LP positions, borrows) 3. Generates an OperatorCard with EMERGENCY_STOP event 4. Suggests actions: EMERGENCY_UNWIND, MANUAL_REVIEW 5. Sends CRITICAL alerts 6. Returns EmergencyResult with position summary
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
The ID of the strategy to stop |
required |
reason
|
str
|
Human-readable reason for the emergency stop |
required |
chain
|
str
|
The blockchain network the strategy operates on |
''
|
trigger_context
|
dict[str, Any] | None
|
Optional additional context about what triggered the emergency |
None
|
Returns:
| Type | Description |
|---|---|
EmergencyResult
|
EmergencyResult with full details of the emergency stop operation |
emergency_stop_async
async
¶
emergency_stop_async(
strategy_id: str,
reason: str,
chain: str = "",
trigger_context: dict[str, Any] | None = None,
) -> EmergencyResult
Async version of emergency_stop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
The ID of the strategy to stop |
required |
reason
|
str
|
Human-readable reason for the emergency stop |
required |
chain
|
str
|
The blockchain network the strategy operates on |
''
|
trigger_context
|
dict[str, Any] | None
|
Optional additional context about what triggered the emergency |
None
|
Returns:
| Type | Description |
|---|---|
EmergencyResult
|
EmergencyResult with full details of the emergency stop operation |
OperatorCardGenerator¶
Generates operator cards for strategy issues.
almanak.framework.services.OperatorCardGenerator
¶
Generates OperatorCards from strategy state and events.
The generator automatically: - Detects the StuckReason from error type and context - Calculates severity based on position at risk and time stuck - Looks up suggested actions from REMEDIATION_MAP - Generates human-readable risk descriptions - Sets up auto-remediation when applicable
Initialize the generator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dashboard_base_url
|
str | None
|
Base URL for dashboard links in risk descriptions. |
None
|
generate_card
¶
generate_card(
strategy_state: StrategyState,
error_context: ErrorContext | None = None,
event_type: EventType = EventType.STUCK,
) -> OperatorCard
Generate an OperatorCard from strategy state and error context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_state
|
StrategyState
|
Current state of the strategy. |
required |
error_context
|
ErrorContext | None
|
Optional context about the error that triggered this. |
None
|
event_type
|
EventType
|
Type of event (defaults to STUCK). |
STUCK
|
Returns:
| Type | Description |
|---|---|
OperatorCard
|
A fully populated OperatorCard. |
PredictionPositionMonitor¶
Monitors prediction market positions for resolution events.
almanak.framework.services.PredictionPositionMonitor
¶
PredictionPositionMonitor(
strategy_id: str = "",
check_interval: int = DEFAULT_CHECK_INTERVAL,
emit_events: bool = True,
event_callback: EventCallback | None = None,
default_exit_before_resolution_hours: int | None = None,
allow_partial_exits: bool = True,
)
Monitors prediction market positions for lifecycle events.
The monitor tracks positions and evaluates exit conditions: - Market resolution detection - Price threshold alerts (stop-loss, take-profit) - Time-based alerts (approaching resolution) - Trailing stop calculations - Liquidity and spread warnings - Partial exits when liquidity is insufficient
Example
monitor = PredictionPositionMonitor( strategy_id="my-strategy", check_interval=60, default_exit_before_resolution_hours=24, # Strategy-level default )
Add a position to monitor¶
monitor.add_position( position=MonitoredPosition( market_id="will-btc-reach-100k", condition_id="0x...", token_id="12345...", outcome="YES", size=Decimal("100"), entry_price=Decimal("0.65"), entry_time=datetime.now(UTC), exit_conditions=PredictionExitConditions( stop_loss_price=Decimal("0.50"), take_profit_price=Decimal("0.85"), exit_before_resolution_hours=24, ), ), )
Check positions (call periodically)¶
results = monitor.check_positions(snapshots) for result in results: if result.triggered: print(f"Event: {result.event}, Action: {result.suggested_action}")
Initialize the position monitor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
strategy_id
|
str
|
Strategy identifier for event emission. |
''
|
check_interval
|
int
|
Seconds between position checks. |
DEFAULT_CHECK_INTERVAL
|
emit_events
|
bool
|
Whether to emit timeline events. |
True
|
event_callback
|
EventCallback | None
|
Optional callback for events. |
None
|
default_exit_before_resolution_hours
|
int | None
|
Strategy-level default for exit_before_resolution_hours. Applied to positions that don't have their own exit conditions or don't specify this value. |
None
|
allow_partial_exits
|
bool
|
If True, generate partial sell intents when orderbook liquidity is insufficient for the full position. Defaults to True. |
True
|
add_position
¶
Add a position to monitor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
MonitoredPosition
|
The position to monitor. |
required |
remove_position
¶
Remove a position from monitoring.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str
|
Market ID to remove. |
required |
Returns:
| Type | Description |
|---|---|
MonitoredPosition | None
|
The removed position, or None if not found. |
get_position
¶
Get a monitored position by market ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str
|
Market ID to look up. |
required |
Returns:
| Type | Description |
|---|---|
MonitoredPosition | None
|
The monitored position, or None if not found. |
update_position_price
¶
Update the current price for a position.
This also updates the highest_price for trailing stop tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str
|
Market ID to update. |
required |
current_price
|
Decimal
|
New current price. |
required |
check_position
¶
Check a single position against its exit conditions.
Evaluates all exit conditions and returns the most urgent triggered event, if any.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
MonitoredPosition
|
The position to check. |
required |
snapshot
|
PositionSnapshot
|
Current market data snapshot. |
required |
Returns:
| Type | Description |
|---|---|
MonitoringResult
|
MonitoringResult with any triggered events. |
check_positions
¶
Check all monitored positions against provided snapshots.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
snapshots
|
dict[str, PositionSnapshot]
|
Market data snapshots keyed by market_id. |
required |
Returns:
| Type | Description |
|---|---|
list[MonitoringResult]
|
List of monitoring results for all positions. |
calculate_safe_exit_size
¶
calculate_safe_exit_size(
position: MonitoredPosition, snapshot: PositionSnapshot
) -> tuple[Decimal | None, bool]
Calculate the safe exit size based on available liquidity.
When orderbook depth is insufficient for the full position, this method calculates a partial exit size that respects the available liquidity.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
MonitoredPosition
|
The position to potentially exit. |
required |
snapshot
|
PositionSnapshot
|
Current market data including orderbook depth. |
required |
Returns:
| Type | Description |
|---|---|
Decimal | None
|
A tuple of (safe_size, is_insufficient): |
bool
|
|
tuple[Decimal | None, bool]
|
|
tuple[Decimal | None, bool]
|
|
tuple[Decimal | None, bool]
|
|
tuple[Decimal | None, bool]
|
The first element is the recommended exit size, or None if "all". |
tuple[Decimal | None, bool]
|
The second element indicates whether liquidity is insufficient. |
generate_sell_intent
¶
generate_sell_intent(
result: MonitoringResult,
snapshot: PositionSnapshot | None = None,
) -> PredictionSellIntent | None
Generate a PredictionSellIntent for a triggered exit condition.
When an exit condition is triggered (stop-loss, take-profit, trailing stop, or pre-resolution exit), this method generates the appropriate sell intent to exit the position.
If partial exits are enabled and the snapshot indicates insufficient liquidity, this method will generate a partial sell intent instead of a full exit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
result
|
MonitoringResult
|
The monitoring result with a triggered exit condition. |
required |
snapshot
|
PositionSnapshot | None
|
Optional market snapshot for calculating partial exit size. When provided and allow_partial_exits is True, the method will check orderbook depth and generate partial exits if needed. |
None
|
Returns:
| Type | Description |
|---|---|
PredictionSellIntent | None
|
A PredictionSellIntent to exit the position, or None if no sell is needed |
PredictionSellIntent | None
|
(e.g., for warning events like LOW_LIQUIDITY or SPREAD_TOO_WIDE, or if |
PredictionSellIntent | None
|
available liquidity is too low for even a partial exit). |
Example
results = monitor.check_positions(snapshots) for result in results: if result.triggered and result.suggested_action == "SELL": snapshot = snapshots.get(result.position.market_id) sell_intent = monitor.generate_sell_intent(result, snapshot) if sell_intent: # Execute the sell intent compiler.compile(sell_intent)
AutoRedemptionService¶
Automatically redeems resolved prediction market positions.
almanak.framework.services.AutoRedemptionService
¶
AutoRedemptionService(
sdk: Any,
private_key: str,
strategy_id: str = "",
enabled: bool = True,
max_retries: int = DEFAULT_MAX_RETRIES,
retry_delay_seconds: int = DEFAULT_RETRY_DELAY_SECONDS,
emit_events: bool = True,
redemption_callback: RedemptionCallback | None = None,
receipt_timeout_seconds: int = DEFAULT_RECEIPT_TIMEOUT_SECONDS,
receipt_poll_interval_seconds: float = DEFAULT_RECEIPT_POLL_INTERVAL_SECONDS,
)
Automatically redeems winning prediction market positions.
This service listens for MARKET_RESOLVED events and automatically builds and submits redemption transactions for winning positions.
Key features: - Configurable per-strategy enable/disable - Retry logic with exponential backoff - Timeline event emission for tracking - Callback support for custom handling - Transaction receipt verification with configurable timeout - Proper extraction of redemption amounts from PayoutRedemption events
Thread Safety
This class is NOT thread-safe. Use separate instances per thread.
Example
service = AutoRedemptionService( ... sdk=sdk, ... private_key="0x...", ... strategy_id="my-strategy", ... enabled=True, ... receipt_timeout_seconds=120, ... )
Handle market resolution¶
event = MarketResolvedEvent(position=pos, winning_outcome="YES", is_winner=True) result = service.on_market_resolved(event) if result.status == RedemptionStatus.SUCCESS: ... print(f"Redeemed {result.amount_received} USDC")
Initialize the auto-redemption service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
sdk
|
Any
|
PolymarketSDK instance for redemption operations. |
required |
private_key
|
str
|
Private key for signing redemption transactions. |
required |
strategy_id
|
str
|
Strategy identifier for event emission. |
''
|
enabled
|
bool
|
Whether auto-redemption is enabled. |
True
|
max_retries
|
int
|
Maximum number of retry attempts. |
DEFAULT_MAX_RETRIES
|
retry_delay_seconds
|
int
|
Initial delay between retries. |
DEFAULT_RETRY_DELAY_SECONDS
|
emit_events
|
bool
|
Whether to emit timeline events. |
True
|
redemption_callback
|
RedemptionCallback | None
|
Optional callback for redemption results. |
None
|
receipt_timeout_seconds
|
int
|
Timeout for waiting for transaction receipt (default 120s). |
DEFAULT_RECEIPT_TIMEOUT_SECONDS
|
receipt_poll_interval_seconds
|
float
|
Interval between receipt polling attempts (default 2s). |
DEFAULT_RECEIPT_POLL_INTERVAL_SECONDS
|
get_redemption
¶
Get redemption attempt for a market.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
market_id
|
str
|
Market ID to look up. |
required |
Returns:
| Type | Description |
|---|---|
RedemptionAttempt | None
|
RedemptionAttempt or None if not found. |
on_event
¶
Handle prediction events from the position monitor.
This method can be registered as a callback with the PredictionPositionMonitor to automatically handle events.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
position
|
MonitoredPosition
|
The position that triggered the event. |
required |
event
|
PredictionEvent
|
The type of event. |
required |
details
|
dict[str, Any]
|
Additional event details. |
required |
on_market_resolved
¶
Handle market resolution and redeem winning positions.
This method is called when a market is resolved. It checks if the position is a winner and initiates redemption if enabled.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
event
|
MarketResolvedEvent
|
The market resolved event with position details. |
required |
Returns:
| Type | Description |
|---|---|
RedemptionAttempt
|
RedemptionAttempt tracking the redemption status. |
Models¶
StuckDetectionResult¶
almanak.framework.services.StuckDetectionResult
dataclass
¶
StuckDetectionResult(
is_stuck: bool,
reason: StuckReason | None = None,
time_in_state_seconds: float = 0,
details: dict[str, Any] | None = None,
)
Result of stuck detection analysis.
StrategySnapshot¶
almanak.framework.services.StrategySnapshot
dataclass
¶
StrategySnapshot(
strategy_id: str,
chain: str,
current_state: str,
state_entered_at: datetime,
pending_transactions: list[PendingTransaction],
current_gas_price: int | None = None,
balance_info: BalanceInfo | None = None,
allowance_issues: list[AllowanceInfo] | None = None,
current_slippage: Decimal | None = None,
max_allowed_slippage: Decimal | None = None,
pool_liquidity_usd: Decimal | None = None,
oracle_last_updated: datetime | None = None,
protocol_paused: bool = False,
rpc_healthy: bool = True,
last_rpc_error: str | None = None,
risk_guard_blocked: bool = False,
risk_guard_reason: str | None = None,
circuit_breaker_triggered: bool = False,
)
Snapshot of strategy state for stuck detection.
This represents all the information needed to detect and classify a stuck strategy.
EmergencyResult¶
almanak.framework.services.EmergencyResult
dataclass
¶
EmergencyResult(
success: bool,
strategy_id: str,
timestamp: datetime,
position_summary: FullPositionSummary,
operator_card: OperatorCard,
alert_result: AlertSendResult | None = None,
error: str | None = None,
pause_successful: bool = False,
alerts_sent: bool = False,
)
Result of an emergency stop operation.
to_dict
¶
Convert the emergency result to a dictionary for serialization.
FullPositionSummary¶
almanak.framework.services.FullPositionSummary
dataclass
¶
FullPositionSummary(
strategy_id: str,
chain: str,
timestamp: datetime,
token_positions: list[TokenPosition] = list(),
lp_positions: list[LPPositionInfo] = list(),
borrow_positions: list[BorrowPosition] = list(),
total_token_value_usd: Decimal = Decimal("0"),
total_lp_value_usd: Decimal = Decimal("0"),
total_collateral_value_usd: Decimal = Decimal("0"),
total_borrowed_value_usd: Decimal = Decimal("0"),
)
Complete position summary including all tokens, LP positions, and borrows.
net_exposure_usd
property
¶
Calculate net exposure (total value - borrowed).
has_borrow_positions
property
¶
Check if strategy has any borrow positions.
min_health_factor
property
¶
Get the minimum health factor across all borrow positions.
to_position_summary
¶
Convert to the standard PositionSummary format for OperatorCard.
to_dict
¶
Convert the full position summary to a dictionary for serialization.