Compare commits

...

2 Commits

Author SHA1 Message Date
kbe
888728729f feat: more debug logs 2025-08-05 01:24:25 +02:00
kbe
a6cb6cb7b6 refactor: Reduce code size 2025-08-04 23:34:07 +02:00
2 changed files with 205 additions and 383 deletions

View File

@@ -1,8 +1,8 @@
# CrossFit booking credentials # CrossFit booking credentials
CROSSFIT_USERNAME=your_username CROSSFIT_USERNAME=your_username
CROSSFIT_PASSWORD=your_password CROSSFIT_PASSWORD=your_password
TARGET_RESERVATION_TIME="20:01" TARGET_RESERVATION_TIME="21:01"
BOOKING_WINDOW_END_DELTA_MINUTES="10" BOOKING_WINDOW_END_DELTA_MINUTES="59"
# Notification settings # Notification settings

View File

@@ -1,63 +1,36 @@
# Native modules # Native modules
import logging import logging, traceback, os, time
import traceback
import os
import time
import difflib
from datetime import datetime, timedelta, date from datetime import datetime, timedelta, date
# Third-party modules # Third-party modules
import requests import requests, pytz
from dateutil.parser import parse from dateutil.parser import parse
import pytz
from dotenv import load_dotenv from dotenv import load_dotenv
from urllib.parse import urlencode from urllib.parse import urlencode
from typing import List, Dict, Optional, Any, Tuple from typing import List, Dict, Optional, Any, Tuple
# Import the SessionNotifier class
from session_notifier import SessionNotifier from session_notifier import SessionNotifier
# Import the preferred sessions from the session_config module
from session_config import PREFERRED_SESSIONS from session_config import PREFERRED_SESSIONS
load_dotenv() load_dotenv()
# Configuration
USERNAME = os.environ.get("CROSSFIT_USERNAME") USERNAME = os.environ.get("CROSSFIT_USERNAME")
PASSWORD = os.environ.get("CROSSFIT_PASSWORD") PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
if not all([USERNAME, PASSWORD]): if not all([USERNAME, PASSWORD]):
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD") raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
APPLICATION_ID = "81560887" APPLICATION_ID = "81560887"
CATEGORY_ID = "677" # Activity category ID for CrossFit CATEGORY_ID = "677"
TIMEZONE = "Europe/Paris" # Adjust to your timezone TIMEZONE = "Europe/Paris"
# Booking window configuration (can be overridden by environment variables)
# TARGET_RESERVATION_TIME: string "HH:MM" local time when bookings open (default 20:01)
# BOOKING_WINDOW_END_DELTA_MINUTES: int minutes after target time to stop booking (default 10)
TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01") TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01")
BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10")) BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10"))
DEVICE_TYPE = "3" DEVICE_TYPE = "3"
# Retry configuration
RETRY_MAX = 3 RETRY_MAX = 3
RETRY_BACKOFF = 1 RETRY_BACKOFF = 1
APP_VERSION = "5.09.21" APP_VERSION = "5.09.21"
class CrossFitBooker: class CrossFitBooker:
"""
A class for automating the booking of CrossFit sessions.
This class handles authentication, session availability checking,
booking, and notifications for CrossFit sessions.
"""
def __init__(self) -> None: def __init__(self) -> None:
"""
Initialize the CrossFitBooker with necessary attributes.
Sets up authentication tokens, session headers, mandatory parameters,
and initializes the SessionNotifier for sending notifications.
"""
self.auth_token: Optional[str] = None self.auth_token: Optional[str] = None
self.user_id: Optional[str] = None self.user_id: Optional[str] = None
self.session: requests.Session = requests.Session() self.session: requests.Session = requests.Session()
@@ -67,176 +40,164 @@ class CrossFitBooker:
"Nubapp-Origin": "user_apps", "Nubapp-Origin": "user_apps",
} }
self.session.headers.update(self.base_headers) self.session.headers.update(self.base_headers)
# Define mandatory parameters for API calls
self.mandatory_params: Dict[str, str] = { self.mandatory_params: Dict[str, str] = {
"app_version": APP_VERSION, "app_version": APP_VERSION, "device_type": DEVICE_TYPE,
"device_type": DEVICE_TYPE, "id_application": APPLICATION_ID, "id_category_activity": CATEGORY_ID
"id_application": APPLICATION_ID,
"id_category_activity": CATEGORY_ID
} }
email_credentials = {"from": os.environ.get("EMAIL_FROM"), "to": os.environ.get("EMAIL_TO"), "password": os.environ.get("EMAIL_PASSWORD")}
# Initialize the SessionNotifier with credentials from environment variables telegram_credentials = {"token": os.environ.get("TELEGRAM_TOKEN"), "chat_id": os.environ.get("TELEGRAM_CHAT_ID")}
email_credentials = {
"from": os.environ.get("EMAIL_FROM"),
"to": os.environ.get("EMAIL_TO"),
"password": os.environ.get("EMAIL_PASSWORD")
}
telegram_credentials = {
"token": os.environ.get("TELEGRAM_TOKEN"),
"chat_id": os.environ.get("TELEGRAM_CHAT_ID")
}
# Get notification settings from environment variables
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes") enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes") enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
self.notifier = SessionNotifier(email_credentials, telegram_credentials, enable_email=enable_email, enable_telegram=enable_telegram)
self.notifier = SessionNotifier( def _auth_headers(self) -> Dict[str, str]:
email_credentials, h = self.base_headers.copy()
telegram_credentials, if self.auth_token:
enable_email=enable_email, h["Authorization"] = f"Bearer {self.auth_token}"
enable_telegram=enable_telegram return h
)
# Public method expected by tests
def get_auth_headers(self) -> Dict[str, str]: def get_auth_headers(self) -> Dict[str, str]:
""" """
Return headers with authorization if available. Return headers with Authorization when token is present.
Returns: This wraps _auth_headers() to satisfy tests expecting a public method.
Dict[str, str]: Headers dictionary with authorization if available.
""" """
headers: Dict[str, str] = self.base_headers.copy() return self._auth_headers()
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}" def _post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None, expect_json: bool = True) -> Optional[Any]:
return headers for retry in range(RETRY_MAX):
try:
resp = self.session.post(url, headers=(headers or self._auth_headers()), data=urlencode(data), timeout=10)
sc = resp.status_code
if sc == 200:
return resp.json() if expect_json else resp
if sc == 401:
logging.error("401 Unauthorized")
return None
# Guard sc to ensure it's an int for comparison (fix tests using Mock without status_code int semantics)
if isinstance(sc, int) and 500 <= sc < 600:
logging.error(f"Server error {sc}")
raise requests.exceptions.ConnectionError(f"Server error {sc}")
logging.error(f"HTTP {sc}: {getattr(resp, 'text', '')[:100]}")
return None
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {e}")
raise
wt = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed ({retry+1}/{RETRY_MAX}): {e}. Retrying in {wt}s...")
time.sleep(wt)
return None
def _parse_local(self, ts: str) -> datetime:
dt = parse(ts)
return dt if dt.tzinfo else pytz.timezone(TIMEZONE).localize(dt)
def _fmt_session(self, s: Dict[str, Any], dt: Optional[datetime] = None) -> str:
dt = dt or self._parse_local(s["start_timestamp"])
return f"{s['id_activity_calendar']} {s['name_activity']} at {dt.strftime('%Y-%m-%d %H:%M')}"
def login(self) -> bool: def login(self) -> bool:
"""
Authenticate and get the bearer token.
Returns:
bool: True if login is successful, False otherwise.
"""
try: try:
# First login endpoint # Directly use requests to align with tests that mock requests.Session.post
login_params: Dict[str, str] = { a = {"app_version": APP_VERSION, "device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD}
"app_version": APP_VERSION, resp1 = self.session.post(
"device_type": DEVICE_TYPE,
"username": USERNAME,
"password": PASSWORD
}
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/users/checkUser.php", "https://sport.nubapp.com/api/v4/users/checkUser.php",
headers={"Content-Type": "application/x-www-form-urlencoded"}, headers={"Content-Type": "application/x-www-form-urlencoded"},
data=urlencode(login_params)) data=urlencode(a)
)
if not response.ok: if not getattr(resp1, "ok", False):
logging.error(f"First login step failed: {response.status_code} - {response.text[:100]}") logging.error("First login step failed")
return False return False
try: try:
login_data: Dict[str, Any] = response.json() r1 = resp1.json()
self.user_id = str(login_data["data"]["user"]["id_user"]) self.user_id = str(r1["data"]["user"]["id_user"])
except (KeyError, ValueError) as e: except Exception as e:
logging.error(f"Error during login: {str(e)} - Response: {response.text}") logging.error(f"Error during login: {e} - Response: {getattr(resp1, 'text', '')}")
return False return False
# Second login endpoint resp2 = self.session.post(
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/login", "https://sport.nubapp.com/api/v4/login",
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"}, headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
data=urlencode({ data=urlencode({"device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD})
"device_type": DEVICE_TYPE, )
"username": USERNAME, if getattr(resp2, "ok", False):
"password": PASSWORD
}))
if response.ok:
try: try:
login_data: Dict[str, Any] = response.json() r2 = resp2.json()
self.auth_token = login_data.get("token") self.auth_token = r2.get("token")
except (KeyError, ValueError) as e: except Exception as e:
logging.error(f"Error during login: {str(e)} - Response: {response.text}") logging.error(f"Error during login: {e} - Response: {getattr(resp2, 'text', '')}")
return False return False
if self.auth_token and self.user_id: if self.auth_token and self.user_id:
logging.info("Successfully logged in") logging.info("Successfully logged in")
return True return True
else:
logging.error(f"Login failed: {response.status_code} - {response.text[:100]}")
return False
logging.error("Login failed")
return False
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
logging.error(f"Request error during login: {str(e)}") logging.error(f"Request error during login: {e}")
return False return False
except Exception as e: except Exception as e:
logging.error(f"Unexpected error during login: {str(e)}") logging.error(f"Unexpected error during login: {e}")
return False return False
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]: def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
"""
Fetch available sessions from the API.
Args:
start_date (date): Start date for fetching sessions.
end_date (date): End date for fetching sessions.
Returns:
Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise.
"""
if not self.auth_token or not self.user_id: if not self.auth_token or not self.user_id:
logging.error("Authentication required - missing token or user ID") logging.error("Authentication required - missing token or user ID")
return None return None
data = {
url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php" **self.mandatory_params,
# Prepare request with mandatory parameters
request_data: Dict[str, str] = self.mandatory_params.copy()
request_data.update({
"id_user": self.user_id, "id_user": self.user_id,
"start_timestamp": start_date.strftime("%d-%m-%Y"), "start_timestamp": start_date.strftime("%d-%m-%Y"),
"end_timestamp": end_date.strftime("%d-%m-%Y") "end_timestamp": end_date.strftime("%d-%m-%Y"),
}) }
logging.debug(f"[get_available_sessions] URL=https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php "
# Add retry logic f"method=POST content_type=application/x-www-form-urlencoded "
for retry in range(RETRY_MAX): f"keys={list(data.keys())} id_user_present={bool(self.user_id)}")
r = self._post("https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php", data)
# Display available sessions in debug console
if r is not None:
success = r.get("success", False) if isinstance(r, dict) else None
count = 0
try: try:
response: requests.Response = self.session.post( if isinstance(r, dict):
url, activities = r.get("data", {}).get("activities_calendar", [])
headers=self.get_auth_headers(), count = len(activities) if isinstance(activities, list) else 0
data=urlencode(request_data), except Exception:
timeout=10 pass
) logging.debug(f"[get_available_sessions] success={success} activities_count={count}")
# Log concise session summary to aid debugging
if response.status_code == 200: if success and count:
return response.json() for s in r.get("data", {}).get("activities_calendar", [])[:50]:
elif response.status_code == 401: try:
logging.error("401 Unauthorized - token may be expired or invalid") summary = self._fmt_session(s)
return None except Exception:
elif 500 <= response.status_code < 600: summary = f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}"
logging.error(f"Server error {response.status_code}") logging.debug(f"[session] {summary}")
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}") else:
else: logging.debug(f"[get_available_sessions] raw_response_preview={str(r)[:500]}")
logging.error(f"Unexpected status code: {response.status_code} - {response.text[:100]}") else:
return None logging.debug("[get_available_sessions] No response (None) from API")
return r
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
return None
def book_session(self, session_id: str) -> bool: def book_session(self, session_id: str) -> bool:
""" # Debug payload composition (without secrets)
Book a specific session. safe_user_id = str(self.user_id) if self.user_id else None
Args: debug_payload = {
session_id (str): ID of the session to book. **self.mandatory_params,
Returns: "id_activity_calendar": session_id,
bool: True if booking is successful, False otherwise. "id_user": safe_user_id,
""" "action_by": safe_user_id,
url = "https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php" "n_guests": "0",
"booked_on": "1",
"device_type": self.mandatory_params.get("device_type"),
"token_present": bool(self.auth_token),
}
logging.debug(f"[book_session] URL=https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php "
f"method=POST content_type=application/x-www-form-urlencoded "
f"keys={list(debug_payload.keys())} "
f"id_activity_calendar_present={bool(session_id)} "
f"user_id_present={bool(safe_user_id)} token_present={debug_payload['token_present']}")
data = { data = {
**self.mandatory_params, **self.mandatory_params,
"id_activity_calendar": session_id, "id_activity_calendar": session_id,
@@ -247,275 +208,136 @@ class CrossFitBooker:
"device_type": self.mandatory_params["device_type"], "device_type": self.mandatory_params["device_type"],
"token": self.auth_token "token": self.auth_token
} }
r = self._post("https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php", data)
for retry in range(RETRY_MAX): if r and r.get("success", False):
try: logging.info(f"Successfully booked session {session_id}")
response: requests.Response = self.session.post( return True
url, logging.error(f"Booking failed: {r}" if r is not None else "Booking call failed")
headers=self.get_auth_headers(),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
logging.info(f"Successfully booked session {session_id}")
return True
else:
logging.error(f"API returned success:false: {json_response} - Session ID: {session_id}")
return False
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return False
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return False return False
def get_booked_sessions(self) -> List[Dict[str, Any]]: def get_booked_sessions(self) -> List[Dict[str, Any]]:
""" data = {**self.mandatory_params, "id_user": self.user_id, "action_by": self.user_id}
Get a list of booked sessions. r = self._post("https://sport.nubapp.com/api/v4/activities/getBookedActivities.php", data)
if r and r.get("success", False):
Returns: return r.get("data", [])
A list of dictionaries containing information about the booked sessions. logging.error(f"Failed to retrieve booked sessions: {r}" if r is not None else "Call failed")
"""
url = "https://sport.nubapp.com/api/v4/activities/getBookedActivities.php"
data = {
**self.mandatory_params,
"id_user": self.user_id,
"action_by": self.user_id
}
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
return json_response.get("data", [])
logging.error(f"API returned success:false: {json_response}")
return []
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return []
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return [] return []
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool: def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
""" ui = session.get("user_info", {})
Check if a session is bookable based on user_info. if ui.get("can_join", False): return True
Args: d, t = ui.get("unableToBookUntilDate", ""), ui.get("unableToBookUntilTime", "")
session (Dict[str, Any]): Session data. if d and t:
current_time (datetime): Current time for comparison.
Returns:
bool: True if the session is bookable, False otherwise.
"""
user_info: Dict[str, Any] = session.get("user_info", {})
# First check if can_join is true (primary condition)
if user_info.get("can_join", False):
return True
# If can_join is False, check if there's a booking window
booking_date_str: str = user_info.get("unableToBookUntilDate", "")
booking_time_str: str = user_info.get("unableToBookUntilTime", "")
if booking_date_str and booking_time_str:
try: try:
booking_datetime: datetime = datetime.strptime( bd = pytz.timezone(TIMEZONE).localize(datetime.strptime(f"{d} {t}", "%d-%m-%Y %H:%M"))
f"{booking_date_str} {booking_time_str}", if current_time >= bd: return True
"%d-%m-%Y %H:%M" except ValueError: pass
)
booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
if current_time >= booking_datetime:
return True # Booking window is open
except ValueError:
pass # Ignore invalid date formats
# Default case: not bookable
return False return False
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool: def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
"""
Check if session matches one of your preferred sessions with exact matching.
Args:
session (Dict[str, Any]): Session data.
current_time (datetime): Current time for comparison.
Returns:
bool: True if the session matches a preferred session, False otherwise.
"""
try: try:
session_time: datetime = parse(session["start_timestamp"]) st = self._parse_local(session["start_timestamp"])
if not session_time.tzinfo: dow, hhmm = st.weekday(), st.strftime("%H:%M")
session_time = pytz.timezone(TIMEZONE).localize(session_time) name = session.get("name_activity", "").upper()
for pd, pt, pn in PREFERRED_SESSIONS:
day_of_week: int = session_time.weekday() if dow == pd and hhmm == pt and pn in name: return True
session_time_str: str = session_time.strftime("%H:%M")
session_name: str = session.get("name_activity", "").upper()
for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS:
# Exact match
if (day_of_week == preferred_day and
session_time_str == preferred_time and
preferred_name in session_name):
return True
return False return False
except Exception as e: except Exception as e:
logging.error(f"Failed to check session: {str(e)} - Session: {session}") logging.error(f"Failed to check session: {e} - Session: {session}")
return False return False
async def run_booking_cycle(self, current_time: datetime) -> None: async def run_booking_cycle(self, current_time: datetime) -> None:
""" start_date, end_date = current_time.date(), current_time.date() + timedelta(days=2)
Run one cycle of checking and booking sessions. sessions_data = self.get_available_sessions(start_date, end_date)
Args:
current_time (datetime): Current time for comparison.
"""
# Calculate date range to check (current day, day + 1, and day + 2)
start_date: date = current_time.date()
end_date: date = start_date + timedelta(days=2) # Only go up to day + 2
# Get available sessions
sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date)
if not sessions_data or not sessions_data.get("success", False): if not sessions_data or not sessions_data.get("success", False):
logging.error("No sessions available or error fetching sessions") logging.error("No sessions available or error fetching sessions")
return return
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", []) activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
# Find sessions to book (preferred only) within allowed date range
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = [] sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
upcoming_sessions: List[Dict[str, Any]] = [] upcoming_sessions: List[Dict[str, Any]] = []
found_preferred_sessions: List[Dict[str, Any]] = [] found_preferred_sessions: List[Dict[str, Any]] = []
for session in activities: # Debug: list all preferred sessions detected (bookable or not)
session_time: datetime = parse(session["start_timestamp"]) preferred_debug: List[str] = []
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
# Check if session is within allowed date range (current day, day + 1, or day + 2) for s in activities:
days_diff = (session_time.date() - current_time.date()).days st = self._parse_local(s["start_timestamp"])
days_diff = (st.date() - current_time.date()).days
if not (0 <= days_diff <= 2): if not (0 <= days_diff <= 2):
continue # Skip sessions outside the allowed date range continue
# Check if session is preferred and bookable is_preferred = self.matches_preferred_session(s, current_time)
if self.is_session_bookable(session, current_time): if is_preferred:
if self.matches_preferred_session(session, current_time): # Collect concise summaries to debug output
sessions_to_book.append(("Preferred", session)) try:
found_preferred_sessions.append(session) preferred_debug.append(self._fmt_session(s, st))
except Exception:
preferred_debug.append(f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}")
if self.is_session_bookable(s, current_time):
if is_preferred:
sessions_to_book.append(("Preferred", s))
found_preferred_sessions.append(s)
else: else:
# Check if it's a preferred session that's not bookable yet if is_preferred:
if self.matches_preferred_session(session, current_time): found_preferred_sessions.append(s)
found_preferred_sessions.append(session)
# Check if it's available tomorrow (day + 1)
if days_diff == 1: if days_diff == 1:
upcoming_sessions.append(session) upcoming_sessions.append(s)
# Emit debug of preferred sessions
if preferred_debug:
logging.debug("[preferred_sessions] " + " | ".join(preferred_debug[:50]))
else:
logging.debug("[preferred_sessions] none found in window")
if not sessions_to_book and not upcoming_sessions: if not sessions_to_book and not upcoming_sessions:
logging.info("No matching sessions found to book") logging.info("No matching sessions found to book")
return return
# Notify about all found preferred sessions, regardless of bookability for s in found_preferred_sessions:
for session in found_preferred_sessions: details = self._fmt_session(s)
session_time: datetime = parse(session["start_timestamp"]) await self.notifier.notify_session_booking(details)
if not session_time.tzinfo: logging.info(f"Notified about found preferred session: {details}")
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_session_booking(session_details)
logging.info(f"Notified about found preferred session: {session_details}")
# Notify about upcoming sessions for s in upcoming_sessions:
for session in upcoming_sessions: details = self._fmt_session(s)
session_time: datetime = parse(session["start_timestamp"]) await self.notifier.notify_upcoming_session(details, 1)
if not session_time.tzinfo: logging.info(f"Notified about upcoming session: {details}")
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow
logging.info(f"Notified about upcoming session: {session_details}")
# Book sessions (preferred first)
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1) sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
for session_type, session in sessions_to_book: for stype, s in sessions_to_book:
session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S") st_dt = datetime.strptime(s["start_timestamp"], "%Y-%m-%d %H:%M:%S")
logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})") logging.info(f"Attempting to book {stype} session at {st_dt} ({s['name_activity']})")
if self.book_session(session["id_activity_calendar"]): if self.book_session(s["id_activity_calendar"]):
# Send notification after successful booking details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" await self.notifier.notify_session_booking(details)
await self.notifier.notify_session_booking(session_details) logging.info(f"Successfully booked {stype} session at {st_dt}")
logging.info(f"Successfully booked {session_type} session at {session_time}")
else: else:
logging.error(f"Failed to book {session_type} session at {session_time}") logging.error(f"Failed to book {stype} session at {st_dt}")
# Send notification about the failed booking details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}" await self.notifier.notify_impossible_booking(details)
await self.notifier.notify_impossible_booking(session_details) logging.info(f"Notified about impossible booking for {stype} session at {st_dt}")
logging.info(f"Notified about impossible booking for {session_type} session at {session_time}")
async def run(self) -> None: async def run(self) -> None:
""" tz = pytz.timezone(TIMEZONE)
Main execution loop. th, tm = map(int, TARGET_RESERVATION_TIME.split(":"))
""" target_time = datetime.now(tz).replace(hour=th, minute=tm, second=0, microsecond=0)
# Set up timezone
tz: pytz.timezone = pytz.timezone(TIMEZONE)
# Parse TARGET_RESERVATION_TIME to get the target hour and minute
target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":"))
target_time = datetime.now(tz).replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES) booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
# Initial login
if not self.login(): if not self.login():
logging.error("Authentication failed - exiting program") logging.error("Authentication failed - exiting program"); return
return
try: try:
while True: while True:
try: try:
current_time: datetime = datetime.now(tz) now = datetime.now(tz)
logging.info(f"Current time: {current_time}") logging.info(f"Current time: {now}")
if target_time <= now <= booking_window_end:
# Only book sessions if current time is within the booking window await self.run_booking_cycle(now); time.sleep(60)
if target_time <= current_time <= booking_window_end:
# Run booking cycle to check for preferred sessions and book
await self.run_booking_cycle(current_time)
# Wait for a short time before next check
time.sleep(60)
else: else:
# Check again in 5 minutes if outside booking window
time.sleep(300) time.sleep(300)
except Exception as e: except Exception as e:
logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}") logging.error(f"Unexpected error in booking cycle: {e} - Traceback: {traceback.format_exc()}"); time.sleep(60)
time.sleep(60) # Wait before retrying after error
except KeyboardInterrupt: except KeyboardInterrupt:
self.quit() self.quit()
def quit(self) -> None: def quit(self) -> None:
""" logging.info("Script interrupted by user. Quitting..."); exit(0)
Clean up resources and exit the script.
"""
logging.info("Script interrupted by user. Quitting...")
exit(0)