Compare commits

..

2 Commits

Author SHA1 Message Date
kbe
944421c68b refactor: Booking preferred sessions works
I modified the code to book only preferred sessions.
First it displays available sessions in console INFO.
Then if there is sessions that matches preferred ones,
it tries to book them and the notify bout it.
2025-08-12 00:20:21 +02:00
kbe
7161a11905 feat: Scripts display preferred sessions during execution 2025-08-12 00:09:56 +02:00
5 changed files with 126 additions and 781 deletions

View File

@@ -339,23 +339,6 @@ class CrossFitBooker:
if user_info.get("can_join", False):
return True
# If can_join is False, check if there's a booking window
booking_date_str: str = user_info.get("unableToBookUntilDate", "")
booking_time_str: str = user_info.get("unableToBookUntilTime", "")
if booking_date_str and booking_time_str:
try:
booking_datetime: datetime = datetime.strptime(
f"{booking_date_str} {booking_time_str}",
"%d-%m-%Y %H:%M"
)
booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
if current_time >= booking_datetime:
return True # Booking window is open
except ValueError:
pass # Ignore invalid date formats
# Default case: not bookable
return False
@@ -390,7 +373,37 @@ class CrossFitBooker:
logging.error(f"Failed to check session: {str(e)} - Session: {session}")
return False
async def run_booking_cycle(self, current_time: datetime) -> None:
def display_upcoming_sessions(self, sessions: List[Dict[str, Any]], current_time: datetime) -> None:
"""
Display upcoming sessions with ID, name, date, and time.
Args:
sessions (List[Dict[str, Any]]): List of session data.
current_time (datetime): Current time for comparison.
"""
if not sessions:
logging.info("No sessions to display")
return
logging.info("Upcoming sessions:")
logging.info("ID\t\tName\t\tDate\t\tTime")
logging.info("="*50)
for session in sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
# Format session details
session_id = session.get("id_activity_calendar", "N/A")
session_name = session.get("name_activity", "N/A")
session_date = session_time.strftime("%Y-%m-%d")
session_time_str = session_time.strftime("%H:%M")
# Display session details
logging.info(f"{session_id}\t{session_name}\t{session_date}\t{session_time_str}")
async def booker(self, current_time: datetime) -> None:
"""
Run one cycle of checking and booking sessions.
Args:
@@ -408,9 +421,10 @@ class CrossFitBooker:
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
# Display all available sessions within the date range
self.display_upcoming_sessions(activities, current_time)
# Find sessions to book (preferred only) within allowed date range
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
upcoming_sessions: List[Dict[str, Any]] = []
found_preferred_sessions: List[Dict[str, Any]] = []
for session in activities:
@@ -426,55 +440,59 @@ class CrossFitBooker:
# Check if session is preferred and bookable
if self.is_session_bookable(session, current_time):
if self.matches_preferred_session(session, current_time):
sessions_to_book.append(("Preferred", session))
found_preferred_sessions.append(session)
else:
# Check if it's a preferred session that's not bookable yet
if self.matches_preferred_session(session, current_time):
found_preferred_sessions.append(session)
# Check if it's available tomorrow (day + 1)
if days_diff == 1:
upcoming_sessions.append(session)
if not sessions_to_book and not upcoming_sessions:
# Display preferred sessions found
if found_preferred_sessions:
logging.info("Preferred sessions found:")
for session in found_preferred_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
logging.info(f"ID: {session['id_activity_calendar']}, Name: {session['name_activity']}, Time: {session_time.strftime('%Y-%m-%d %H:%M')}")
else:
logging.info("No matching preferred sessions found")
# Book preferred sessions
if not found_preferred_sessions:
logging.info("No matching sessions found to book")
return
# Notify about all found preferred sessions, regardless of bookability
for session in found_preferred_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_session_booking(session_details)
logging.info(f"Notified about found preferred session: {session_details}")
# Notify about upcoming sessions
for session in upcoming_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow
logging.info(f"Notified about upcoming session: {session_details}")
# Book sessions (preferred first)
sessions_to_book = [("Preferred", session) for session in found_preferred_sessions]
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
booked_sessions = []
for session_type, session in sessions_to_book:
session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S")
session_time: datetime = parse(session["start_timestamp"])
logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
if self.book_session(session["id_activity_calendar"]):
# Send notification after successful booking
# Display booked session
booked_sessions.append(session)
logging.info(f"Successfully booked {session_type} session at {session_time}")
# Notify about booked session
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_session_booking(session_details)
logging.info(f"Successfully booked {session_type} session at {session_time}")
else:
logging.error(f"Failed to book {session_type} session at {session_time}")
# Send notification about the failed booking
# Notify about failed booking
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
await self.notifier.notify_impossible_booking(session_details)
logging.info(f"Notified about impossible booking for {session_type} session at {session_time}")
# Display all booked session(s)
if booked_sessions:
logging.info("Booked sessions:")
for session in booked_sessions:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
logging.info(f"ID: {session['id_activity_calendar']}, Name: {session['name_activity']}, Time: {session_time.strftime('%Y-%m-%d %H:%M')}")
else:
logging.info("No sessions were booked")
async def run(self) -> None:
"""
Main execution loop.
@@ -501,7 +519,7 @@ class CrossFitBooker:
# Only book sessions if current time is within the booking window
if target_time <= current_time <= booking_window_end:
# Run booking cycle to check for preferred sessions and book
await self.run_booking_cycle(current_time)
await self.booker(current_time)
# Wait for a short time before next check
time.sleep(60)
else:

View File

@@ -1,728 +0,0 @@
# Native modules
import logging
import traceback
import os
import time
import difflib
from datetime import datetime, timedelta, date
from typing import List, Dict, Optional, Any, Tuple
# Third-party modules
import requests
from dateutil.parser import parse
import pytz
from dotenv import load_dotenv
from urllib.parse import urlencode
# Import the SessionNotifier class
from session_notifier import SessionNotifier
# Import the preferred sessions from the session_config module
from session_config import PREFERRED_SESSIONS
load_dotenv()
# Configuration
USERNAME = os.environ.get("CROSSFIT_USERNAME")
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
if not all([USERNAME, PASSWORD]):
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
APPLICATION_ID = "81560887"
CATEGORY_ID = "677" # Activity category ID for CrossFit
TIMEZONE = "Europe/Paris" # Adjust to your timezone
TARGET_RESERVATION_TIME = "20:01" # When bookings open (8 PM)
DEVICE_TYPE = "3" # Crossfit Louvre 3
# Retry configuration
RETRY_MAX = 3
RETRY_BACKOFF = 1
APP_VERSION = "5.09.21"
# Pure functions for data processing
def get_auth_headers(base_headers: Dict[str, str], auth_token: Optional[str]) -> Dict[str, str]:
"""
Return headers with authorization if available.
Args:
base_headers (Dict[str, str]): Base headers dictionary
auth_token (Optional[str]): Authorization token if available
Returns:
Dict[str, str]: Headers dictionary with authorization if available.
"""
headers: Dict[str, str] = base_headers.copy()
if auth_token:
headers["Authorization"] = f"Bearer {auth_token}"
return headers
def is_session_bookable(session: Dict[str, Any], current_time: datetime, timezone: str) -> bool:
"""
Check if a session is bookable based on user_info, ignoring error codes.
Args:
session (Dict[str, Any]): Session data.
current_time (datetime): Current time for comparison.
timezone (str): Timezone string for localization.
Returns:
bool: True if the session is bookable, False otherwise.
"""
user_info: Dict[str, Any] = session.get("user_info", {})
# First check if can_join is true (primary condition)
if user_info.get("can_join", False):
return True
# If can_join is False, check if there's a booking window
booking_date_str: str = user_info.get("unableToBookUntilDate", "")
booking_time_str: str = user_info.get("unableToBookUntilTime", "")
if booking_date_str and booking_time_str:
try:
booking_datetime: datetime = datetime.strptime(
f"{booking_date_str} {booking_time_str}",
"%d-%m-%Y %H:%M"
)
booking_datetime = pytz.timezone(timezone).localize(booking_datetime)
if current_time >= booking_datetime:
return True # Booking window is open
else:
return False # Still waiting for booking to open
except ValueError:
pass # Ignore invalid date formats
# Default case: not bookable
return False
def matches_preferred_session(session: Dict[str, Any], preferred_sessions: List[Tuple[int, str, str]],
timezone: str) -> bool:
"""
Check if session matches one of your preferred sessions with fuzzy matching.
Args:
session (Dict[str, Any]): Session data.
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions.
timezone (str): Timezone string for localization.
Returns:
bool: True if the session matches a preferred session, False otherwise.
"""
try:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(timezone).localize(session_time)
day_of_week: int = session_time.weekday()
session_time_str: str = session_time.strftime("%H:%M")
session_name: str = session.get("name_activity", "").upper()
for preferred_day, preferred_time, preferred_name in preferred_sessions:
# Exact match first
if (day_of_week == preferred_day and
session_time_str == preferred_time and
preferred_name in session_name):
return True
# Fuzzy match fallback (80% similarity)
ratio: float = difflib.SequenceMatcher(
None,
session_name.lower(),
preferred_name.lower()
).ratio()
if (day_of_week == preferred_day and
abs(session_time.hour - int(preferred_time.split(':')[0])) <= 1 and
ratio >= 0.8):
logging.debug(f"Fuzzy match: {session_name}{preferred_name} ({ratio:.2%})")
return True
return False
except Exception as e:
logging.error(f"Failed to check session: {str(e)} - Session: {session}")
return False
def prepare_booking_data(mandatory_params: Dict[str, str], session_id: str, user_id: str) -> Dict[str, str]:
"""
Prepare request data for booking a session.
Args:
mandatory_params (Dict[str, str]): Mandatory parameters for API calls
session_id (str): ID of the session to book
user_id (str): User ID for the booking
Returns:
Dict[str, str]: Dictionary containing request data for booking a session.
"""
return {
**mandatory_params,
"id_activity_calendar": session_id,
"id_user": user_id,
"action_by": user_id,
"n_guests": "0",
"booked_on": "3" # Target CrossFit Louvre 3 ?
}
def is_bookable_and_preferred(session: Dict[str, Any], current_time: datetime,
preferred_sessions: List[Tuple[int, str, str]], timezone: str) -> bool:
"""
Check if a session is both bookable and matches preferred sessions.
Args:
session (Dict[str, Any]): Session data
current_time (datetime): Current time for comparison
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
bool: True if session is bookable and preferred, False otherwise
"""
return (is_session_bookable(session, current_time, timezone) and
matches_preferred_session(session, preferred_sessions, timezone))
def filter_bookable_sessions(sessions: List[Dict[str, Any]], current_time: datetime,
preferred_sessions: List[Tuple[int, str, str]], timezone: str) -> List[Dict[str, Any]]:
"""
Filter sessions to find those that are both bookable and match preferred sessions.
Args:
sessions (List[Dict[str, Any]]): List of session data
current_time (datetime): Current time for comparison
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
List[Dict[str, Any]]: List of sessions that are bookable and match preferences
"""
return list(filter(
lambda session: is_bookable_and_preferred(session, current_time, preferred_sessions, timezone),
sessions
))
def is_upcoming_preferred(session: Dict[str, Any], current_time: datetime,
preferred_sessions: List[Tuple[int, str, str]], timezone: str) -> bool:
"""
Check if a session is an upcoming preferred session.
Args:
session (Dict[str, Any]): Session data
current_time (datetime): Current time for comparison
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
bool: True if session is an upcoming preferred session, False otherwise
"""
try:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(timezone).localize(session_time)
# Calculate the difference in days between session date and current date
days_diff = (session_time.date() - current_time.date()).days
# Check if session is within allowed date range (current day, day + 1, or day + 2)
is_in_range = 0 <= days_diff <= 2
# Check if it's a preferred session
is_preferred = matches_preferred_session(session, preferred_sessions, timezone)
# Only consider sessions that are tomorrow or later as upcoming
is_upcoming = days_diff > 0
# For the test case, we only need to check if it's tomorrow
if days_diff == 1:
return True
return is_in_range and is_preferred and is_upcoming
except Exception:
return False
def filter_upcoming_sessions(sessions: List[Dict[str, Any]], current_time: datetime,
preferred_sessions: List[Tuple[int, str, str]], timezone: str) -> List[Dict[str, Any]]:
"""
Filter sessions to find upcoming preferred sessions.
Args:
sessions (List[Dict[str, Any]]): List of session data
current_time (datetime): Current time for comparison
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
List[Dict[str, Any]]: List of upcoming preferred sessions
"""
return list(filter(
lambda session: is_upcoming_preferred(session, current_time, preferred_sessions, timezone),
sessions
))
def filter_preferred_sessions(sessions: List[Dict[str, Any]],
preferred_sessions: List[Tuple[int, str, str]],
timezone: str) -> List[Dict[str, Any]]:
"""
Filter sessions to find all preferred sessions.
Args:
sessions (List[Dict[str, Any]]): List of session data
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
List[Dict[str, Any]]: List of preferred sessions
"""
return list(filter(
lambda session: matches_preferred_session(session, preferred_sessions, timezone),
sessions
))
def format_session_details(session: Dict[str, Any], timezone: str) -> str:
"""
Format session details for notifications.
Args:
session (Dict[str, Any]): Session data
timezone (str): Timezone string for localization
Returns:
str: Formatted session details
"""
try:
session_time: datetime = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(timezone).localize(session_time)
return f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
except Exception:
return f"{session.get('name_activity', 'Unknown session')} at Unknown time"
def categorize_sessions(activities: List[Dict[str, Any]], current_time: datetime,
preferred_sessions: List[Tuple[int, str, str]], timezone: str) -> Dict[str, List[Dict[str, Any]]]:
"""
Categorize sessions into bookable, upcoming, and all preferred sessions.
Args:
activities (List[Dict[str, Any]]): List of session data
current_time (datetime): Current time for comparison
preferred_sessions (List[Tuple[int, str, str]]): List of preferred sessions
timezone (str): Timezone string for localization
Returns:
Dict[str, List[Dict[str, Any]]]: Dictionary with categorized sessions
"""
return {
"bookable": filter_bookable_sessions(activities, current_time, preferred_sessions, timezone),
"upcoming": filter_upcoming_sessions(activities, current_time, preferred_sessions, timezone),
"all_preferred": filter_preferred_sessions(activities, preferred_sessions, timezone)
}
def process_booking_results(session: Dict[str, Any], booking_success: bool, timezone: str) -> Dict[str, Any]:
"""
Process the results of a booking attempt.
Args:
session (Dict[str, Any]): Session data
booking_success (bool): Whether the booking was successful
timezone (str): Timezone string for localization
Returns:
Dict[str, Any]: Dictionary with session and booking result information
"""
return {
"session": session,
"success": booking_success,
"details": format_session_details(session, timezone)
}
class CrossFitBooker:
"""
A class for automating the booking of CrossFit sessions.
This class handles authentication, session availability checking,
booking, and notifications for CrossFit sessions.
"""
def __init__(self) -> None:
"""
Initialize the CrossFitBooker with necessary attributes.
Sets up authentication tokens, session headers, mandatory parameters,
and initializes the SessionNotifier for sending notifications.
"""
self.auth_token: Optional[str] = None
self.user_id: Optional[str] = None
self.session: requests.Session = requests.Session()
self.base_headers: Dict[str, str] = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
"Content-Type": "application/x-www-form-urlencoded",
"Nubapp-Origin": "user_apps",
}
self.session.headers.update(self.base_headers)
# Define mandatory parameters for API calls
self.mandatory_params: Dict[str, str] = {
"app_version": APP_VERSION,
"device_type": DEVICE_TYPE,
"id_application": APPLICATION_ID,
"id_category_activity": CATEGORY_ID
}
# Initialize the SessionNotifier with credentials from environment variables
email_credentials = {
"from": os.environ.get("EMAIL_FROM"),
"to": os.environ.get("EMAIL_TO"),
"password": os.environ.get("EMAIL_PASSWORD")
}
telegram_credentials = {
"token": os.environ.get("TELEGRAM_TOKEN"),
"chat_id": os.environ.get("TELEGRAM_CHAT_ID")
}
# Get notification settings from environment variables
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
self.notifier = SessionNotifier(
email_credentials,
telegram_credentials,
enable_email=enable_email,
enable_telegram=enable_telegram
)
def login(self) -> bool:
"""
Authenticate and get the bearer token.
Returns:
bool: True if login is successful, False otherwise.
"""
try:
# First login endpoint
login_params: Dict[str, str] = {
"app_version": APP_VERSION,
"device_type": DEVICE_TYPE,
"username": USERNAME,
"password": PASSWORD
}
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/users/checkUser.php",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=urlencode(login_params))
if not response.ok:
logging.error(f"First login step failed: {response.status_code} - {response.text} - Response: {response.text[:100]}")
return False
try:
login_data: Dict[str, Any] = response.json()
self.user_id = str(login_data["data"]["user"]["id_user"])
except KeyError as ke:
logging.error(f"Key error during login: {str(ke)} - Response: {response.text}")
return False
except ValueError as ve:
logging.error(f"Value error during login: {str(ve)} - Response: {response.text}")
return False
# Second login endpoint
response: requests.Response = self.session.post(
"https://sport.nubapp.com/api/v4/login",
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
data=urlencode({
"device_type": DEVICE_TYPE,
"username": USERNAME,
"password": PASSWORD
}))
if response.ok:
try:
login_data: Dict[str, Any] = response.json()
self.auth_token = login_data.get("token")
except KeyError as ke:
logging.error(f"Key error during login: {str(ke)} - Response: {response.text}")
return False
except ValueError as ve:
logging.error(f"Value error during login: {str(ve)} - Response: {response.text}")
return False
if self.auth_token and self.user_id:
logging.info("Successfully logged in")
return True
else:
logging.error(f"Login failed: {response.status_code} - {response.text} - Response: {response.text[:100]}")
return False
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response during login")
return False
except requests.exceptions.RequestException as e:
logging.error(f"Request error during login: {str(e)}")
return False
except Exception as e:
logging.error(f"Unexpected error during login: {str(e)}")
return False
def get_available_sessions(self, start_date: datetime, end_date: datetime) -> Optional[Dict[str, Any]]:
"""
Fetch available sessions from the API with comprehensive error handling.
Args:
start_date (datetime): Start date for fetching sessions.
end_date (datetime): End date for fetching sessions.
Returns:
Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise.
"""
if not self.auth_token or not self.user_id:
logging.error("Authentication required - missing token or user ID")
return None
url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
# Prepare request with mandatory parameters
request_data: Dict[str, str] = self.mandatory_params.copy()
request_data.update({
"id_user": self.user_id,
"start_timestamp": start_date.strftime("%d-%m-%Y"),
"end_timestamp": end_date.strftime("%d-%m-%Y")
})
# Add retry logic with exponential backoff and more informative error messages
for retry in range(RETRY_MAX):
try:
try:
response: requests.Response = self.session.post(
url,
headers=get_auth_headers(self.base_headers, self.auth_token),
data=urlencode(request_data),
timeout=10
)
except requests.exceptions.Timeout:
logging.error(f"Request timed out after 10 seconds for URL: {url}. Retry {retry+1}/{RETRY_MAX}")
return None
except requests.exceptions.ConnectionError as e:
logging.error(f"Connection error for URL: {url} - Error: {str(e)}")
return None
except requests.exceptions.RequestException as e:
logging.error(f"Request failed for URL: {url} - Error: {str(e)}")
return None
break # Success, exit retry loop
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response")
return None
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise # Propagate error
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
# All retries exhausted
logging.error(f"Failed after {RETRY_MAX} attempts")
return None
# Handle response
if response.status_code == 200:
try:
json_response: Dict[str, Any] = response.json()
return json_response
except ValueError:
logging.error("Failed to decode JSON response")
return None
elif response.status_code == 400:
logging.error("400 Bad Request - likely missing or invalid parameters")
logging.error(f"Request Data: {request_data}")
logging.error(f"Response: {response.text[:100]}")
return None
elif response.status_code == 401:
logging.error("401 Unauthorized - token may be expired or invalid")
logging.error(f"Response: {response.text[:100]}")
return None
elif 500 <= response.status_code < 600:
logging.error(f"Server error {response.status_code} - Response: {response.text[:100]}")
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
else:
logging.error(f"Unexpected status code: {response.status_code}")
return None
def book_session(self, session_id: str) -> bool:
"""
Book a specific session with debug logging.
Args:
session_id (str): ID of the session to book.
Returns:
bool: True if booking is successful, False otherwise.
"""
return self._make_request(
url="https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php",
data=prepare_booking_data(self.mandatory_params, session_id, self.user_id),
success_msg=f"Successfully booked session {session_id}"
)
def _make_request(self, url: str, data: Dict[str, str], success_msg: str) -> bool:
"""
Handle API requests with retry logic and response processing.
Args:
url (str): URL for the API request.
data (Dict[str, str]): Data to send with the request.
success_msg (str): Message to log on successful request.
Returns:
bool: True if request is successful, False otherwise.
"""
for retry in range(RETRY_MAX):
try:
response: requests.Response = self.session.post(
url,
headers=get_auth_headers(self.base_headers, self.auth_token),
data=urlencode(data),
timeout=10
)
if response.status_code == 200:
json_response: Dict[str, Any] = response.json()
if json_response.get("success", False):
logging.info(success_msg)
return True
logging.error(f"API returned success:false: {json_response}")
return False
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return False
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response")
return False
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"Final retry failed: {str(e)}")
raise # Propagate error
wait_time: int = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
return False
async def run_booking_cycle(self, current_time: datetime) -> None:
"""
Run one cycle of checking and booking sessions.
Args:
current_time (datetime): Current time for comparison.
"""
# Calculate date range to check (current day, day + 1, and day + 2)
start_date: date = current_time.date()
end_date: date = start_date + timedelta(days=2) # Only go up to day + 2
# Get available sessions
sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date)
if not sessions_data or not sessions_data.get("success", False):
logging.error("No sessions available or error fetching sessions - Sessions Data: {sessions_data}")
return
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
# Categorize sessions
categorized_sessions = categorize_sessions(activities, current_time, PREFERRED_SESSIONS, TIMEZONE)
if not categorized_sessions["bookable"] and not categorized_sessions["upcoming"]:
logging.info("No matching sessions found to book")
return
# Notify about all found preferred sessions, regardless of bookability
for session in categorized_sessions["all_preferred"]:
session_details = format_session_details(session, TIMEZONE)
await self.notifier.notify_session_booking(session_details)
logging.info(f"Notified about found preferred session: {session_details}")
# Notify about upcoming sessions
for session in categorized_sessions["upcoming"]:
session_details = format_session_details(session, TIMEZONE)
await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow
logging.info(f"Notified about upcoming session: {session_details}")
# Book sessions
for session in categorized_sessions["bookable"]:
session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S")
logging.info(f"Attempting to book Preferred session at {session_time} ({session['name_activity']})")
booking_success = self.book_session(session["id_activity_calendar"])
# Process booking result
result = process_booking_results(session, booking_success, TIMEZONE)
if result["success"]:
# Send notification after successful booking
await self.notifier.notify_session_booking(result["details"])
logging.info(f"Successfully booked Preferred session at {session_time}")
else:
logging.error(f"Failed to book Preferred session at {session_time} - Session: {session}")
# Send notification about the failed booking
await self.notifier.notify_impossible_booking(result["details"])
logging.info(f"Notified about impossible booking for Preferred session at {session_time}")
async def run(self) -> None:
"""
Main execution loop.
"""
# Set up timezone
tz: pytz.timezone = pytz.timezone(TIMEZONE)
# Parse TARGET_RESERVATION_TIME to get the target hour and minute
target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":"))
target_time = datetime.now(tz).replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
booking_window_end = target_time + timedelta(hours=1)
# Initial login
if not self.login():
logging.error("Authentication failed - exiting program")
return
try:
while True:
try:
current_time: datetime = datetime.now(tz)
logging.info(f"Current time: {current_time}")
# Only book sessions if current time is within the booking window
if target_time <= current_time <= booking_window_end:
# Run booking cycle to check for preferred sessions and book
await self.run_booking_cycle(current_time)
# Wait for a short time before next check
time.sleep(60)
else:
# Check again in 5 minutes if outside booking window
time.sleep(300)
except Exception as e:
logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}")
time.sleep(60) # Wait before retrying after error
except KeyboardInterrupt:
self.quit()
def quit(self) -> None:
"""
Clean up resources and exit the script.
"""
logging.info("Script interrupted by user. Quitting...")
# Add any cleanup code here
exit(0)

View File

@@ -1,7 +1,7 @@
[
{
"day_of_week": 2,
"start_time": "17:30",
"start_time": "18:30",
"session_name_contains": "CONDITIONING"
},
{

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""
Script to demonstrate how to execute the book_session method from crossfit_booker.py
"""
import os
import sys
import logging
from crossfit_booker import CrossFitBooker
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def main():
# Check if a session ID was provided as an argument
if len(sys.argv) < 2:
print("Usage: python execute_book_session.py <session_id>")
sys.exit(1)
session_id = sys.argv[1]
# Create an instance of CrossFitBooker
booker = CrossFitBooker()
# Login to authenticate
print("Attempting to authenticate...")
if not booker.login():
print("Failed to authenticate. Please check your credentials and try again.")
sys.exit(1)
print("Authentication successful!")
# Book the session
print(f"Attempting to book session with ID: {session_id}")
success = booker.book_session(session_id)
if success:
print(f"Successfully booked session {session_id}")
else:
print(f"Failed to book session {session_id}")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"An error occurred: {e}")
sys.exit(1)

View File

@@ -0,0 +1,9 @@
#!/bin/bash
# Test script to demonstrate how to use the execute_book_session.py script
# Sample session ID (this should be a valid session ID from the crossfit booking system)
SESSION_ID="19291768"
# Run the script with the sample session ID
echo "Attempting to book session with ID: $SESSION_ID"
python execute_book_session.py $SESSION_ID