Source code for fenn.notification.notifier
from typing import Iterable, List, Type
from .service import Service
[docs]
class Notifier:
"""Central notification manager that handles multiple notification services."""
[docs]
def __init__(self):
"""Initialize the notifier with an empty list of services."""
self._services: List[Service] = []
[docs]
def add_services(
self,
services: Iterable[Type[Service]],
) -> None:
"""
Add a list of notification services.
Example:
app.register_notification_services([Discord, Telegram])
"""
for service_cls in services:
self.add_service(service_cls)
[docs]
def add_service(self, service: Type[Service]) -> None:
"""Add a notification service.
Args:
service: A service implementing the Service interface.
"""
self._services.append(service())
[docs]
def remove_service(self, service: Type[Service]) -> None:
"""Remove a notification service.
Args:
service: The service to remove.
Raises:
ValueError: If the service is not found.
"""
try:
self._services.remove(service())
except ValueError:
ValueError(
f"Service {service.__class__.__name__} not found in services list"
)
raise
[docs]
def notify(self, message: str) -> None:
"""Send notification to all registered services.
Args:
message: The message to send.
"""
if not self._services:
return
successful_services = []
failed_services = []
for service in self._services:
try:
service.send_notification(message)
successful_services.append(service.__class__.__name__)
# logger.info(f"Successfully sent notification via {service.__class__.__name__}")
except Exception as e:
failed_services.append((service.__class__.__name__, str(e)))
# logger.error(f"Failed to send notification via {service.__class__.__name__}: {e}")
[docs]
def get_services(self) -> List[str]:
"""Get list of registered service names.
Returns:
List of service class names.
"""
return [service.__class__.__name__ for service in self._services]
[docs]
def clear_services(self) -> None:
"""Remove all registered services."""
self._services.clear()