Source code for fenn.notification.services.telegram

from typing import Literal

import requests

from fenn.notification.service import Service


class Telegram(Service):
    """Telegram notification service that posts to the Bot API ``sendMessage`` endpoint."""

    def __init__(
        self,
        bot_token: str,
        chat_id: str,
        parse_mode: Literal["Markdown", "MarkdownV2", "HTML"] | None = None,
    ):
        """Initialize the Telegram service.

        Args:
            bot_token: Telegram bot token. It is interpolated into the request
                URL, so it must be supplied by the caller (no environment
                lookup is performed here).
            chat_id: Identifier of the chat that receives the messages.
            parse_mode: Optional Telegram text-formatting mode. When None (or
                otherwise falsy) no ``parse_mode`` field is sent.

        Raises:
            ValueError: If ``bot_token`` or ``chat_id`` is None or empty.
        """
        # Fail fast: a missing token would otherwise silently produce a URL
        # such as ".../botNone/sendMessage" that only fails at send time.
        if not bot_token:
            raise ValueError("Telegram bot token must be provided.")
        if not chat_id:
            raise ValueError("Telegram chat ID must be provided.")

        super().__init__()
        self._parse_mode = parse_mode
        # Kept only so the token can be scrubbed from error messages.
        self._bot_token = bot_token
        self._telegram_api_url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        self._chat_id = chat_id

    def send_notification(self, message: str) -> None:
        """Send a notification to the configured Telegram chat.

        Args:
            message: The message text to send.

        Raises:
            requests.exceptions.RequestException: If the request fails or the
                API answers with an HTTP error status. The bot token is
                redacted from the message; the original ``response`` and
                ``request`` objects are attached to the raised exception.
        """
        payload: dict[str, object] = {
            "chat_id": self._chat_id,
            "text": message,
            "disable_notification": False,
        }
        if self._parse_mode:
            payload["parse_mode"] = self._parse_mode

        try:
            response = requests.post(self._telegram_api_url, json=payload, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as err:
            # The request URL embeds the bot token and requests' error text
            # includes that URL/path, so scrub the secret before it can reach
            # logs or tracebacks.
            detail = str(err).replace(self._bot_token, "<redacted>")
            # "from None" keeps the unredacted original out of the default
            # traceback output; it is still reachable via ``__context__``.
            raise requests.exceptions.RequestException(
                f"Failed to send Telegram notification: {detail}",
                response=err.response,
                request=err.request,
            ) from None