"""Automated CrossFit session booker for the Nubapp sport API.

Logs in with credentials taken from the environment, polls the activities
calendar, sends notifications about preferred sessions and books them once
the booking window is open.
"""

# Native modules
import difflib
import json
import logging
import os
import sys
import time
import traceback
from datetime import date, datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode

# Third-party modules
import pytz
import requests
from dateutil.parser import parse
from dotenv import load_dotenv

# Import the SessionNotifier class
from session_notifier import SessionNotifier

# Import the preferred sessions from the session_config module
from session_config import PREFERRED_SESSIONS

load_dotenv()

# Configuration
USERNAME = os.environ.get("CROSSFIT_USERNAME")
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")

if not all([USERNAME, PASSWORD]):
    raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")

APPLICATION_ID = "81560887"
CATEGORY_ID = "677"  # Activity category ID for CrossFit
TIMEZONE = "Europe/Paris"  # Adjust to your timezone
TARGET_RESERVATION_TIME = "20:01"  # When bookings open (8 PM)
DEVICE_TYPE = "3"

# Retry configuration
RETRY_MAX = 3
RETRY_BACKOFF = 1
REQUEST_TIMEOUT = 10  # Seconds allowed for every HTTP call
APP_VERSION = "5.09.21"


def _env_flag(name: str, default: str = "true") -> bool:
    """Return True when environment variable *name* holds "true", "1" or "yes"."""
    return os.environ.get(name, default).lower() in ("true", "1", "yes")


class CrossFitBooker:
    """Client that authenticates against the API, then finds and books sessions."""

    def __init__(self) -> None:
        """
        Initialize the CrossFitBooker with necessary attributes.

        Creates the HTTP session, the parameters sent with every API call and
        the SessionNotifier configured from environment variables.
        """
        self.auth_token: Optional[str] = None
        self.user_id: Optional[str] = None
        self.session: requests.Session = requests.Session()
        self.base_headers: Dict[str, str] = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
            "Content-Type": "application/x-www-form-urlencoded",
            "Nubapp-Origin": "user_apps",
        }
        self.session.headers.update(self.base_headers)

        # Define mandatory parameters for API calls
        self.mandatory_params: Dict[str, str] = {
            "app_version": APP_VERSION,
            "device_type": DEVICE_TYPE,
            "id_application": APPLICATION_ID,
            "id_category_activity": CATEGORY_ID,
        }

        # Initialize the SessionNotifier with credentials from environment variables
        email_credentials = {
            "from": os.environ.get("EMAIL_FROM"),
            "to": os.environ.get("EMAIL_TO"),
            "password": os.environ.get("EMAIL_PASSWORD"),
        }
        telegram_credentials = {
            "token": os.environ.get("TELEGRAM_TOKEN"),
            "chat_id": os.environ.get("TELEGRAM_CHAT_ID"),
        }

        # Both notification channels are enabled unless explicitly switched off
        self.notifier = SessionNotifier(
            email_credentials,
            telegram_credentials,
            enable_email=_env_flag("ENABLE_EMAIL_NOTIFICATIONS"),
            enable_telegram=_env_flag("ENABLE_TELEGRAM_NOTIFICATIONS"),
        )

    def get_auth_headers(self) -> Dict[str, str]:
        """
        Return headers with authorization if available.

        Returns:
            Dict[str, str]: Copy of the base headers, plus a Bearer
            ``Authorization`` header when a token has been obtained.
        """
        headers: Dict[str, str] = self.base_headers.copy()
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        return headers

    def login(self) -> bool:
        """
        Authenticate and get the bearer token.

        Two calls are made: the first one yields the user id, the second one
        the bearer token. Both ``self.user_id`` and ``self.auth_token`` must
        be set for the login to count as successful.

        Returns:
            bool: True if login is successful, False otherwise.
        """
        try:
            # First login endpoint: resolves the user id
            login_params: Dict[str, str] = {
                "app_version": APP_VERSION,
                "device_type": DEVICE_TYPE,
                "username": USERNAME,
                "password": PASSWORD,
            }
            response: requests.Response = self.session.post(
                "https://sport.nubapp.com/api/v4/users/checkUser.php",
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                data=urlencode(login_params),
                timeout=REQUEST_TIMEOUT,  # Without a timeout a stalled server blocks forever
            )
            if not response.ok:
                logging.error(
                    f"First login step failed: {response.status_code} - Response: {response.text[:100]}"
                )
                return False

            try:
                login_data: Dict[str, Any] = response.json()
                self.user_id = str(login_data["data"]["user"]["id_user"])
            except (KeyError, TypeError) as ke:
                # TypeError covers a payload where "data" or "user" is null
                logging.error(f"Key error during login: {str(ke)} - Response: {response.text[:100]}")
                return False
            except ValueError as ve:
                logging.error(f"Value error during login: {str(ve)} - Response: {response.text[:100]}")
                return False

            # Second login endpoint: yields the bearer token
            response = self.session.post(
                "https://sport.nubapp.com/api/v4/login",
                headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
                data=urlencode({
                    "device_type": DEVICE_TYPE,
                    "username": USERNAME,
                    "password": PASSWORD,
                }),
                timeout=REQUEST_TIMEOUT,
            )
            if not response.ok:
                logging.error(f"Login failed: {response.status_code} - Response: {response.text[:100]}")
                return False

            try:
                login_data = response.json()
                self.auth_token = login_data.get("token")
            except ValueError as ve:
                logging.error(f"Value error during login: {str(ve)} - Response: {response.text[:100]}")
                return False

            if self.auth_token and self.user_id:
                logging.info("Successfully logged in")
                return True

            # An OK response without a token is still a failed login
            logging.error(f"Login failed: {response.status_code} - Response: {response.text[:100]}")
            return False

        except requests.exceptions.JSONDecodeError:
            logging.error("Failed to decode JSON response during login")
            return False
        except requests.exceptions.RequestException as e:
            logging.error(f"Request error during login: {str(e)}")
            return False
        except Exception as e:
            logging.error(f"Unexpected error during login: {str(e)}")
            return False

    def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
        """
        Fetch available sessions from the API with comprehensive error handling.

        Network-level failures are retried up to RETRY_MAX times with
        exponential backoff before giving up.

        Args:
            start_date (date): Start date for fetching sessions (a datetime works too).
            end_date (date): End date for fetching sessions (a datetime works too).

        Returns:
            Optional[Dict[str, Any]]: Decoded JSON response if the API answered
            with HTTP 200, None otherwise.

        Raises:
            requests.exceptions.ConnectionError: If the server answers with a 5xx status.
        """
        if not self.auth_token or not self.user_id:
            logging.error("Authentication required - missing token or user ID")
            return None

        url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"

        # Prepare request with mandatory parameters
        request_data: Dict[str, str] = self.mandatory_params.copy()
        request_data.update({
            "id_user": self.user_id,
            "start_timestamp": start_date.strftime("%d-%m-%Y"),
            "end_timestamp": end_date.strftime("%d-%m-%Y"),
        })

        # Retry network failures with exponential backoff
        response: Optional[requests.Response] = None
        for retry in range(RETRY_MAX):
            try:
                response = self.session.post(
                    url,
                    headers=self.get_auth_headers(),
                    data=urlencode(request_data),
                    timeout=REQUEST_TIMEOUT,
                )
                break  # Success, exit retry loop
            except requests.exceptions.Timeout:
                logging.error(
                    f"Request timed out after {REQUEST_TIMEOUT} seconds for URL: {url}. "
                    f"Retry {retry+1}/{RETRY_MAX}"
                )
            except requests.exceptions.ConnectionError as e:
                logging.error(f"Connection error for URL: {url} - Error: {str(e)}")
            except requests.exceptions.RequestException as e:
                logging.error(f"Request failed for URL: {url} - Error: {str(e)}")

            # Only reached after a failed attempt; do not sleep after the last one
            if retry < RETRY_MAX - 1:
                wait_time: int = RETRY_BACKOFF * (2 ** retry)
                logging.warning(
                    f"Request failed (attempt {retry+1}/{RETRY_MAX}). Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)

        if response is None:
            # All retries exhausted
            logging.error(f"Failed after {RETRY_MAX} attempts")
            return None

        # Handle response
        if response.status_code == 200:
            try:
                json_response: Dict[str, Any] = response.json()
                return json_response
            except ValueError:
                logging.error("Failed to decode JSON response")
                return None
        elif response.status_code == 400:
            logging.error("400 Bad Request - likely missing or invalid parameters")
            logging.error(f"Request Data: {request_data}")
            logging.error(f"Response: {response.text[:100]}")
            return None
        elif response.status_code == 401:
            logging.error("401 Unauthorized - token may be expired or invalid")
            logging.error(f"Response: {response.text[:100]}")
            return None
        elif 500 <= response.status_code < 600:
            logging.error(f"Server error {response.status_code} - Response: {response.text[:100]}")
            raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
        else:
            logging.error(f"Unexpected status code: {response.status_code}")
            return None

    def book_session(self, session_id: str) -> bool:
        """
        Book a specific session.

        Args:
            session_id (str): ID of the session to book.

        Returns:
            bool: True if booking is successful, False otherwise (including
            when no login has been performed yet).

        Raises:
            requests.exceptions.RequestException: If the last retry of the
            underlying request still fails at the network level.
        """
        if not self.auth_token or not self.user_id:
            # Without a user id the request would be sent with id_user=None
            logging.error("Authentication required - missing token or user ID")
            return False

        return self._make_request(
            url="https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php",
            data=self._prepare_booking_data(session_id),
            success_msg=f"Successfully booked session {session_id}",
        )

    def _prepare_booking_data(self, session_id: str) -> Dict[str, Optional[str]]:
        """
        Prepare request data for booking a session.

        Args:
            session_id (str): ID of the session to book.

        Returns:
            Dict[str, Optional[str]]: The mandatory parameters extended with
            the booking fields. ``id_user`` and ``action_by`` are None until
            login() has succeeded.
        """
        return {
            **self.mandatory_params,
            "id_activity_calendar": session_id,
            "id_user": self.user_id,
            "action_by": self.user_id,
            "n_guests": "0",
            "booked_on": "3",
        }

    def _make_request(self, url: str, data: Dict[str, str], success_msg: str) -> bool:
        """
        Handle API requests with retry logic and response processing.

        Any HTTP response ends the loop immediately; only network-level
        errors are retried.

        Args:
            url (str): URL for the API request.
            data (Dict[str, str]): Data to send with the request.
            success_msg (str): Message to log on successful request.

        Returns:
            bool: True if the API answered HTTP 200 with a truthy "success"
            field, False otherwise.

        Raises:
            requests.exceptions.RequestException: If the final attempt fails
            at the network level.
        """
        for retry in range(RETRY_MAX):
            try:
                response: requests.Response = self.session.post(
                    url,
                    headers=self.get_auth_headers(),
                    data=urlencode(data),
                    timeout=REQUEST_TIMEOUT,
                )

                if response.status_code == 200:
                    json_response: Dict[str, Any] = response.json()
                    if json_response.get("success", False):
                        logging.info(success_msg)
                        return True
                    logging.error(f"API returned success:false: {json_response}")
                    return False

                logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
                return False

            except requests.exceptions.JSONDecodeError:
                # Must stay ahead of RequestException: it is a subclass of it
                logging.error("Failed to decode JSON response")
                return False
            except requests.exceptions.RequestException as e:
                if retry == RETRY_MAX - 1:
                    logging.error(f"Final retry failed: {str(e)}")
                    raise  # Propagate error
                wait_time: int = RETRY_BACKOFF * (2 ** retry)
                logging.warning(
                    f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)

        logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
        return False

    def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
        """
        Check if a session is bookable based on user_info, ignoring error codes.

        A session is bookable when ``user_info["can_join"]`` is truthy, or
        when the "unableToBookUntil" date/time it carries has been reached.

        Args:
            session (Dict[str, Any]): Session data.
            current_time (datetime): Current time for comparison; it is
                compared against a timezone-aware value, so it must be
                timezone-aware as well.

        Returns:
            bool: True if the session is bookable, False otherwise.
        """
        user_info: Dict[str, Any] = session.get("user_info", {})

        # First check if can_join is true (primary condition)
        if user_info.get("can_join", False):
            activity_name = session.get("name_activity")
            logging.debug(f"Session is bookable: {activity_name} can_join is True")
            return True

        # If can_join is False, check if there's a booking window
        booking_date_str: str = user_info.get("unableToBookUntilDate", "")
        booking_time_str: str = user_info.get("unableToBookUntilTime", "")
        if not (booking_date_str and booking_time_str):
            return False  # Default case: not bookable

        try:
            booking_datetime: datetime = datetime.strptime(
                f"{booking_date_str} {booking_time_str}", "%d-%m-%Y %H:%M"
            )
        except ValueError:
            # Unparseable booking window: treat as not bookable, but leave a trace
            logging.debug(
                f"Ignoring invalid booking window: {booking_date_str!r} {booking_time_str!r}"
            )
            return False

        booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
        if current_time >= booking_datetime:
            logging.debug(
                f"Session is bookable: current_time {current_time} >= booking_datetime {booking_datetime}"
            )
            return True  # Booking window is open
        return False  # Still waiting for booking to open

    def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
        """
        Check if session matches one of your preferred sessions with fuzzy matching.

        Each entry of PREFERRED_SESSIONS is unpacked as (weekday, "HH:MM",
        name). An exact match needs the same weekday and start time and the
        preferred name contained in the session name (case-insensitive). The
        fuzzy fallback accepts an hour difference of at most one and a name
        similarity of at least 80%.

        Args:
            session (Dict[str, Any]): Session data.
            current_time (datetime): Current time (not used by the matching itself).

        Returns:
            bool: True if the session matches a preferred session, False
            otherwise (also when the session data cannot be interpreted).
        """
        try:
            session_time: datetime = self._session_start(session)
            day_of_week: int = session_time.weekday()
            session_time_str: str = session_time.strftime("%H:%M")
            session_name: str = session.get("name_activity", "").upper()

            for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS:
                if day_of_week != preferred_day:
                    continue  # Both match kinds require the same weekday

                # Exact match first; the session name is upper-cased, so the
                # configured name must be upper-cased too
                if session_time_str == preferred_time and preferred_name.upper() in session_name:
                    return True

                # Fuzzy match fallback (80% similarity)
                ratio: float = difflib.SequenceMatcher(
                    None, session_name.lower(), preferred_name.lower()
                ).ratio()
                hour_gap: int = abs(session_time.hour - int(preferred_time.split(':')[0]))
                if hour_gap <= 1 and ratio >= 0.8:
                    logging.debug(f"Fuzzy match: {session_name} → {preferred_name} ({ratio:.2%})")
                    return True

            return False
        except Exception as e:
            logging.error(f"Failed to check session: {str(e)} - Session: {session}")
            return False

    @staticmethod
    def _session_start(session: Dict[str, Any]) -> datetime:
        """Parse a session's start timestamp, localizing naive values to TIMEZONE."""
        session_time: datetime = parse(session["start_timestamp"])
        if not session_time.tzinfo:
            session_time = pytz.timezone(TIMEZONE).localize(session_time)
        return session_time

    def run_booking_cycle(self, current_time: datetime) -> None:
        """
        Run one cycle of checking and booking sessions.

        Fetches the sessions of the next 3 days, notifies about the preferred
        ones and tries to book those that are bookable right now.

        Args:
            current_time (datetime): Current time for comparison.
        """
        # Calculate date range to check (next 3 days)
        start_date: date = current_time.date()
        end_date: date = start_date + timedelta(days=3)

        # Get available sessions
        sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date)
        if not sessions_data or not sessions_data.get("success", False):
            logging.error(
                f"No sessions available or error fetching sessions - Sessions Data: {sessions_data}"
            )
            return

        activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])

        # Find sessions to book (preferred only)
        sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
        upcoming_sessions: List[Dict[str, Any]] = []
        found_preferred_sessions: List[Dict[str, Any]] = []

        for session in activities:
            session_time: datetime = self._session_start(session)

            # Check if session is preferred and bookable
            if self.is_session_bookable(session, current_time):
                if self.matches_preferred_session(session, current_time):
                    sessions_to_book.append(("Preferred", session))
                    found_preferred_sessions.append(session)
            # Check if it's a preferred session that's not bookable yet
            elif self.matches_preferred_session(session, current_time):
                found_preferred_sessions.append(session)
                # Check if it's available tomorrow
                if (session_time.date() - current_time.date()).days == 1:
                    upcoming_sessions.append(session)

        if not sessions_to_book and not upcoming_sessions:
            logging.info("No matching sessions found to book")
            return

        # Notify about all found preferred sessions, regardless of bookability
        for session in found_preferred_sessions:
            session_time = self._session_start(session)
            session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
            self.notifier.notify_session_booking(session_details)
            logging.info(f"Notified about found preferred session: {session_details}")

        # Notify about upcoming sessions
        for session in upcoming_sessions:
            session_time = self._session_start(session)
            session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
            self.notifier.notify_upcoming_session(session_details, 1)  # Days until is 1 for tomorrow
            logging.info(f"Notified about upcoming session: {session_details}")

        # Book sessions (preferred first)
        sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
        for session_type, session in sessions_to_book:
            # Same parser as above, so a timestamp accepted earlier cannot fail here
            session_time = self._session_start(session)
            logging.info(
                f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})"
            )
            if self.book_session(session["id_activity_calendar"]):
                # Send notification after successful booking
                session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
                self.notifier.notify_session_booking(session_details)
                logging.info(f"Successfully booked {session_type} session at {session_time}")
            else:
                logging.error(
                    f"Failed to book {session_type} session at {session_time} - Session: {session}"
                )

    def run(self) -> None:
        """
        Main execution loop.

        Logs in once, then runs a booking cycle every 5 minutes (or after
        60 minutes when the cycle ran at TARGET_RESERVATION_TIME) until
        interrupted from the keyboard.
        """
        # Set up timezone
        tz = pytz.timezone(TIMEZONE)

        # Initial login
        if not self.login():
            logging.error("Authentication failed - exiting program")
            return

        try:
            while True:
                try:
                    current_time: datetime = datetime.now(tz)
                    logging.info(f"Current time: {current_time}")

                    # Always run booking cycle to check for preferred sessions and notify
                    self.run_booking_cycle(current_time)

                    if current_time.strftime("%H:%M") == TARGET_RESERVATION_TIME:
                        # Wait until the next booking window
                        time.sleep(timedelta(minutes=60).total_seconds())
                    else:
                        # Check again in 5 minutes
                        time.sleep(300)

                except Exception as e:
                    # Keep the loop alive: log the failure and try again shortly
                    logging.error(
                        f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}"
                    )
                    time.sleep(60)  # Wait before retrying after error
        except KeyboardInterrupt:
            self.quit()

    def quit(self) -> None:
        """
        Clean up resources and exit the script.

        Raises:
            SystemExit: Always, with exit status 0.
        """
        logging.info("Script interrupted by user. Quitting...")
        # Add any cleanup code here
        sys.exit(0)