Services

Operational services for strategy monitoring, stuck detection, and emergency management.

StuckDetector

Detects when a strategy is stuck and unable to make progress.

almanak.framework.services.StuckDetector

StuckDetector(
    stuck_threshold_seconds: int = DEFAULT_STUCK_THRESHOLD_SECONDS,
    emit_events: bool = True,
)

Detects and classifies stuck strategies.

The detector analyzes various aspects of strategy state to determine:
1. Whether the strategy is stuck (in the same state for too long)
2. Why it is stuck (classified using StuckReason)

Detection is performed by checking:
  • Pending transactions and gas prices
  • Token balances and gas availability
  • Token allowances
  • Market conditions (slippage, liquidity)
  • Protocol status (oracle freshness, paused state)
  • RPC health
  • Risk guard and circuit breaker state

Initialize the stuck detector.

Parameters:

Name Type Description Default
stuck_threshold_seconds int

Time in seconds before a strategy is considered stuck (default 10 minutes).

DEFAULT_STUCK_THRESHOLD_SECONDS
emit_events bool

Whether to emit timeline events when stuck is detected.

True

detect_stuck

detect_stuck(
    snapshot: StrategySnapshot,
) -> StuckDetectionResult

Detect if a strategy is stuck and classify the reason.

This is the main entry point for stuck detection. It checks if the strategy has been in the same state longer than the threshold, and if so, attempts to classify the reason.

Parameters:

Name Type Description Default
snapshot StrategySnapshot

Current strategy state snapshot

required

Returns:

Type Description
StuckDetectionResult

StuckDetectionResult with stuck status and reason if applicable
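The threshold check described above can be sketched as follows. This is a minimal illustration, not the library's implementation: `MiniSnapshot` and `time_in_state_exceeds` are hypothetical stand-ins, the 600-second value mirrors the documented 10-minute default, and reason classification is left out entirely.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class MiniSnapshot:
    """Hypothetical stand-in carrying only the fields this sketch needs."""

    current_state: str
    state_entered_at: datetime


def time_in_state_exceeds(snapshot, threshold_seconds, now=None):
    """Return (exceeded, seconds_in_state) for the given snapshot.

    Assumption: "longer than the threshold" is read as a strict comparison.
    """
    now = now or datetime.now(timezone.utc)
    elapsed = (now - snapshot.state_entered_at).total_seconds()
    return elapsed > threshold_seconds, elapsed


# A strategy that entered its current state 15 minutes ago.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
snap = MiniSnapshot("SWAPPING", now - timedelta(minutes=15))
stuck, elapsed = time_in_state_exceeds(snap, threshold_seconds=600, now=now)
```

Passing `now` explicitly keeps the check deterministic in tests; in live use it would default to the current UTC time.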

EmergencyManager

Handles emergency scenarios like position unwinding.

almanak.framework.services.EmergencyManager

EmergencyManager(
    alert_manager: AlertManager | None = None,
    pause_callback: PauseStrategyCallback | None = None,
    position_callback: GetPositionCallback | None = None,
    dashboard_base_url: str | None = None,
)

Manages emergency stop procedures for strategies.

The EmergencyManager handles the full emergency stop workflow:
1. Immediately pause the strategy
2. Gather complete position summary
3. Generate EMERGENCY_STOP OperatorCard with suggested actions
4. Send CRITICAL alerts to configured channels
5. Return complete EmergencyResult

Attributes:

Name Type Description
alert_manager

AlertManager for sending CRITICAL alerts

pause_callback

Callback to pause a strategy

position_callback

Callback to get position summary

Initialize the EmergencyManager.

Parameters:

Name Type Description Default
alert_manager AlertManager | None

AlertManager instance for sending alerts

None
pause_callback PauseStrategyCallback | None

Callback function to pause a strategy

None
position_callback GetPositionCallback | None

Callback function to get position summary

None
dashboard_base_url str | None

Base URL for dashboard links

None

emergency_stop

emergency_stop(
    strategy_id: str,
    reason: str,
    chain: str = "",
    trigger_context: dict[str, Any] | None = None,
) -> EmergencyResult

Execute an emergency stop for a strategy.

This method:
1. Immediately pauses the strategy
2. Gets the full position summary (all tokens, LP positions, borrows)
3. Generates an OperatorCard with EMERGENCY_STOP event
4. Suggests actions: EMERGENCY_UNWIND, MANUAL_REVIEW
5. Sends CRITICAL alerts
6. Returns EmergencyResult with position summary

Parameters:

Name Type Description Default
strategy_id str

The ID of the strategy to stop

required
reason str

Human-readable reason for the emergency stop

required
chain str

The blockchain network the strategy operates on

''
trigger_context dict[str, Any] | None

Optional additional context about what triggered the emergency

None

Returns:

Type Description
EmergencyResult

EmergencyResult with full details of the emergency stop operation
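The ordering of the emergency stop steps can be illustrated with plain callables. The sketch below is an assumption-laden walk-through, not the real method: `run_emergency_stop`, its `pause`, `get_positions`, and `send_alert` arguments, the dict-shaped result, and the rule used to compute `success` are all hypothetical.

```python
from datetime import datetime, timezone


def run_emergency_stop(strategy_id, reason, pause, get_positions, send_alert):
    """Hypothetical walk-through of the documented step order."""
    result = {
        "strategy_id": strategy_id,
        "timestamp": datetime.now(timezone.utc),
        "pause_successful": False,
        "alerts_sent": False,
        "error": None,
    }
    try:
        # Step 1: pause first so nothing new starts while state is inspected.
        result["pause_successful"] = bool(pause(strategy_id))
        # Step 2: collect the position summary.
        result["position_summary"] = get_positions(strategy_id)
        # Steps 3-4: build the card and attach the suggested actions.
        result["operator_card"] = {
            "event": "EMERGENCY_STOP",
            "reason": reason,
            "suggested_actions": ["EMERGENCY_UNWIND", "MANUAL_REVIEW"],
        }
        # Step 5: send the alert at CRITICAL level.
        result["alerts_sent"] = bool(send_alert("CRITICAL", result["operator_card"]))
    except Exception as exc:  # record the failure instead of raising
        result["error"] = str(exc)
    # Step 6: return everything gathered. Assumed rule: paused and no error.
    result["success"] = result["pause_successful"] and result["error"] is None
    return result


# Record the call order with throwaway callbacks.
calls = []
outcome = run_emergency_stop(
    "demo-strategy",
    "oracle stale",
    pause=lambda sid: calls.append("pause") or True,
    get_positions=lambda sid: calls.append("positions") or {"tokens": []},
    send_alert=lambda level, card: calls.append("alert") or True,
)
```

Pausing before gathering positions means the summary reflects a strategy that is no longer acting, which is presumably why the documented workflow puts it first.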

emergency_stop_async async

emergency_stop_async(
    strategy_id: str,
    reason: str,
    chain: str = "",
    trigger_context: dict[str, Any] | None = None,
) -> EmergencyResult

Async version of emergency_stop.

Parameters:

Name Type Description Default
strategy_id str

The ID of the strategy to stop

required
reason str

Human-readable reason for the emergency stop

required
chain str

The blockchain network the strategy operates on

''
trigger_context dict[str, Any] | None

Optional additional context about what triggered the emergency

None

Returns:

Type Description
EmergencyResult

EmergencyResult with full details of the emergency stop operation

OperatorCardGenerator

Generates operator cards for strategy issues.

almanak.framework.services.OperatorCardGenerator

OperatorCardGenerator(
    dashboard_base_url: str | None = None,
)

Generates OperatorCards from strategy state and events.

The generator automatically:
  • Detects the StuckReason from error type and context
  • Calculates severity based on position at risk and time stuck
  • Looks up suggested actions from REMEDIATION_MAP
  • Generates human-readable risk descriptions
  • Sets up auto-remediation when applicable

Initialize the generator.

Parameters:

Name Type Description Default
dashboard_base_url str | None

Base URL for dashboard links in risk descriptions.

None

generate_card

generate_card(
    strategy_state: StrategyState,
    error_context: ErrorContext | None = None,
    event_type: EventType = EventType.STUCK,
) -> OperatorCard

Generate an OperatorCard from strategy state and error context.

Parameters:

Name Type Description Default
strategy_state StrategyState

Current state of the strategy.

required
error_context ErrorContext | None

Optional context about the error that triggered this.

None
event_type EventType

Type of event (defaults to STUCK).

STUCK

Returns:

Type Description
OperatorCard

A fully populated OperatorCard.

PredictionPositionMonitor

Monitors prediction market positions for resolution events.

almanak.framework.services.PredictionPositionMonitor

PredictionPositionMonitor(
    strategy_id: str = "",
    check_interval: int = DEFAULT_CHECK_INTERVAL,
    emit_events: bool = True,
    event_callback: EventCallback | None = None,
    default_exit_before_resolution_hours: int | None = None,
    allow_partial_exits: bool = True,
)

Monitors prediction market positions for lifecycle events.

The monitor tracks positions and evaluates exit conditions:
  • Market resolution detection
  • Price threshold alerts (stop-loss, take-profit)
  • Time-based alerts (approaching resolution)
  • Trailing stop calculations
  • Liquidity and spread warnings
  • Partial exits when liquidity is insufficient

Example

monitor = PredictionPositionMonitor(
    strategy_id="my-strategy",
    check_interval=60,
    default_exit_before_resolution_hours=24,  # Strategy-level default
)

# Add a position to monitor
monitor.add_position(
    position=MonitoredPosition(
        market_id="will-btc-reach-100k",
        condition_id="0x...",
        token_id="12345...",
        outcome="YES",
        size=Decimal("100"),
        entry_price=Decimal("0.65"),
        entry_time=datetime.now(UTC),
        exit_conditions=PredictionExitConditions(
            stop_loss_price=Decimal("0.50"),
            take_profit_price=Decimal("0.85"),
            exit_before_resolution_hours=24,
        ),
    ),
)

# Check positions (call periodically)
results = monitor.check_positions(snapshots)
for result in results:
    if result.triggered:
        print(f"Event: {result.event}, Action: {result.suggested_action}")

Initialize the position monitor.

Parameters:

Name Type Description Default
strategy_id str

Strategy identifier for event emission.

''
check_interval int

Seconds between position checks.

DEFAULT_CHECK_INTERVAL
emit_events bool

Whether to emit timeline events.

True
event_callback EventCallback | None

Optional callback for events.

None
default_exit_before_resolution_hours int | None

Strategy-level default for exit_before_resolution_hours. Applied to positions that don't have their own exit conditions or don't specify this value.

None
allow_partial_exits bool

If True, generate partial sell intents when orderbook liquidity is insufficient for the full position. Defaults to True.

True

positions property

positions: dict[str, MonitoredPosition]

Get all monitored positions.

add_position

add_position(position: MonitoredPosition) -> None

Add a position to monitor.

Parameters:

Name Type Description Default
position MonitoredPosition

The position to monitor.

required

remove_position

remove_position(market_id: str) -> MonitoredPosition | None

Remove a position from monitoring.

Parameters:

Name Type Description Default
market_id str

Market ID to remove.

required

Returns:

Type Description
MonitoredPosition | None

The removed position, or None if not found.

get_position

get_position(market_id: str) -> MonitoredPosition | None

Get a monitored position by market ID.

Parameters:

Name Type Description Default
market_id str

Market ID to look up.

required

Returns:

Type Description
MonitoredPosition | None

The monitored position, or None if not found.

update_position_price

update_position_price(
    market_id: str, current_price: Decimal
) -> None

Update the current price for a position.

This also updates the highest_price for trailing stop tracking.

Parameters:

Name Type Description Default
market_id str

Market ID to update.

required
current_price Decimal

New current price.

required
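To show why a running high matters for trailing stops, here is a small sketch of the bookkeeping. `TrackedPrice` and `trailing_stop_hit` are hypothetical names, and expressing the trail as a fraction of the high is an assumption; the library may define its trailing stop differently.

```python
from decimal import Decimal


class TrackedPrice:
    """Hypothetical holder for the two price fields used in this sketch."""

    def __init__(self, entry_price):
        self.current_price = entry_price
        self.highest_price = entry_price

    def update(self, price):
        """Record the new price; the high only ever moves upward."""
        self.current_price = price
        if price > self.highest_price:
            self.highest_price = price

    def trailing_stop_hit(self, trail_fraction):
        """True once the price sits trail_fraction or more below the high."""
        stop_level = self.highest_price * (Decimal("1") - trail_fraction)
        return self.current_price <= stop_level


# Price rises to 0.80, then pulls back to 0.71.
tracked = TrackedPrice(Decimal("0.65"))
for quote in ("0.70", "0.80", "0.71"):
    tracked.update(Decimal(quote))
```

With a 10% trail the stop level is 0.72, so the pullback to 0.71 triggers it; with a 15% trail the level is 0.68 and the position stays open.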

check_position

check_position(
    position: MonitoredPosition, snapshot: PositionSnapshot
) -> MonitoringResult

Check a single position against its exit conditions.

Evaluates all exit conditions and returns the most urgent triggered event, if any.

Parameters:

Name Type Description Default
position MonitoredPosition

The position to check.

required
snapshot PositionSnapshot

Current market data snapshot.

required

Returns:

Type Description
MonitoringResult

MonitoringResult with any triggered events.

check_positions

check_positions(
    snapshots: dict[str, PositionSnapshot],
) -> list[MonitoringResult]

Check all monitored positions against provided snapshots.

Parameters:

Name Type Description Default
snapshots dict[str, PositionSnapshot]

Market data snapshots keyed by market_id.

required

Returns:

Type Description
list[MonitoringResult]

List of monitoring results for all positions.

clear

clear() -> None

Clear all monitored positions.

calculate_safe_exit_size

calculate_safe_exit_size(
    position: MonitoredPosition, snapshot: PositionSnapshot
) -> tuple[Decimal | None, bool]

Calculate the safe exit size based on available liquidity.

When orderbook depth is insufficient for the full position, this method calculates a partial exit size that respects the available liquidity.

Parameters:

Name Type Description Default
position MonitoredPosition

The position to potentially exit.

required
snapshot PositionSnapshot

Current market data including orderbook depth.

required

Returns:

Type Description
tuple[Decimal | None, bool]

A tuple of (safe_size, is_insufficient):
  • (None, False): No orderbook data available, proceed with "all"
  • (None, True): Liquidity is too low for even a partial exit
  • (Decimal, False): Full position can be exited (safe_size >= position.size)
  • (Decimal, True): Partial exit size (liquidity limited)

The first element is the recommended exit size, or None if "all". The second element indicates whether liquidity is insufficient.
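A caller has to branch on both elements of the returned tuple. The helper below is a hypothetical illustration of that branching (`describe_exit` and its string labels are not part of the library); it only encodes the four documented combinations.

```python
from decimal import Decimal


def describe_exit(safe_size, is_insufficient):
    """Map the documented (safe_size, is_insufficient) tuple to a label."""
    if safe_size is None:
        # No size recommended: either no orderbook data, or no usable liquidity.
        return "skip" if is_insufficient else "exit_all"
    # A concrete size: full exit unless liquidity capped it.
    return "exit_partial" if is_insufficient else "exit_full"


labels = [
    describe_exit(None, False),
    describe_exit(None, True),
    describe_exit(Decimal("100"), False),
    describe_exit(Decimal("40"), True),
]
```

Checking `safe_size is None` before the flag matters, because `(None, True)` and `(Decimal, True)` call for opposite actions.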

generate_sell_intent

generate_sell_intent(
    result: MonitoringResult,
    snapshot: PositionSnapshot | None = None,
) -> PredictionSellIntent | None

Generate a PredictionSellIntent for a triggered exit condition.

When an exit condition is triggered (stop-loss, take-profit, trailing stop, or pre-resolution exit), this method generates the appropriate sell intent to exit the position.

If partial exits are enabled and the snapshot indicates insufficient liquidity, this method will generate a partial sell intent instead of a full exit.

Parameters:

Name Type Description Default
result MonitoringResult

The monitoring result with a triggered exit condition.

required
snapshot PositionSnapshot | None

Optional market snapshot for calculating partial exit size. When provided and allow_partial_exits is True, the method will check orderbook depth and generate partial exits if needed.

None

Returns:

Type Description
PredictionSellIntent | None

A PredictionSellIntent to exit the position, or None if no sell is needed (e.g., for warning events like LOW_LIQUIDITY or SPREAD_TOO_WIDE, or if available liquidity is too low for even a partial exit).

Example

results = monitor.check_positions(snapshots)
for result in results:
    if result.triggered and result.suggested_action == "SELL":
        snapshot = snapshots.get(result.position.market_id)
        sell_intent = monitor.generate_sell_intent(result, snapshot)
        if sell_intent:
            # Execute the sell intent
            compiler.compile(sell_intent)

AutoRedemptionService

Automatically redeems resolved prediction market positions.

almanak.framework.services.AutoRedemptionService

AutoRedemptionService(
    sdk: Any,
    private_key: str,
    strategy_id: str = "",
    enabled: bool = True,
    max_retries: int = DEFAULT_MAX_RETRIES,
    retry_delay_seconds: int = DEFAULT_RETRY_DELAY_SECONDS,
    emit_events: bool = True,
    redemption_callback: RedemptionCallback | None = None,
    receipt_timeout_seconds: int = DEFAULT_RECEIPT_TIMEOUT_SECONDS,
    receipt_poll_interval_seconds: float = DEFAULT_RECEIPT_POLL_INTERVAL_SECONDS,
)

Automatically redeems winning prediction market positions.

This service listens for MARKET_RESOLVED events and automatically builds and submits redemption transactions for winning positions.

Key features:
  • Configurable per-strategy enable/disable
  • Retry logic with exponential backoff
  • Timeline event emission for tracking
  • Callback support for custom handling
  • Transaction receipt verification with configurable timeout
  • Proper extraction of redemption amounts from PayoutRedemption events
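Retry with exponential backoff can be sketched generically as below. The doubling factor, the helper names `backoff_delays` and `retry`, and the choice to retry on any exception are assumptions made for illustration; the service's actual schedule and error handling are not documented here.

```python
import time


def backoff_delays(initial_delay_seconds, max_retries, factor=2):
    """Delay before each retry: initial, initial*factor, initial*factor**2, ..."""
    return [initial_delay_seconds * factor**attempt for attempt in range(max_retries)]


def retry(operation, max_retries, initial_delay_seconds, sleep=time.sleep):
    """Run operation once, then retry up to max_retries times on exception."""
    delays = backoff_delays(initial_delay_seconds, max_retries)
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            sleep(delays[attempt])


# A stand-in operation that fails twice before succeeding.
attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


slept = []  # collect delays instead of actually sleeping
outcome = retry(flaky, max_retries=3, initial_delay_seconds=5, sleep=slept.append)
```

Injecting `sleep` lets the schedule be inspected without waiting; here the two failures produce delays of 5 and 10 seconds before the third attempt succeeds.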

Thread Safety

This class is NOT thread-safe. Use separate instances per thread.

Example

service = AutoRedemptionService(
    sdk=sdk,
    private_key="0x...",
    strategy_id="my-strategy",
    enabled=True,
    receipt_timeout_seconds=120,
)

# Handle market resolution
event = MarketResolvedEvent(position=pos, winning_outcome="YES", is_winner=True)
result = service.on_market_resolved(event)
if result.status == RedemptionStatus.SUCCESS:
    print(f"Redeemed {result.amount_received} USDC")

Initialize the auto-redemption service.

Parameters:

Name Type Description Default
sdk Any

PolymarketSDK instance for redemption operations.

required
private_key str

Private key for signing redemption transactions.

required
strategy_id str

Strategy identifier for event emission.

''
enabled bool

Whether auto-redemption is enabled.

True
max_retries int

Maximum number of retry attempts.

DEFAULT_MAX_RETRIES
retry_delay_seconds int

Initial delay between retries.

DEFAULT_RETRY_DELAY_SECONDS
emit_events bool

Whether to emit timeline events.

True
redemption_callback RedemptionCallback | None

Optional callback for redemption results.

None
receipt_timeout_seconds int

Timeout for waiting for transaction receipt (default 120s).

DEFAULT_RECEIPT_TIMEOUT_SECONDS
receipt_poll_interval_seconds float

Interval between receipt polling attempts (default 2s).

DEFAULT_RECEIPT_POLL_INTERVAL_SECONDS
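The interaction between the receipt timeout and the poll interval can be sketched with a generic polling loop. `wait_for_receipt` and `fetch_receipt` are hypothetical names, and returning None on timeout is an assumption; the values 120 and 2 simply mirror the documented defaults.

```python
import time


def wait_for_receipt(
    fetch_receipt,
    timeout_seconds,
    poll_interval_seconds,
    clock=time.monotonic,
    sleep=time.sleep,
):
    """Poll fetch_receipt() until it returns a receipt or the timeout elapses."""
    deadline = clock() + timeout_seconds
    while True:
        receipt = fetch_receipt()
        if receipt is not None:
            return receipt
        # Stop if the next poll would land past the deadline.
        if clock() + poll_interval_seconds > deadline:
            return None
        sleep(poll_interval_seconds)


# Simulated clock so the example runs instantly.
fake_now = [0.0]


def fake_sleep(seconds):
    fake_now[0] += seconds


# The receipt appears on the third poll.
responses = iter([None, None, {"status": 1}])
receipt = wait_for_receipt(
    lambda: next(responses),
    timeout_seconds=120,
    poll_interval_seconds=2.0,
    clock=lambda: fake_now[0],
    sleep=fake_sleep,
)
```

A shorter poll interval detects confirmation sooner at the cost of more RPC calls; the timeout bounds the total wait either way.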

redemptions property

redemptions: dict[str, RedemptionAttempt]

Get all redemption attempts.

get_redemption

get_redemption(market_id: str) -> RedemptionAttempt | None

Get redemption attempt for a market.

Parameters:

Name Type Description Default
market_id str

Market ID to look up.

required

Returns:

Type Description
RedemptionAttempt | None

RedemptionAttempt or None if not found.

on_event

on_event(
    position: MonitoredPosition,
    event: PredictionEvent,
    details: dict[str, Any],
) -> None

Handle prediction events from the position monitor.

This method can be registered as a callback with the PredictionPositionMonitor to automatically handle events.

Parameters:

Name Type Description Default
position MonitoredPosition

The position that triggered the event.

required
event PredictionEvent

The type of event.

required
details dict[str, Any]

Additional event details.

required

on_market_resolved

on_market_resolved(
    event: MarketResolvedEvent,
) -> RedemptionAttempt

Handle market resolution and redeem winning positions.

This method is called when a market is resolved. It checks if the position is a winner and initiates redemption if enabled.

Parameters:

Name Type Description Default
event MarketResolvedEvent

The market resolved event with position details.

required

Returns:

Type Description
RedemptionAttempt

RedemptionAttempt tracking the redemption status.

enable

enable() -> None

Enable auto-redemption.

disable

disable() -> None

Disable auto-redemption.

clear_history

clear_history() -> None

Clear redemption history.

Models

StuckDetectionResult

almanak.framework.services.StuckDetectionResult dataclass

StuckDetectionResult(
    is_stuck: bool,
    reason: StuckReason | None = None,
    time_in_state_seconds: float = 0,
    details: dict[str, Any] | None = None,
)

Result of stuck detection analysis.

to_dict

to_dict() -> dict[str, Any]

Convert to dictionary for serialization.

StrategySnapshot

almanak.framework.services.StrategySnapshot dataclass

StrategySnapshot(
    strategy_id: str,
    chain: str,
    current_state: str,
    state_entered_at: datetime,
    pending_transactions: list[PendingTransaction],
    current_gas_price: int | None = None,
    balance_info: BalanceInfo | None = None,
    allowance_issues: list[AllowanceInfo] | None = None,
    current_slippage: Decimal | None = None,
    max_allowed_slippage: Decimal | None = None,
    pool_liquidity_usd: Decimal | None = None,
    oracle_last_updated: datetime | None = None,
    protocol_paused: bool = False,
    rpc_healthy: bool = True,
    last_rpc_error: str | None = None,
    risk_guard_blocked: bool = False,
    risk_guard_reason: str | None = None,
    circuit_breaker_triggered: bool = False,
)

Snapshot of strategy state for stuck detection.

This represents all the information needed to detect and classify a stuck strategy.

EmergencyResult

almanak.framework.services.EmergencyResult dataclass

EmergencyResult(
    success: bool,
    strategy_id: str,
    timestamp: datetime,
    position_summary: FullPositionSummary,
    operator_card: OperatorCard,
    alert_result: AlertSendResult | None = None,
    error: str | None = None,
    pause_successful: bool = False,
    alerts_sent: bool = False,
)

Result of an emergency stop operation.

to_dict

to_dict() -> dict[str, Any]

Convert the emergency result to a dictionary for serialization.

FullPositionSummary

almanak.framework.services.FullPositionSummary dataclass

FullPositionSummary(
    strategy_id: str,
    chain: str,
    timestamp: datetime,
    token_positions: list[TokenPosition] = list(),
    lp_positions: list[LPPositionInfo] = list(),
    borrow_positions: list[BorrowPosition] = list(),
    total_token_value_usd: Decimal = Decimal("0"),
    total_lp_value_usd: Decimal = Decimal("0"),
    total_collateral_value_usd: Decimal = Decimal("0"),
    total_borrowed_value_usd: Decimal = Decimal("0"),
)

Complete position summary including all tokens, LP positions, and borrows.

total_value_usd property

total_value_usd: Decimal

Calculate total portfolio value in USD.

net_exposure_usd property

net_exposure_usd: Decimal

Calculate net exposure (total value - borrowed).

has_lp_positions property

has_lp_positions: bool

Check if strategy has any LP positions.

has_borrow_positions property

has_borrow_positions: bool

Check if strategy has any borrow positions.

min_health_factor property

min_health_factor: Decimal | None

Get the minimum health factor across all borrow positions.
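As a small illustration of the `Decimal | None` return type, the sketch below takes the minimum over a list of health factors. The helper name `lowest_health_factor`, the sample values, and the reading that None corresponds to having no borrow positions are assumptions.

```python
from decimal import Decimal


def lowest_health_factor(health_factors):
    """Smallest health factor, or None when the list is empty."""
    return min(health_factors, default=None)


# Three hypothetical borrow positions; the riskiest one drives the result.
worst = lowest_health_factor([Decimal("1.80"), Decimal("1.25"), Decimal("2.40")])
none_when_empty = lowest_health_factor([])
```

The minimum is the useful aggregate because the position closest to liquidation determines how urgent the situation is.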

to_position_summary

to_position_summary() -> PositionSummary

Convert to the standard PositionSummary format for OperatorCard.

to_dict

to_dict() -> dict[str, Any]

Convert the full position summary to a dictionary for serialization.