feat: Add a Telegram notification tester
This commit is contained in:
@@ -29,10 +29,10 @@ class SessionNotifier:
|
||||
enable_email (bool): Whether to enable email notifications
|
||||
enable_telegram (bool): Whether to enable Telegram notifications
|
||||
"""
|
||||
self.email_credentials = email_credentials
|
||||
self.telegram_credentials = telegram_credentials
|
||||
self.enable_email = enable_email
|
||||
self.enable_telegram = enable_telegram
|
||||
self.email_credentials = email_credentials
|
||||
self.telegram_credentials = telegram_credentials
|
||||
self.enable_email = enable_email
|
||||
self.enable_telegram = enable_telegram
|
||||
|
||||
def send_email_notification(self, message):
|
||||
"""
|
||||
@@ -49,11 +49,11 @@ class SessionNotifier:
|
||||
email.set_content(message)
|
||||
|
||||
# Set the email sender and recipient
|
||||
email['From'] = self.email_credentials['from']
|
||||
email['To'] = self.email_credentials['to']
|
||||
email["From"] = self.email_credentials["from"]
|
||||
email["To"] = self.email_credentials["to"]
|
||||
|
||||
# Set the email subject
|
||||
email['Subject'] = 'Session Booking Notification'
|
||||
email["Subject"] = "Session Booking Notification"
|
||||
|
||||
# Send the email using smtplib
|
||||
try:
|
||||
@@ -64,7 +64,7 @@ class SessionNotifier:
|
||||
|
||||
with smtplib.SMTP_SSL(smtp_server, 465) as smtp:
|
||||
logging.debug(f"Connecting to SMTP server: {smtp_server}")
|
||||
smtp.login(self.email_credentials['from'], self.email_credentials['password'])
|
||||
smtp.login(self.email_credentials["from"], self.email_credentials['password'])
|
||||
logging.debug("Logged in to SMTP server")
|
||||
smtp.send_message(email)
|
||||
logging.debug("Email sent successfully")
|
||||
@@ -72,7 +72,7 @@ class SessionNotifier:
|
||||
logging.error(f"Failed to send email: {str(e)}")
|
||||
raise
|
||||
|
||||
def send_telegram_notification(self, message):
|
||||
async def send_telegram_notification(self, message):
|
||||
"""
|
||||
Send a Telegram notification with the given message.
|
||||
|
||||
@@ -80,11 +80,11 @@ class SessionNotifier:
|
||||
message (str): The message content to be sent in the Telegram chat
|
||||
"""
|
||||
# Create a Bot instance with the provided token
|
||||
bot = Bot(token=self.telegram_credentials['token'], base_url=self.telegram_credentials.get('base_url', 'https://api.telegram.org'))
|
||||
# Send the message to the specified chat ID
|
||||
bot.send_message(chat_id=self.telegram_credentials['chat_id'], text=message)
|
||||
bot = Bot(token=self.telegram_credentials["token"])
|
||||
# Send the message to the specified chat ID and await the result
|
||||
await bot.send_message(chat_id=self.telegram_credentials["chat_id"], text=message)
|
||||
|
||||
def notify_session_booking(self, session_details):
|
||||
async def notify_session_booking(self, session_details):
|
||||
"""
|
||||
Notify about a session booking via email and Telegram.
|
||||
|
||||
@@ -100,9 +100,9 @@ class SessionNotifier:
|
||||
self.send_email_notification(email_message)
|
||||
|
||||
if self.enable_telegram:
|
||||
self.send_telegram_notification(telegram_message)
|
||||
await self.send_telegram_notification(telegram_message)
|
||||
|
||||
def notify_upcoming_session(self, session_details, days_until):
|
||||
async def notify_upcoming_session(self, session_details, days_until):
|
||||
"""
|
||||
Notify about an upcoming session via email and Telegram.
|
||||
|
||||
@@ -119,4 +119,4 @@ class SessionNotifier:
|
||||
self.send_email_notification(email_message)
|
||||
|
||||
if self.enable_telegram:
|
||||
self.send_telegram_notification(telegram_message)
|
||||
await self.send_telegram_notification(telegram_message)
|
||||
Reference in New Issue
Block a user