refactor: Move files into src directory
Refactored project structure: Moved all Python modules to a src/ directory, updated imports accordingly. Added new environment variables to Dockerfile and docker-compose.yml. Removed Dockerfile.test and TODO file.
This commit is contained in:
156
src/session_notifier.py
Normal file
156
src/session_notifier.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import smtplib
|
||||
import os
|
||||
import logging
|
||||
from email.message import EmailMessage
|
||||
from telegram import Bot
|
||||
|
||||
|
||||
class SessionNotifier:
|
||||
"""
|
||||
A class to handle notifications for session bookings.
|
||||
Supports sending notifications via email and Telegram.
|
||||
|
||||
Attributes:
|
||||
email_credentials (dict): Dictionary containing email credentials
|
||||
(e.g., 'from', 'to', 'password')
|
||||
telegram_credentials (dict): Dictionary containing Telegram credentials
|
||||
(e.g., 'token', 'chat_id')
|
||||
enable_email (bool): Whether to enable email notifications
|
||||
enable_telegram (bool): Whether to enable Telegram notifications
|
||||
"""
|
||||
|
||||
def __init__(self, email_credentials, telegram_credentials, enable_email=True, enable_telegram=True):
|
||||
"""
|
||||
Initialize the SessionNotifier with email and Telegram credentials.
|
||||
|
||||
Args:
|
||||
email_credentials (dict): Email credentials for authentication
|
||||
telegram_credentials (dict): Telegram credentials for authentication
|
||||
enable_email (bool): Whether to enable email notifications
|
||||
enable_telegram (bool): Whether to enable Telegram notifications
|
||||
"""
|
||||
self.email_credentials = email_credentials
|
||||
self.telegram_credentials = telegram_credentials
|
||||
self.enable_email = enable_email
|
||||
self.enable_telegram = enable_telegram
|
||||
|
||||
# Check environment variable for impossible booking notifications
|
||||
self.notify_impossible = os.environ.get("NOTIFY_IMPOSSIBLE_BOOKING", "true").lower() in ("true", "1", "yes")
|
||||
|
||||
def send_email_notification(self, message):
|
||||
"""
|
||||
Send an email notification with the given message.
|
||||
|
||||
Args:
|
||||
message (str): The message content to be sent in the email
|
||||
"""
|
||||
logging.debug("Sending email notification")
|
||||
logging.debug(f"Email credentials: {self.email_credentials}")
|
||||
|
||||
# Create an EmailMessage object
|
||||
email = EmailMessage()
|
||||
email.set_content(message)
|
||||
|
||||
# Set the email sender and recipient
|
||||
email["From"] = self.email_credentials["from"]
|
||||
email["To"] = self.email_credentials["to"]
|
||||
|
||||
# Set the email subject
|
||||
email["Subject"] = "Session Booking Notification"
|
||||
|
||||
# Send the email using smtplib
|
||||
try:
|
||||
smtp_server = os.environ.get("SMTP_SERVER")
|
||||
if not smtp_server:
|
||||
logging.error("SMTP server not configured in environment variables")
|
||||
raise ValueError("SMTP server not configured")
|
||||
|
||||
with smtplib.SMTP_SSL(smtp_server, 465) as smtp:
|
||||
logging.debug(f"Connecting to SMTP server: {smtp_server}")
|
||||
smtp.login(self.email_credentials["from"], self.email_credentials['password'])
|
||||
logging.debug("Logged in to SMTP server")
|
||||
smtp.send_message(email)
|
||||
logging.debug("Email sent successfully")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to send email: {str(e)}")
|
||||
raise
|
||||
|
||||
async def send_telegram_notification(self, message):
|
||||
"""
|
||||
Send a Telegram notification with the given message.
|
||||
|
||||
Args:
|
||||
message (str): The message content to be sent in the Telegram chat
|
||||
"""
|
||||
# Create a Bot instance with the provided token
|
||||
bot = Bot(token=self.telegram_credentials["token"])
|
||||
# Send the message to the specified chat ID and await the result
|
||||
await bot.send_message(chat_id=self.telegram_credentials["chat_id"], text=message)
|
||||
|
||||
async def notify_session_booking(self, session_details):
|
||||
"""
|
||||
Notify about a session booking via email and Telegram.
|
||||
|
||||
Args:
|
||||
session_details (str): Details about the booked session
|
||||
"""
|
||||
# Create messages for both email and Telegram
|
||||
email_message = f"Session booked: {session_details}"
|
||||
telegram_message = f"Session booked: {session_details}"
|
||||
|
||||
# Send notifications through enabled channels
|
||||
if self.enable_email:
|
||||
self.send_email_notification(email_message)
|
||||
|
||||
if self.enable_telegram:
|
||||
await self.send_telegram_notification(telegram_message)
|
||||
|
||||
async def notify_upcoming_session(self, session_details, days_until):
|
||||
"""
|
||||
Notify about an upcoming session via email and Telegram.
|
||||
|
||||
Args:
|
||||
session_details (str): Details about the upcoming session
|
||||
days_until (int): Number of days until the session
|
||||
"""
|
||||
# Create messages for both email and Telegram
|
||||
email_message = f"Session available soon: {session_details} (in {days_until} days)"
|
||||
telegram_message = f"Session available soon: {session_details} (in {days_until} days)"
|
||||
|
||||
# Send notifications through enabled channels
|
||||
if self.enable_email:
|
||||
self.send_email_notification(email_message)
|
||||
|
||||
if self.enable_telegram:
|
||||
await self.send_telegram_notification(telegram_message)
|
||||
|
||||
async def notify_impossible_booking(self, session_details, notify_if_impossible=None):
|
||||
"""
|
||||
Notify about an impossible session booking via email and Telegram.
|
||||
|
||||
Args:
|
||||
session_details (str): Details about the session that couldn't be booked
|
||||
notify_if_impossible (bool, optional): Whether to send notifications for impossible bookings.
|
||||
If None, uses the value from the NOTIFY_IMPOSSIBLE_BOOKING
|
||||
environment variable.
|
||||
"""
|
||||
# Determine if notifications should be sent
|
||||
# First check the method parameter (if provided), then the environment variable
|
||||
should_notify = (
|
||||
notify_if_impossible if notify_if_impossible is not None else self.notify_impossible
|
||||
)
|
||||
|
||||
# Only proceed if notifications for impossible bookings are enabled
|
||||
if not should_notify:
|
||||
return
|
||||
|
||||
# Create messages for both email and Telegram
|
||||
email_message = f"Failed to book session: {session_details}"
|
||||
telegram_message = f"Failed to book session: {session_details}"
|
||||
|
||||
# Send notifications through enabled channels
|
||||
if self.enable_email:
|
||||
self.send_email_notification(email_message)
|
||||
|
||||
if self.enable_telegram:
|
||||
await self.send_telegram_notification(telegram_message)
|
||||
Reference in New Issue
Block a user