Alerting

Alert management with Slack and Telegram channels, cooldown tracking, and escalation policies.

AlertManager

almanak.framework.alerting.AlertManager

AlertManager(
    config: AlertConfig,
    telegram_bot_token: str | None = None,
    slack_webhook_url: str | None = None,
    slack_enable_threading: bool = True,
)

Manages alert routing and delivery to configured channels.

The AlertManager is responsible for:

- Evaluating alert rules against incoming events
- Routing alerts to the appropriate channels (Telegram, Slack, etc.)
- Applying cooldowns to prevent spam
- Respecting quiet hours
- Logging all sent alerts

Attributes:

- config: The AlertConfig for this manager
- telegram_channel (TelegramChannel | None): Optional TelegramChannel instance
- slack_channel (SlackChannel | None): Optional SlackChannel instance
- cooldown_tracker: Tracks cooldown state

Initialize the AlertManager.

Parameters:

- config (AlertConfig, required): The AlertConfig with channel configurations and rules
- telegram_bot_token (str | None, default None): Bot token for Telegram (required if using Telegram)
- slack_webhook_url (str | None, default None): Webhook URL for Slack (overrides config.slack_webhook)
- slack_enable_threading (bool, default True): Whether to enable threading for Slack alerts
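
A minimal construction sketch, wiring both channels; the token, chat ID, and webhook values are placeholders, and the AlertConfig fields used are documented in the Configuration section below:

from almanak.framework.alerting import AlertManager, AlertConfig

# Config routing alerts to Slack and Telegram (rules omitted here).
config = AlertConfig(
    telegram_chat_id="-1001234567890",
    slack_webhook="https://hooks.slack.com/services/T000/B000/XXXX",
)

manager = AlertManager(
    config,
    telegram_bot_token="123456:ABC-DEF",       # placeholder bot token
    slack_enable_threading=True,
)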

telegram_channel property

telegram_channel: TelegramChannel | None

Get the Telegram channel if configured.

slack_channel property

slack_channel: SlackChannel | None

Get the Slack channel if configured.

send_alert async

send_alert(
    card: OperatorCard,
    metric_values: dict[AlertCondition, Decimal]
    | None = None,
) -> AlertSendResult

Send an alert for the given OperatorCard.

This method:

1. Finds matching alert rules based on the card's event type
2. Checks if alerts should be sent (quiet hours, cooldown)
3. Routes to configured channels
4. Records cooldown state
5. Logs all sent alerts

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on
- metric_values (dict[AlertCondition, Decimal] | None, default None): Optional dict of metric values for threshold-based rules

Returns:

- AlertSendResult with status and any errors
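
A usage sketch from async code (use send_alert_sync in synchronous code); card is assumed to be an OperatorCard produced elsewhere by the framework, and manager is the AlertManager constructed above:

# Evaluate rules, apply cooldowns/quiet hours, and route the alert.
result = await manager.send_alert(card)

if result.success:
    print(f"Delivered via: {result.channels_sent}")
elif result.skipped_reason:
    print(f"Skipped: {result.skipped_reason}")
else:
    for channel, error in result.errors.items():
        print(f"{channel}: {error}")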

send_alert_sync

send_alert_sync(
    card: OperatorCard,
    metric_values: dict[AlertCondition, Decimal]
    | None = None,
) -> AlertSendResult

Synchronous wrapper for send_alert.

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on
- metric_values (dict[AlertCondition, Decimal] | None, default None): Optional dict of metric values for threshold-based rules

Returns:

- AlertSendResult with status and any errors

send_direct_telegram_alert async

send_direct_telegram_alert(
    card: OperatorCard,
) -> AlertSendResult

Send an alert directly to Telegram, bypassing rule matching.

This is useful for critical system alerts that should always go through regardless of configured rules.

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on

Returns:

- AlertSendResult with status

send_direct_telegram_alert_sync

send_direct_telegram_alert_sync(
    card: OperatorCard,
) -> AlertSendResult

Synchronous wrapper for send_direct_telegram_alert.

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on

Returns:

- AlertSendResult with status

send_direct_slack_alert async

send_direct_slack_alert(
    card: OperatorCard, thread_ts: str | None = None
) -> AlertSendResult

Send an alert directly to Slack, bypassing rule matching.

This is useful for critical system alerts that should always go through regardless of configured rules. Supports threading for related alerts.

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on
- thread_ts (str | None, default None): Optional thread timestamp to reply to

Returns:

- AlertSendResult with status
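
For example, a critical system notice can bypass rule matching entirely; a sketch from async code, where card is again an existing OperatorCard and the thread timestamp is a placeholder:

# Always deliver, regardless of configured rules.
telegram_result = await manager.send_direct_telegram_alert(card)

# Post to Slack as a reply in a known thread, if one exists.
slack_result = await manager.send_direct_slack_alert(
    card, thread_ts="1712345678.000100"
)

print(telegram_result.success, slack_result.success)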

send_direct_slack_alert_sync

send_direct_slack_alert_sync(
    card: OperatorCard, thread_ts: str | None = None
) -> AlertSendResult

Synchronous wrapper for send_direct_slack_alert.

Parameters:

- card (OperatorCard, required): The OperatorCard to alert on
- thread_ts (str | None, default None): Optional thread timestamp to reply to

Returns:

- AlertSendResult with status

set_slack_thread

set_slack_thread(strategy_id: str, thread_ts: str) -> None

Set the Slack thread timestamp for a strategy.

This enables subsequent alerts for this strategy to be posted as thread replies.

Parameters:

- strategy_id (str, required): The strategy ID
- thread_ts (str, required): The thread timestamp from Slack

clear_slack_thread

clear_slack_thread(strategy_id: str) -> None

Clear the Slack thread context for a strategy.

Call this when a strategy issue is resolved to start fresh threads for future alerts.

Parameters:

- strategy_id (str, required): The strategy ID

clear_cooldown

clear_cooldown(
    strategy_id: str,
    condition: AlertCondition | None = None,
) -> None

Clear cooldown state for a strategy.

Parameters:

- strategy_id (str, required): The strategy to clear cooldowns for
- condition (AlertCondition | None, default None): Optional specific condition to clear (clears all if None)
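
These housekeeping calls are synchronous; a short sketch of the typical lifecycle once an incident is resolved (the strategy ID and thread timestamp are placeholders):

# Thread follow-up alerts for a strategy under one Slack thread.
manager.set_slack_thread("my-strategy", thread_ts="1712345678.000100")

# ... incident handled ...

# Start fresh threads and let alerts fire again immediately.
manager.clear_slack_thread("my-strategy")
manager.clear_cooldown("my-strategy")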

GatewayAlertManager

almanak.framework.alerting.GatewayAlertManager

GatewayAlertManager(
    client: GatewayClient,
    strategy_id: str = "",
    timeout: float = 30.0,
)

AlertManager that sends alerts through the gateway.

This implementation routes all alert requests to the gateway sidecar, which has access to the actual alerting channels (Slack, Telegram).

Example

from almanak.framework.gateway_client import GatewayClient
from almanak.framework.alerting.gateway_alert_manager import GatewayAlertManager

with GatewayClient() as client:
    alert_manager = GatewayAlertManager(client, strategy_id="my-strategy")
    result = await alert_manager.send_alert(
        message="Strategy executed successfully",
        severity="info",
    )
    print(f"Alert sent: {result.success}")

Initialize gateway-backed alert manager.

Parameters:

- client (GatewayClient, required): Connected GatewayClient instance
- strategy_id (str, default ''): Strategy identifier for alert context
- timeout (float, default 30.0): RPC timeout in seconds

strategy_id property

strategy_id: str

Get the strategy ID.

send_alert async

send_alert(
    message: str,
    severity: str = "info",
    channel: str = "slack",
    metadata: dict[str, str] | None = None,
) -> GatewayAlertResult

Send an alert through the gateway.

Parameters:

- message (str, required): Alert message text
- severity (str, default 'info'): Alert severity ("info", "warning", "critical")
- channel (str, default 'slack'): Alert channel ("slack", "telegram")
- metadata (dict[str, str] | None, default None): Additional metadata to include

Returns:

- GatewayAlertResult with success status

log async

log(
    message: str,
    level: str = "INFO",
    context: dict[str, str] | None = None,
    logger_name: str = "",
) -> None

Send a log message through the gateway.

Parameters:

- message (str, required): Log message text
- level (str, default 'INFO'): Log level ("DEBUG", "INFO", "WARNING", "ERROR")
- context (dict[str, str] | None, default None): Additional context to include
- logger_name (str, default ''): Optional logger name for categorization

record_metric async

record_metric(
    name: str,
    value: float,
    labels: dict[str, str] | None = None,
    metric_type: str = "gauge",
) -> None

Record a metric through the gateway.

Parameters:

- name (str, required): Metric name
- value (float, required): Metric value
- labels (dict[str, str] | None, default None): Metric labels/tags
- metric_type (str, default 'gauge'): Type of metric ("gauge", "counter", "histogram")
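
A sketch of logging and metric recording through the gateway, reusing the alert_manager from the example above; the context keys, metric name, and label values are illustrative:

# Structured log line routed through the gateway sidecar.
await alert_manager.log(
    "Rebalance completed",
    level="INFO",
    context={"pool": "USDC/WETH"},
)

# Record a gauge metric with labels.
await alert_manager.record_metric(
    "position_value_usd",
    12345.67,
    labels={"strategy": "my-strategy"},
    metric_type="gauge",
)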

Channels

SlackChannel

almanak.framework.alerting.SlackChannel

SlackChannel(
    webhook_url: str,
    dashboard_base_url: str | None = None,
    max_retries: int = 3,
    base_delay: float = 1.0,
    enable_threading: bool = True,
    thread_timeout_seconds: int = 3600,
)

Slack notification channel for sending alerts via webhooks.

This class implements Slack incoming webhooks for sending alert notifications to operators. It uses Slack Block Kit for rich formatting and handles rate limiting with exponential backoff.

Supports threading for related alerts - subsequent alerts for the same strategy will be posted as thread replies to the original alert.

Attributes:

- webhook_url: The Slack incoming webhook URL
- dashboard_base_url: Base URL for dashboard links in messages
- max_retries: Maximum number of retries for failed sends
- base_delay: Base delay in seconds for exponential backoff

Initialize the Slack channel.

Parameters:

- webhook_url (str, required): The Slack incoming webhook URL
- dashboard_base_url (str | None, default None): Base URL for dashboard links in messages
- max_retries (int, default 3): Maximum number of retries for failed sends
- base_delay (float, default 1.0): Base delay in seconds for exponential backoff
- enable_threading (bool, default True): Whether to enable threading for related alerts
- thread_timeout_seconds (int, default 3600): How long to keep thread context (default 1 hour)

clear_thread

clear_thread(strategy_id: str) -> None

Clear the thread context for a strategy.

Call this when a strategy issue is resolved to start fresh threads for future alerts.

Parameters:

- strategy_id (str, required): The strategy ID

clear_all_threads

clear_all_threads() -> None

Clear all thread contexts.

send_alert async

send_alert(
    card: OperatorCard, thread_ts: str | None = None
) -> SlackSendResult

Send an alert to Slack with exponential backoff retry.

This method formats the OperatorCard using Slack Block Kit and sends it to the configured webhook. It handles rate limiting with exponential backoff and logs all send attempts.

Threading support: If enable_threading is True and a thread_ts is provided (or stored from a previous alert for this strategy), the alert will be sent as a thread reply. Note that incoming webhooks don't return message timestamps, so for full threading support consider using the Slack Web API.

Parameters:

- card (OperatorCard, required): The OperatorCard containing alert information
- thread_ts (str | None, default None): Optional thread timestamp to reply to

Returns:

- SlackSendResult indicating success or failure, with thread_ts if available
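
A direct-channel sketch, independent of AlertManager; the webhook URL is a placeholder, card is assumed to exist, and reading thread_ts from the result relies on it being populated (incoming webhooks often leave it unset, as noted above):

from almanak.framework.alerting import SlackChannel

channel = SlackChannel(
    webhook_url="https://hooks.slack.com/services/T000/B000/XXXX",
    enable_threading=True,
)

# First alert for a strategy; later alerts may be threaded under it.
result = await channel.send_alert(card)
if result.thread_ts:
    # Remember the thread so related alerts become replies.
    channel.set_thread_for_strategy("my-strategy", result.thread_ts)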

send_alert_sync

send_alert_sync(
    card: OperatorCard, thread_ts: str | None = None
) -> SlackSendResult

Synchronous wrapper for send_alert.

Parameters:

- card (OperatorCard, required): The OperatorCard containing alert information
- thread_ts (str | None, default None): Optional thread timestamp to reply to

Returns:

- SlackSendResult indicating success or failure

set_thread_for_strategy

set_thread_for_strategy(
    strategy_id: str, thread_ts: str
) -> None

Set the thread_ts for a strategy externally.

This allows integration with the Slack Web API which returns message timestamps. After sending a message via Web API, call this method to enable subsequent alerts to be threaded.

Parameters:

- strategy_id (str, required): The strategy ID
- thread_ts (str, required): The thread timestamp from Slack Web API

send_custom_message async

send_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
    thread_ts: str | None = None,
) -> SlackSendResult

Send a custom formatted message.

This method allows sending custom formatted messages that don't come from an OperatorCard. Supports threading for related messages.

Parameters:

- strategy_id (str, required): The strategy ID
- severity (Severity, required): Alert severity level
- title (str, required): Alert title
- message (str, required): Alert message body
- context (dict[str, Any] | None, default None): Optional additional context
- thread_ts (str | None, default None): Optional thread timestamp to reply to

Returns:

- SlackSendResult indicating success or failure
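
A sketch for messages that do not originate from an OperatorCard, reusing the channel above; the Severity import path and the WARNING member name are assumptions, so check the enum your framework version exposes:

from almanak.framework.alerting import Severity  # import path assumed

result = await channel.send_custom_message(
    strategy_id="my-strategy",
    severity=Severity.WARNING,  # member name assumed
    title="Gas price elevated",
    message="Base fee above configured ceiling; pausing rebalances.",
    context={"base_fee_gwei": "84"},
)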

TelegramChannel

almanak.framework.alerting.TelegramChannel

TelegramChannel(
    chat_id: str,
    bot_token: str,
    dashboard_base_url: str | None = None,
    max_retries: int = 3,
    base_delay: float = 1.0,
)

Telegram notification channel for sending alerts.

This class implements the Telegram Bot API for sending alert notifications to operators. It handles rate limiting with exponential backoff and formats messages with severity indicators.

Attributes:

- chat_id: The Telegram chat ID to send messages to
- bot_token: The Telegram bot API token
- dashboard_base_url: Base URL for dashboard links in messages
- max_retries: Maximum number of retries for failed sends
- base_delay: Base delay in seconds for exponential backoff

Initialize the Telegram channel.

Parameters:

- chat_id (str, required): The Telegram chat ID to send messages to
- bot_token (str, required): The Telegram bot API token
- dashboard_base_url (str | None, default None): Base URL for dashboard links in messages
- max_retries (int, default 3): Maximum number of retries for failed sends
- base_delay (float, default 1.0): Base delay in seconds for exponential backoff

api_url property

api_url: str

Get the Telegram API URL for this bot.

send_alert async

send_alert(card: OperatorCard) -> TelegramSendResult

Send an alert to Telegram with exponential backoff retry.

This method formats the OperatorCard as a Telegram message and sends it to the configured chat. It handles rate limiting with exponential backoff and logs all send attempts.

Parameters:

- card (OperatorCard, required): The OperatorCard containing alert information

Returns:

- TelegramSendResult indicating success or failure

send_alert_sync

send_alert_sync(card: OperatorCard) -> TelegramSendResult

Synchronous wrapper for send_alert.

Parameters:

- card (OperatorCard, required): The OperatorCard containing alert information

Returns:

- TelegramSendResult indicating success or failure
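
A direct-channel sketch from synchronous code; the chat ID and bot token are placeholders, card is assumed to exist, and the success attribute is inferred from the result description above:

from almanak.framework.alerting import TelegramChannel

channel = TelegramChannel(
    chat_id="-1001234567890",
    bot_token="123456:ABC-DEF",
    max_retries=3,
)

result = channel.send_alert_sync(card)
print(result.success)  # attribute name inferred from the docs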

format_custom_message

format_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
) -> str

Format a custom alert message.

This method allows sending custom formatted messages that don't come from an OperatorCard.

Parameters:

- strategy_id (str, required): The strategy ID
- severity (Severity, required): Alert severity level
- title (str, required): Alert title
- message (str, required): Alert message body
- context (dict[str, Any] | None, default None): Optional additional context

Returns:

- str: Formatted message string

send_custom_message async

send_custom_message(
    strategy_id: str,
    severity: Severity,
    title: str,
    message: str,
    context: dict[str, Any] | None = None,
) -> TelegramSendResult

Send a custom formatted message.

Parameters:

- strategy_id (str, required): The strategy ID
- severity (Severity, required): Alert severity level
- title (str, required): Alert title
- message (str, required): Alert message body
- context (dict[str, Any] | None, default None): Optional additional context

Returns:

- TelegramSendResult indicating success or failure

Configuration

AlertConfig

almanak.framework.alerting.AlertConfig dataclass

AlertConfig(
    telegram_chat_id: str | None = None,
    slack_webhook: str | None = None,
    email: str | None = None,
    pagerduty_key: str | None = None,
    rules: list[AlertRule] = list(),
    quiet_hours: TimeRange | None = None,
    escalation_timeout_seconds: int = 900,
    dashboard_base_url: str | None = None,
    enabled: bool = True,
)

Configuration for a strategy's alerting setup.

This dataclass holds all the configuration needed to send alerts to operators via multiple channels.

Attributes:

- telegram_chat_id (str | None): Telegram chat ID for notifications
- slack_webhook (str | None): Slack webhook URL for notifications
- email (str | None): Email address for notifications
- pagerduty_key (str | None): PagerDuty integration key for critical alerts
- rules (list[AlertRule]): List of alert rules to evaluate
- quiet_hours (TimeRange | None): Optional time range during which only CRITICAL alerts are sent
- escalation_timeout_seconds (int): Time before escalating unacknowledged alerts
- dashboard_base_url (str | None): Base URL for dashboard links in alerts
- enabled (bool): Global enable/disable for all alerting

configured_channels property

configured_channels: list[AlertChannel]

Get the list of channels that have been configured.

has_channel

has_channel(channel: AlertChannel) -> bool

Check if a specific channel is configured.

get_rules_for_condition

get_rules_for_condition(
    condition: AlertCondition,
) -> list[AlertRule]

Get all enabled rules for a specific condition.

get_rules_for_channel

get_rules_for_channel(
    channel: AlertChannel,
) -> list[AlertRule]

Get all enabled rules that include a specific channel.

is_in_quiet_hours

is_in_quiet_hours(check_time: time) -> bool

Check if the given time is within quiet hours.

should_send_alert

should_send_alert(
    severity: Severity, current_time: time
) -> bool

Determine if an alert should be sent based on severity and quiet hours.

During quiet hours, only CRITICAL alerts are sent.

Parameters:

- severity (Severity, required): The severity of the alert
- current_time (time, required): The current time to check against quiet hours

Returns:

- bool: True if the alert should be sent

to_dict

to_dict() -> dict[str, Any]

Convert the alert config to a dictionary for serialization.

AlertRule

almanak.framework.alerting.AlertRule dataclass

AlertRule(
    condition: AlertCondition,
    threshold: Decimal,
    severity: Severity,
    channels: list[AlertChannel],
    cooldown_seconds: int = 300,
    enabled: bool = True,
    description: str = "",
    custom_message: str | None = None,
)

A rule defining when and how to send an alert.

Attributes:

- condition (AlertCondition): The condition that triggers this alert
- threshold (Decimal): The threshold value for the condition (interpretation depends on condition)
- severity (Severity): Severity level for alerts triggered by this rule
- channels (list[AlertChannel]): List of channels to send alerts to
- cooldown_seconds (int): Minimum seconds between alerts for this rule
- enabled (bool): Whether this rule is active
- description (str): Human-readable description of the rule
- custom_message (str | None): Optional custom message template for the alert

__post_init__

__post_init__() -> None

Validate the alert rule.

to_dict

to_dict() -> dict[str, Any]

Convert the alert rule to a dictionary for serialization.
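
A configuration sketch combining the two dataclasses; the AlertCondition, Severity, and AlertChannel member names shown are illustrative (use the members your enums actually define), and the webhook URL is a placeholder:

from decimal import Decimal
from almanak.framework.alerting import AlertConfig, AlertRule, AlertChannel
from almanak.framework.alerting import AlertCondition, Severity  # paths assumed

rule = AlertRule(
    condition=AlertCondition.DRAWDOWN,   # illustrative member
    threshold=Decimal("0.05"),
    severity=Severity.HIGH,              # illustrative member
    channels=[AlertChannel.SLACK],       # illustrative member
    cooldown_seconds=600,
    description="Alert when drawdown exceeds 5%",
)

config = AlertConfig(
    slack_webhook="https://hooks.slack.com/services/T000/B000/XXXX",
    rules=[rule],
    escalation_timeout_seconds=900,
)

print(config.configured_channels)
print(config.has_channel(AlertChannel.SLACK))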

AlertChannel

almanak.framework.alerting.AlertChannel

Bases: str, Enum

Supported notification channels for alerts.

Escalation

EscalationPolicy

almanak.framework.alerting.EscalationPolicy

EscalationPolicy(
    config: AlertConfig,
    auto_remediation_callback: AutoRemediationCallback
    | None = None,
    emergency_pause_callback: EmergencyPauseCallback
    | None = None,
    custom_thresholds: dict[EscalationLevel, int]
    | None = None,
)

Manages escalation of unacknowledged alerts.

The EscalationPolicy tracks alerts and escalates them through multiple levels if they are not acknowledged within time thresholds.

Escalation levels:

- Level 1 (<5 min): Telegram/Slack
- Level 2 (<15 min): Add Email
- Level 3 (<30 min): PagerDuty for HIGH+ severity
- Level 4 (30+ min): Auto-remediation or emergency pause

Attributes:

- config: The AlertConfig for channel configuration
- escalations (dict[str, EscalationState]): Dict of active escalation states by alert_id
- auto_remediation_callback: Optional callback for auto-remediation
- emergency_pause_callback: Optional callback for emergency pause

Initialize the EscalationPolicy.

Parameters:

- config (AlertConfig, required): AlertConfig with channel configurations
- auto_remediation_callback (AutoRemediationCallback | None, default None): Callback to execute auto-remediation
- emergency_pause_callback (EmergencyPauseCallback | None, default None): Callback to execute emergency pause
- custom_thresholds (dict[EscalationLevel, int] | None, default None): Optional custom time thresholds for escalation levels

start_escalation

start_escalation(
    strategy_id: str,
    card: OperatorCard,
    current_time: datetime | None = None,
) -> EscalationState

Start tracking escalation for a new alert.

If an escalation already exists for this alert, returns the existing one.

Parameters:

- strategy_id (str, required): The strategy ID
- card (OperatorCard, required): The OperatorCard that triggered the alert
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- EscalationState: The EscalationState for this alert

acknowledge

acknowledge(
    alert_id: str,
    acknowledged_by: str = "operator",
    current_time: datetime | None = None,
) -> bool

Acknowledge an alert and stop its escalation.

Parameters:

- alert_id (str, required): The alert ID to acknowledge
- acknowledged_by (str, default 'operator'): Who is acknowledging (for audit)
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- bool: True if acknowledgment succeeded, False if alert not found

acknowledge_by_strategy

acknowledge_by_strategy(
    strategy_id: str,
    acknowledged_by: str = "operator",
    current_time: datetime | None = None,
) -> int

Acknowledge all active alerts for a strategy.

Parameters:

- strategy_id (str, required): The strategy ID
- acknowledged_by (str, default 'operator'): Who is acknowledging
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- int: Number of alerts acknowledged

resolve

resolve(
    alert_id: str, current_time: datetime | None = None
) -> bool

Mark an alert as resolved.

Parameters:

- alert_id (str, required): The alert ID to resolve
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- bool: True if resolution succeeded, False if alert not found

check_escalation

check_escalation(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Check if an alert needs to be escalated.

This method checks the time elapsed since the alert was created and determines if it should be escalated to the next level.

Parameters:

- alert_id (str, required): The alert ID to check
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- EscalationResult indicating what action to take

process_escalation async

process_escalation(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Process escalation for an alert, including executing Level 4 actions.

This method checks escalation and executes auto-remediation or emergency pause if Level 4 is reached.

Parameters:

- alert_id (str, required): The alert ID to process
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- EscalationResult with action details
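
A lifecycle sketch tying these calls together, reusing config and card from the earlier examples; the pause-callback signature and the state.alert_id attribute are assumptions, so adapt them to the actual callback protocol and EscalationState fields:

from almanak.framework.alerting import EscalationPolicy

async def pause_strategy(strategy_id: str) -> None:
    # Placeholder emergency-pause handler; signature assumed.
    print(f"Pausing {strategy_id}")

policy = EscalationPolicy(config, emergency_pause_callback=pause_strategy)

# Track a new alert as soon as it is sent.
state = policy.start_escalation("my-strategy", card)

# Periodically re-check; Level 4 actions run inside process_escalation.
result = await policy.process_escalation(state.alert_id)

# An operator acknowledgment stops further escalation.
policy.acknowledge(state.alert_id, acknowledged_by="oncall")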

process_escalation_sync

process_escalation_sync(
    alert_id: str, current_time: datetime | None = None
) -> EscalationResult

Synchronous wrapper for process_escalation.

Parameters:

- alert_id (str, required): The alert ID to process
- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- EscalationResult with action details

check_all_escalations

check_all_escalations(
    current_time: datetime | None = None,
) -> dict[str, EscalationResult]

Check all active escalations.

Parameters:

- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- dict[str, EscalationResult]: Dict mapping alert_id to EscalationResult

process_all_escalations async

process_all_escalations(
    current_time: datetime | None = None,
) -> dict[str, EscalationResult]

Process all active escalations.

Parameters:

- current_time (datetime | None, default None): Current time (defaults to now)

Returns:

- dict[str, EscalationResult]: Dict mapping alert_id to EscalationResult

get_escalation_state

get_escalation_state(
    alert_id: str,
) -> EscalationState | None

Get the current escalation state for an alert.

Parameters:

- alert_id (str, required): The alert ID

Returns:

- EscalationState, or None if not found

get_active_escalations

get_active_escalations() -> list[EscalationState]

Get all active escalations.

Returns:

- list[EscalationState]: List of active EscalationState objects

get_escalations_for_strategy

get_escalations_for_strategy(
    strategy_id: str,
) -> list[EscalationState]

Get all escalations for a strategy.

Parameters:

- strategy_id (str, required): The strategy ID

Returns:

- list[EscalationState]: List of EscalationState objects for the strategy

clear_resolved_escalations

clear_resolved_escalations(
    max_age_seconds: int = 86400,
) -> int

Clear old resolved escalations to prevent memory buildup.

Parameters:

- max_age_seconds (int, default 86400): Maximum age for resolved escalations (default 24 hours)

Returns:

- int: Number of escalations cleared

EscalationLevel

almanak.framework.alerting.EscalationLevel

Bases: IntEnum

Escalation levels from least to most severe.

Results

AlertSendResult

almanak.framework.alerting.AlertSendResult dataclass

AlertSendResult(
    success: bool,
    channels_sent: list[AlertChannel] = list(),
    channels_failed: list[AlertChannel] = list(),
    errors: dict[AlertChannel, str] = dict(),
    skipped_reason: str | None = None,
)

Result of sending an alert through AlertManager.