|
|
|
@@ -1,63 +1,36 @@
|
|
|
|
# Native modules
|
|
|
|
# Native modules
|
|
|
|
import logging
|
|
|
|
import logging, traceback, os, time
|
|
|
|
import traceback
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
import difflib
|
|
|
|
|
|
|
|
from datetime import datetime, timedelta, date
|
|
|
|
from datetime import datetime, timedelta, date
|
|
|
|
|
|
|
|
|
|
|
|
# Third-party modules
|
|
|
|
# Third-party modules
|
|
|
|
import requests
|
|
|
|
import requests, pytz
|
|
|
|
from dateutil.parser import parse
|
|
|
|
from dateutil.parser import parse
|
|
|
|
import pytz
|
|
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
from urllib.parse import urlencode
|
|
|
|
from typing import List, Dict, Optional, Any, Tuple
|
|
|
|
from typing import List, Dict, Optional, Any, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
# Import the SessionNotifier class
|
|
|
|
|
|
|
|
from session_notifier import SessionNotifier
|
|
|
|
from session_notifier import SessionNotifier
|
|
|
|
|
|
|
|
|
|
|
|
# Import the preferred sessions from the session_config module
|
|
|
|
|
|
|
|
from session_config import PREFERRED_SESSIONS
|
|
|
|
from session_config import PREFERRED_SESSIONS
|
|
|
|
|
|
|
|
|
|
|
|
load_dotenv()
|
|
|
|
load_dotenv()
|
|
|
|
|
|
|
|
|
|
|
|
# Configuration
|
|
|
|
|
|
|
|
USERNAME = os.environ.get("CROSSFIT_USERNAME")
|
|
|
|
USERNAME = os.environ.get("CROSSFIT_USERNAME")
|
|
|
|
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
|
|
|
|
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
|
|
|
|
|
|
|
|
|
|
|
|
if not all([USERNAME, PASSWORD]):
|
|
|
|
if not all([USERNAME, PASSWORD]):
|
|
|
|
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
|
|
|
|
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
|
|
|
|
|
|
|
|
|
|
|
|
APPLICATION_ID = "81560887"
|
|
|
|
APPLICATION_ID = "81560887"
|
|
|
|
CATEGORY_ID = "677" # Activity category ID for CrossFit
|
|
|
|
CATEGORY_ID = "677"
|
|
|
|
TIMEZONE = "Europe/Paris" # Adjust to your timezone
|
|
|
|
TIMEZONE = "Europe/Paris"
|
|
|
|
# Booking window configuration (can be overridden by environment variables)
|
|
|
|
|
|
|
|
# TARGET_RESERVATION_TIME: string "HH:MM" local time when bookings open (default 20:01)
|
|
|
|
|
|
|
|
# BOOKING_WINDOW_END_DELTA_MINUTES: int minutes after target time to stop booking (default 10)
|
|
|
|
|
|
|
|
TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01")
|
|
|
|
TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01")
|
|
|
|
BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10"))
|
|
|
|
BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10"))
|
|
|
|
DEVICE_TYPE = "3"
|
|
|
|
DEVICE_TYPE = "3"
|
|
|
|
|
|
|
|
|
|
|
|
# Retry configuration
|
|
|
|
|
|
|
|
RETRY_MAX = 3
|
|
|
|
RETRY_MAX = 3
|
|
|
|
RETRY_BACKOFF = 1
|
|
|
|
RETRY_BACKOFF = 1
|
|
|
|
APP_VERSION = "5.09.21"
|
|
|
|
APP_VERSION = "5.09.21"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CrossFitBooker:
|
|
|
|
class CrossFitBooker:
|
|
|
|
"""
|
|
|
|
|
|
|
|
A class for automating the booking of CrossFit sessions.
|
|
|
|
|
|
|
|
This class handles authentication, session availability checking,
|
|
|
|
|
|
|
|
booking, and notifications for CrossFit sessions.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
def __init__(self) -> None:
|
|
|
|
"""
|
|
|
|
|
|
|
|
Initialize the CrossFitBooker with necessary attributes.
|
|
|
|
|
|
|
|
Sets up authentication tokens, session headers, mandatory parameters,
|
|
|
|
|
|
|
|
and initializes the SessionNotifier for sending notifications.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
self.auth_token: Optional[str] = None
|
|
|
|
self.auth_token: Optional[str] = None
|
|
|
|
self.user_id: Optional[str] = None
|
|
|
|
self.user_id: Optional[str] = None
|
|
|
|
self.session: requests.Session = requests.Session()
|
|
|
|
self.session: requests.Session = requests.Session()
|
|
|
|
@@ -67,176 +40,164 @@ class CrossFitBooker:
|
|
|
|
"Nubapp-Origin": "user_apps",
|
|
|
|
"Nubapp-Origin": "user_apps",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
self.session.headers.update(self.base_headers)
|
|
|
|
self.session.headers.update(self.base_headers)
|
|
|
|
|
|
|
|
|
|
|
|
# Define mandatory parameters for API calls
|
|
|
|
|
|
|
|
self.mandatory_params: Dict[str, str] = {
|
|
|
|
self.mandatory_params: Dict[str, str] = {
|
|
|
|
"app_version": APP_VERSION,
|
|
|
|
"app_version": APP_VERSION, "device_type": DEVICE_TYPE,
|
|
|
|
"device_type": DEVICE_TYPE,
|
|
|
|
"id_application": APPLICATION_ID, "id_category_activity": CATEGORY_ID
|
|
|
|
"id_application": APPLICATION_ID,
|
|
|
|
|
|
|
|
"id_category_activity": CATEGORY_ID
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
email_credentials = {"from": os.environ.get("EMAIL_FROM"), "to": os.environ.get("EMAIL_TO"), "password": os.environ.get("EMAIL_PASSWORD")}
|
|
|
|
# Initialize the SessionNotifier with credentials from environment variables
|
|
|
|
telegram_credentials = {"token": os.environ.get("TELEGRAM_TOKEN"), "chat_id": os.environ.get("TELEGRAM_CHAT_ID")}
|
|
|
|
email_credentials = {
|
|
|
|
|
|
|
|
"from": os.environ.get("EMAIL_FROM"),
|
|
|
|
|
|
|
|
"to": os.environ.get("EMAIL_TO"),
|
|
|
|
|
|
|
|
"password": os.environ.get("EMAIL_PASSWORD")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
telegram_credentials = {
|
|
|
|
|
|
|
|
"token": os.environ.get("TELEGRAM_TOKEN"),
|
|
|
|
|
|
|
|
"chat_id": os.environ.get("TELEGRAM_CHAT_ID")
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Get notification settings from environment variables
|
|
|
|
|
|
|
|
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
|
|
|
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
|
|
|
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
|
|
|
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
|
|
|
|
|
|
|
self.notifier = SessionNotifier(email_credentials, telegram_credentials, enable_email=enable_email, enable_telegram=enable_telegram)
|
|
|
|
|
|
|
|
|
|
|
|
self.notifier = SessionNotifier(
|
|
|
|
def _auth_headers(self) -> Dict[str, str]:
|
|
|
|
email_credentials,
|
|
|
|
h = self.base_headers.copy()
|
|
|
|
telegram_credentials,
|
|
|
|
if self.auth_token:
|
|
|
|
enable_email=enable_email,
|
|
|
|
h["Authorization"] = f"Bearer {self.auth_token}"
|
|
|
|
enable_telegram=enable_telegram
|
|
|
|
return h
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Public method expected by tests
|
|
|
|
def get_auth_headers(self) -> Dict[str, str]:
|
|
|
|
def get_auth_headers(self) -> Dict[str, str]:
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Return headers with authorization if available.
|
|
|
|
Return headers with Authorization when token is present.
|
|
|
|
Returns:
|
|
|
|
This wraps _auth_headers() to satisfy tests expecting a public method.
|
|
|
|
Dict[str, str]: Headers dictionary with authorization if available.
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
headers: Dict[str, str] = self.base_headers.copy()
|
|
|
|
return self._auth_headers()
|
|
|
|
if self.auth_token:
|
|
|
|
|
|
|
|
headers["Authorization"] = f"Bearer {self.auth_token}"
|
|
|
|
def _post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None, expect_json: bool = True) -> Optional[Any]:
|
|
|
|
return headers
|
|
|
|
for retry in range(RETRY_MAX):
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
resp = self.session.post(url, headers=(headers or self._auth_headers()), data=urlencode(data), timeout=10)
|
|
|
|
|
|
|
|
sc = resp.status_code
|
|
|
|
|
|
|
|
if sc == 200:
|
|
|
|
|
|
|
|
return resp.json() if expect_json else resp
|
|
|
|
|
|
|
|
if sc == 401:
|
|
|
|
|
|
|
|
logging.error("401 Unauthorized")
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
# Guard sc to ensure it's an int for comparison (fix tests using Mock without status_code int semantics)
|
|
|
|
|
|
|
|
if isinstance(sc, int) and 500 <= sc < 600:
|
|
|
|
|
|
|
|
logging.error(f"Server error {sc}")
|
|
|
|
|
|
|
|
raise requests.exceptions.ConnectionError(f"Server error {sc}")
|
|
|
|
|
|
|
|
logging.error(f"HTTP {sc}: {getattr(resp, 'text', '')[:100]}")
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
|
|
if retry == RETRY_MAX - 1:
|
|
|
|
|
|
|
|
logging.error(f"Final retry failed: {e}")
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
wt = RETRY_BACKOFF * (2 ** retry)
|
|
|
|
|
|
|
|
logging.warning(f"Request failed ({retry+1}/{RETRY_MAX}): {e}. Retrying in {wt}s...")
|
|
|
|
|
|
|
|
time.sleep(wt)
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_local(self, ts: str) -> datetime:
|
|
|
|
|
|
|
|
dt = parse(ts)
|
|
|
|
|
|
|
|
return dt if dt.tzinfo else pytz.timezone(TIMEZONE).localize(dt)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fmt_session(self, s: Dict[str, Any], dt: Optional[datetime] = None) -> str:
|
|
|
|
|
|
|
|
dt = dt or self._parse_local(s["start_timestamp"])
|
|
|
|
|
|
|
|
return f"{s['id_activity_calendar']} {s['name_activity']} at {dt.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
|
|
|
|
|
|
|
|
def login(self) -> bool:
|
|
|
|
def login(self) -> bool:
|
|
|
|
"""
|
|
|
|
|
|
|
|
Authenticate and get the bearer token.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
bool: True if login is successful, False otherwise.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
# First login endpoint
|
|
|
|
# Directly use requests to align with tests that mock requests.Session.post
|
|
|
|
login_params: Dict[str, str] = {
|
|
|
|
a = {"app_version": APP_VERSION, "device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD}
|
|
|
|
"app_version": APP_VERSION,
|
|
|
|
resp1 = self.session.post(
|
|
|
|
"device_type": DEVICE_TYPE,
|
|
|
|
|
|
|
|
"username": USERNAME,
|
|
|
|
|
|
|
|
"password": PASSWORD
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
response: requests.Response = self.session.post(
|
|
|
|
|
|
|
|
"https://sport.nubapp.com/api/v4/users/checkUser.php",
|
|
|
|
"https://sport.nubapp.com/api/v4/users/checkUser.php",
|
|
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
|
|
data=urlencode(login_params))
|
|
|
|
data=urlencode(a)
|
|
|
|
|
|
|
|
)
|
|
|
|
if not response.ok:
|
|
|
|
if not getattr(resp1, "ok", False):
|
|
|
|
logging.error(f"First login step failed: {response.status_code} - {response.text[:100]}")
|
|
|
|
logging.error("First login step failed")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
login_data: Dict[str, Any] = response.json()
|
|
|
|
r1 = resp1.json()
|
|
|
|
self.user_id = str(login_data["data"]["user"]["id_user"])
|
|
|
|
self.user_id = str(r1["data"]["user"]["id_user"])
|
|
|
|
except (KeyError, ValueError) as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error(f"Error during login: {str(e)} - Response: {response.text}")
|
|
|
|
logging.error(f"Error during login: {e} - Response: {getattr(resp1, 'text', '')}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
# Second login endpoint
|
|
|
|
resp2 = self.session.post(
|
|
|
|
response: requests.Response = self.session.post(
|
|
|
|
|
|
|
|
"https://sport.nubapp.com/api/v4/login",
|
|
|
|
"https://sport.nubapp.com/api/v4/login",
|
|
|
|
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
|
|
|
|
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
|
|
|
|
data=urlencode({
|
|
|
|
data=urlencode({"device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD})
|
|
|
|
"device_type": DEVICE_TYPE,
|
|
|
|
)
|
|
|
|
"username": USERNAME,
|
|
|
|
if getattr(resp2, "ok", False):
|
|
|
|
"password": PASSWORD
|
|
|
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if response.ok:
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
login_data: Dict[str, Any] = response.json()
|
|
|
|
r2 = resp2.json()
|
|
|
|
self.auth_token = login_data.get("token")
|
|
|
|
self.auth_token = r2.get("token")
|
|
|
|
except (KeyError, ValueError) as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error(f"Error during login: {str(e)} - Response: {response.text}")
|
|
|
|
logging.error(f"Error during login: {e} - Response: {getattr(resp2, 'text', '')}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
if self.auth_token and self.user_id:
|
|
|
|
if self.auth_token and self.user_id:
|
|
|
|
logging.info("Successfully logged in")
|
|
|
|
logging.info("Successfully logged in")
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
|
|
|
|
logging.error(f"Login failed: {response.status_code} - {response.text[:100]}")
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.error("Login failed")
|
|
|
|
|
|
|
|
return False
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
logging.error(f"Request error during login: {str(e)}")
|
|
|
|
logging.error(f"Request error during login: {e}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error(f"Unexpected error during login: {str(e)}")
|
|
|
|
logging.error(f"Unexpected error during login: {e}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
|
|
|
|
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
|
|
|
|
"""
|
|
|
|
|
|
|
|
Fetch available sessions from the API.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
|
|
start_date (date): Start date for fetching sessions.
|
|
|
|
|
|
|
|
end_date (date): End date for fetching sessions.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
Optional[Dict[str, Any]]: Dictionary containing available sessions if successful, None otherwise.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
if not self.auth_token or not self.user_id:
|
|
|
|
if not self.auth_token or not self.user_id:
|
|
|
|
logging.error("Authentication required - missing token or user ID")
|
|
|
|
logging.error("Authentication required - missing token or user ID")
|
|
|
|
return None
|
|
|
|
return None
|
|
|
|
|
|
|
|
data = {
|
|
|
|
url: str = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
|
|
|
|
**self.mandatory_params,
|
|
|
|
|
|
|
|
|
|
|
|
# Prepare request with mandatory parameters
|
|
|
|
|
|
|
|
request_data: Dict[str, str] = self.mandatory_params.copy()
|
|
|
|
|
|
|
|
request_data.update({
|
|
|
|
|
|
|
|
"id_user": self.user_id,
|
|
|
|
"id_user": self.user_id,
|
|
|
|
"start_timestamp": start_date.strftime("%d-%m-%Y"),
|
|
|
|
"start_timestamp": start_date.strftime("%d-%m-%Y"),
|
|
|
|
"end_timestamp": end_date.strftime("%d-%m-%Y")
|
|
|
|
"end_timestamp": end_date.strftime("%d-%m-%Y"),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
logging.debug(f"[get_available_sessions] URL=https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php "
|
|
|
|
# Add retry logic
|
|
|
|
f"method=POST content_type=application/x-www-form-urlencoded "
|
|
|
|
for retry in range(RETRY_MAX):
|
|
|
|
f"keys={list(data.keys())} id_user_present={bool(self.user_id)}")
|
|
|
|
|
|
|
|
r = self._post("https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php", data)
|
|
|
|
|
|
|
|
# Display available sessions in debug console
|
|
|
|
|
|
|
|
if r is not None:
|
|
|
|
|
|
|
|
success = r.get("success", False) if isinstance(r, dict) else None
|
|
|
|
|
|
|
|
count = 0
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
response: requests.Response = self.session.post(
|
|
|
|
if isinstance(r, dict):
|
|
|
|
url,
|
|
|
|
activities = r.get("data", {}).get("activities_calendar", [])
|
|
|
|
headers=self.get_auth_headers(),
|
|
|
|
count = len(activities) if isinstance(activities, list) else 0
|
|
|
|
data=urlencode(request_data),
|
|
|
|
except Exception:
|
|
|
|
timeout=10
|
|
|
|
pass
|
|
|
|
)
|
|
|
|
logging.debug(f"[get_available_sessions] success={success} activities_count={count}")
|
|
|
|
|
|
|
|
# Log concise session summary to aid debugging
|
|
|
|
if response.status_code == 200:
|
|
|
|
if success and count:
|
|
|
|
return response.json()
|
|
|
|
for s in r.get("data", {}).get("activities_calendar", [])[:50]:
|
|
|
|
elif response.status_code == 401:
|
|
|
|
try:
|
|
|
|
logging.error("401 Unauthorized - token may be expired or invalid")
|
|
|
|
summary = self._fmt_session(s)
|
|
|
|
return None
|
|
|
|
except Exception:
|
|
|
|
elif 500 <= response.status_code < 600:
|
|
|
|
summary = f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}"
|
|
|
|
logging.error(f"Server error {response.status_code}")
|
|
|
|
logging.debug(f"[session] {summary}")
|
|
|
|
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
logging.debug(f"[get_available_sessions] raw_response_preview={str(r)[:500]}")
|
|
|
|
logging.error(f"Unexpected status code: {response.status_code} - {response.text[:100]}")
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
logging.debug("[get_available_sessions] No response (None) from API")
|
|
|
|
|
|
|
|
return r
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
|
|
if retry == RETRY_MAX - 1:
|
|
|
|
|
|
|
|
logging.error(f"Final retry failed: {str(e)}")
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
wait_time: int = RETRY_BACKOFF * (2 ** retry)
|
|
|
|
|
|
|
|
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
|
|
|
|
|
|
|
|
time.sleep(wait_time)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def book_session(self, session_id: str) -> bool:
|
|
|
|
def book_session(self, session_id: str) -> bool:
|
|
|
|
"""
|
|
|
|
# Debug payload composition (without secrets)
|
|
|
|
Book a specific session.
|
|
|
|
safe_user_id = str(self.user_id) if self.user_id else None
|
|
|
|
Args:
|
|
|
|
debug_payload = {
|
|
|
|
session_id (str): ID of the session to book.
|
|
|
|
**self.mandatory_params,
|
|
|
|
Returns:
|
|
|
|
"id_activity_calendar": session_id,
|
|
|
|
bool: True if booking is successful, False otherwise.
|
|
|
|
"id_user": safe_user_id,
|
|
|
|
"""
|
|
|
|
"action_by": safe_user_id,
|
|
|
|
url = "https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php"
|
|
|
|
"n_guests": "0",
|
|
|
|
|
|
|
|
"booked_on": "1",
|
|
|
|
|
|
|
|
"device_type": self.mandatory_params.get("device_type"),
|
|
|
|
|
|
|
|
"token_present": bool(self.auth_token),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
logging.debug(f"[book_session] URL=https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php "
|
|
|
|
|
|
|
|
f"method=POST content_type=application/x-www-form-urlencoded "
|
|
|
|
|
|
|
|
f"keys={list(debug_payload.keys())} "
|
|
|
|
|
|
|
|
f"id_activity_calendar_present={bool(session_id)} "
|
|
|
|
|
|
|
|
f"user_id_present={bool(safe_user_id)} token_present={debug_payload['token_present']}")
|
|
|
|
data = {
|
|
|
|
data = {
|
|
|
|
**self.mandatory_params,
|
|
|
|
**self.mandatory_params,
|
|
|
|
"id_activity_calendar": session_id,
|
|
|
|
"id_activity_calendar": session_id,
|
|
|
|
@@ -247,275 +208,136 @@ class CrossFitBooker:
|
|
|
|
"device_type": self.mandatory_params["device_type"],
|
|
|
|
"device_type": self.mandatory_params["device_type"],
|
|
|
|
"token": self.auth_token
|
|
|
|
"token": self.auth_token
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
r = self._post("https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php", data)
|
|
|
|
for retry in range(RETRY_MAX):
|
|
|
|
if r and r.get("success", False):
|
|
|
|
try:
|
|
|
|
logging.info(f"Successfully booked session {session_id}")
|
|
|
|
response: requests.Response = self.session.post(
|
|
|
|
return True
|
|
|
|
url,
|
|
|
|
logging.error(f"Booking failed: {r}" if r is not None else "Booking call failed")
|
|
|
|
headers=self.get_auth_headers(),
|
|
|
|
|
|
|
|
data=urlencode(data),
|
|
|
|
|
|
|
|
timeout=10
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
|
|
|
json_response: Dict[str, Any] = response.json()
|
|
|
|
|
|
|
|
if json_response.get("success", False):
|
|
|
|
|
|
|
|
logging.info(f"Successfully booked session {session_id}")
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
logging.error(f"API returned success:false: {json_response} - Session ID: {session_id}")
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
|
|
if retry == RETRY_MAX - 1:
|
|
|
|
|
|
|
|
logging.error(f"Final retry failed: {str(e)}")
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
wait_time: int = RETRY_BACKOFF * (2 ** retry)
|
|
|
|
|
|
|
|
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
|
|
|
|
|
|
|
|
time.sleep(wait_time)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def get_booked_sessions(self) -> List[Dict[str, Any]]:
|
|
|
|
def get_booked_sessions(self) -> List[Dict[str, Any]]:
|
|
|
|
"""
|
|
|
|
data = {**self.mandatory_params, "id_user": self.user_id, "action_by": self.user_id}
|
|
|
|
Get a list of booked sessions.
|
|
|
|
r = self._post("https://sport.nubapp.com/api/v4/activities/getBookedActivities.php", data)
|
|
|
|
|
|
|
|
if r and r.get("success", False):
|
|
|
|
Returns:
|
|
|
|
return r.get("data", [])
|
|
|
|
A list of dictionaries containing information about the booked sessions.
|
|
|
|
logging.error(f"Failed to retrieve booked sessions: {r}" if r is not None else "Call failed")
|
|
|
|
"""
|
|
|
|
|
|
|
|
url = "https://sport.nubapp.com/api/v4/activities/getBookedActivities.php"
|
|
|
|
|
|
|
|
data = {
|
|
|
|
|
|
|
|
**self.mandatory_params,
|
|
|
|
|
|
|
|
"id_user": self.user_id,
|
|
|
|
|
|
|
|
"action_by": self.user_id
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for retry in range(RETRY_MAX):
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
response: requests.Response = self.session.post(
|
|
|
|
|
|
|
|
url,
|
|
|
|
|
|
|
|
headers=self.get_auth_headers(),
|
|
|
|
|
|
|
|
data=urlencode(data),
|
|
|
|
|
|
|
|
timeout=10
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
|
|
|
json_response: Dict[str, Any] = response.json()
|
|
|
|
|
|
|
|
if json_response.get("success", False):
|
|
|
|
|
|
|
|
return json_response.get("data", [])
|
|
|
|
|
|
|
|
logging.error(f"API returned success:false: {json_response}")
|
|
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
|
|
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
|
|
|
|
if retry == RETRY_MAX - 1:
|
|
|
|
|
|
|
|
logging.error(f"Final retry failed: {str(e)}")
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
wait_time: int = RETRY_BACKOFF * (2 ** retry)
|
|
|
|
|
|
|
|
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
|
|
|
|
|
|
|
|
time.sleep(wait_time)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.error(f"Failed to complete request after {RETRY_MAX} attempts")
|
|
|
|
|
|
|
|
return []
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
|
|
|
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
|
|
|
"""
|
|
|
|
ui = session.get("user_info", {})
|
|
|
|
Check if a session is bookable based on user_info.
|
|
|
|
if ui.get("can_join", False): return True
|
|
|
|
Args:
|
|
|
|
d, t = ui.get("unableToBookUntilDate", ""), ui.get("unableToBookUntilTime", "")
|
|
|
|
session (Dict[str, Any]): Session data.
|
|
|
|
if d and t:
|
|
|
|
current_time (datetime): Current time for comparison.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
bool: True if the session is bookable, False otherwise.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
user_info: Dict[str, Any] = session.get("user_info", {})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# First check if can_join is true (primary condition)
|
|
|
|
|
|
|
|
if user_info.get("can_join", False):
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# If can_join is False, check if there's a booking window
|
|
|
|
|
|
|
|
booking_date_str: str = user_info.get("unableToBookUntilDate", "")
|
|
|
|
|
|
|
|
booking_time_str: str = user_info.get("unableToBookUntilTime", "")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if booking_date_str and booking_time_str:
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
booking_datetime: datetime = datetime.strptime(
|
|
|
|
bd = pytz.timezone(TIMEZONE).localize(datetime.strptime(f"{d} {t}", "%d-%m-%Y %H:%M"))
|
|
|
|
f"{booking_date_str} {booking_time_str}",
|
|
|
|
if current_time >= bd: return True
|
|
|
|
"%d-%m-%Y %H:%M"
|
|
|
|
except ValueError: pass
|
|
|
|
)
|
|
|
|
|
|
|
|
booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if current_time >= booking_datetime:
|
|
|
|
|
|
|
|
return True # Booking window is open
|
|
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
|
|
pass # Ignore invalid date formats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Default case: not bookable
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
|
|
|
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
|
|
|
"""
|
|
|
|
|
|
|
|
Check if session matches one of your preferred sessions with exact matching.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
|
|
|
session (Dict[str, Any]): Session data.
|
|
|
|
|
|
|
|
current_time (datetime): Current time for comparison.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
|
|
|
bool: True if the session matches a preferred session, False otherwise.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
session_time: datetime = parse(session["start_timestamp"])
|
|
|
|
st = self._parse_local(session["start_timestamp"])
|
|
|
|
if not session_time.tzinfo:
|
|
|
|
dow, hhmm = st.weekday(), st.strftime("%H:%M")
|
|
|
|
session_time = pytz.timezone(TIMEZONE).localize(session_time)
|
|
|
|
name = session.get("name_activity", "").upper()
|
|
|
|
|
|
|
|
for pd, pt, pn in PREFERRED_SESSIONS:
|
|
|
|
day_of_week: int = session_time.weekday()
|
|
|
|
if dow == pd and hhmm == pt and pn in name: return True
|
|
|
|
session_time_str: str = session_time.strftime("%H:%M")
|
|
|
|
|
|
|
|
session_name: str = session.get("name_activity", "").upper()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS:
|
|
|
|
|
|
|
|
# Exact match
|
|
|
|
|
|
|
|
if (day_of_week == preferred_day and
|
|
|
|
|
|
|
|
session_time_str == preferred_time and
|
|
|
|
|
|
|
|
preferred_name in session_name):
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error(f"Failed to check session: {str(e)} - Session: {session}")
|
|
|
|
logging.error(f"Failed to check session: {e} - Session: {session}")
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
async def run_booking_cycle(self, current_time: datetime) -> None:
|
|
|
|
async def run_booking_cycle(self, current_time: datetime) -> None:
|
|
|
|
"""
|
|
|
|
start_date, end_date = current_time.date(), current_time.date() + timedelta(days=2)
|
|
|
|
Run one cycle of checking and booking sessions.
|
|
|
|
sessions_data = self.get_available_sessions(start_date, end_date)
|
|
|
|
Args:
|
|
|
|
|
|
|
|
current_time (datetime): Current time for comparison.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
# Calculate date range to check (current day, day + 1, and day + 2)
|
|
|
|
|
|
|
|
start_date: date = current_time.date()
|
|
|
|
|
|
|
|
end_date: date = start_date + timedelta(days=2) # Only go up to day + 2
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Get available sessions
|
|
|
|
|
|
|
|
sessions_data: Optional[Dict[str, Any]] = self.get_available_sessions(start_date, end_date)
|
|
|
|
|
|
|
|
if not sessions_data or not sessions_data.get("success", False):
|
|
|
|
if not sessions_data or not sessions_data.get("success", False):
|
|
|
|
logging.error("No sessions available or error fetching sessions")
|
|
|
|
logging.error("No sessions available or error fetching sessions")
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
|
|
|
|
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
|
|
|
|
|
|
|
|
|
|
|
|
# Find sessions to book (preferred only) within allowed date range
|
|
|
|
|
|
|
|
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
|
|
|
|
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
|
|
|
|
upcoming_sessions: List[Dict[str, Any]] = []
|
|
|
|
upcoming_sessions: List[Dict[str, Any]] = []
|
|
|
|
found_preferred_sessions: List[Dict[str, Any]] = []
|
|
|
|
found_preferred_sessions: List[Dict[str, Any]] = []
|
|
|
|
|
|
|
|
|
|
|
|
for session in activities:
|
|
|
|
# Debug: list all preferred sessions detected (bookable or not)
|
|
|
|
session_time: datetime = parse(session["start_timestamp"])
|
|
|
|
preferred_debug: List[str] = []
|
|
|
|
if not session_time.tzinfo:
|
|
|
|
|
|
|
|
session_time = pytz.timezone(TIMEZONE).localize(session_time)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Check if session is within allowed date range (current day, day + 1, or day + 2)
|
|
|
|
for s in activities:
|
|
|
|
days_diff = (session_time.date() - current_time.date()).days
|
|
|
|
st = self._parse_local(s["start_timestamp"])
|
|
|
|
|
|
|
|
days_diff = (st.date() - current_time.date()).days
|
|
|
|
if not (0 <= days_diff <= 2):
|
|
|
|
if not (0 <= days_diff <= 2):
|
|
|
|
continue # Skip sessions outside the allowed date range
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# Check if session is preferred and bookable
|
|
|
|
is_preferred = self.matches_preferred_session(s, current_time)
|
|
|
|
if self.is_session_bookable(session, current_time):
|
|
|
|
if is_preferred:
|
|
|
|
if self.matches_preferred_session(session, current_time):
|
|
|
|
# Collect concise summaries to debug output
|
|
|
|
sessions_to_book.append(("Preferred", session))
|
|
|
|
try:
|
|
|
|
found_preferred_sessions.append(session)
|
|
|
|
preferred_debug.append(self._fmt_session(s, st))
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
|
|
preferred_debug.append(f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if self.is_session_bookable(s, current_time):
|
|
|
|
|
|
|
|
if is_preferred:
|
|
|
|
|
|
|
|
sessions_to_book.append(("Preferred", s))
|
|
|
|
|
|
|
|
found_preferred_sessions.append(s)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
# Check if it's a preferred session that's not bookable yet
|
|
|
|
if is_preferred:
|
|
|
|
if self.matches_preferred_session(session, current_time):
|
|
|
|
found_preferred_sessions.append(s)
|
|
|
|
found_preferred_sessions.append(session)
|
|
|
|
|
|
|
|
# Check if it's available tomorrow (day + 1)
|
|
|
|
|
|
|
|
if days_diff == 1:
|
|
|
|
if days_diff == 1:
|
|
|
|
upcoming_sessions.append(session)
|
|
|
|
upcoming_sessions.append(s)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Emit debug of preferred sessions
|
|
|
|
|
|
|
|
if preferred_debug:
|
|
|
|
|
|
|
|
logging.debug("[preferred_sessions] " + " | ".join(preferred_debug[:50]))
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
logging.debug("[preferred_sessions] none found in window")
|
|
|
|
|
|
|
|
|
|
|
|
if not sessions_to_book and not upcoming_sessions:
|
|
|
|
if not sessions_to_book and not upcoming_sessions:
|
|
|
|
logging.info("No matching sessions found to book")
|
|
|
|
logging.info("No matching sessions found to book")
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# Notify about all found preferred sessions, regardless of bookability
|
|
|
|
for s in found_preferred_sessions:
|
|
|
|
for session in found_preferred_sessions:
|
|
|
|
details = self._fmt_session(s)
|
|
|
|
session_time: datetime = parse(session["start_timestamp"])
|
|
|
|
await self.notifier.notify_session_booking(details)
|
|
|
|
if not session_time.tzinfo:
|
|
|
|
logging.info(f"Notified about found preferred session: {details}")
|
|
|
|
session_time = pytz.timezone(TIMEZONE).localize(session_time)
|
|
|
|
|
|
|
|
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
|
|
|
|
await self.notifier.notify_session_booking(session_details)
|
|
|
|
|
|
|
|
logging.info(f"Notified about found preferred session: {session_details}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Notify about upcoming sessions
|
|
|
|
for s in upcoming_sessions:
|
|
|
|
for session in upcoming_sessions:
|
|
|
|
details = self._fmt_session(s)
|
|
|
|
session_time: datetime = parse(session["start_timestamp"])
|
|
|
|
await self.notifier.notify_upcoming_session(details, 1)
|
|
|
|
if not session_time.tzinfo:
|
|
|
|
logging.info(f"Notified about upcoming session: {details}")
|
|
|
|
session_time = pytz.timezone(TIMEZONE).localize(session_time)
|
|
|
|
|
|
|
|
session_details = f"{session['id_activity_calendar']} {session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
|
|
|
|
await self.notifier.notify_upcoming_session(session_details, 1) # Days until is 1 for tomorrow
|
|
|
|
|
|
|
|
logging.info(f"Notified about upcoming session: {session_details}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Book sessions (preferred first)
|
|
|
|
|
|
|
|
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
|
|
|
|
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
|
|
|
|
for session_type, session in sessions_to_book:
|
|
|
|
for stype, s in sessions_to_book:
|
|
|
|
session_time: datetime = datetime.strptime(session["start_timestamp"], "%Y-%m-%d %H:%M:%S")
|
|
|
|
st_dt = datetime.strptime(s["start_timestamp"], "%Y-%m-%d %H:%M:%S")
|
|
|
|
logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
|
|
|
|
logging.info(f"Attempting to book {stype} session at {st_dt} ({s['name_activity']})")
|
|
|
|
if self.book_session(session["id_activity_calendar"]):
|
|
|
|
if self.book_session(s["id_activity_calendar"]):
|
|
|
|
# Send notification after successful booking
|
|
|
|
details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
await self.notifier.notify_session_booking(details)
|
|
|
|
await self.notifier.notify_session_booking(session_details)
|
|
|
|
logging.info(f"Successfully booked {stype} session at {st_dt}")
|
|
|
|
logging.info(f"Successfully booked {session_type} session at {session_time}")
|
|
|
|
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
logging.error(f"Failed to book {session_type} session at {session_time}")
|
|
|
|
logging.error(f"Failed to book {stype} session at {st_dt}")
|
|
|
|
# Send notification about the failed booking
|
|
|
|
details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
session_details = f"{session['name_activity']} at {session_time.strftime('%Y-%m-%d %H:%M')}"
|
|
|
|
await self.notifier.notify_impossible_booking(details)
|
|
|
|
await self.notifier.notify_impossible_booking(session_details)
|
|
|
|
logging.info(f"Notified about impossible booking for {stype} session at {st_dt}")
|
|
|
|
logging.info(f"Notified about impossible booking for {session_type} session at {session_time}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def run(self) -> None:
|
|
|
|
async def run(self) -> None:
|
|
|
|
"""
|
|
|
|
tz = pytz.timezone(TIMEZONE)
|
|
|
|
Main execution loop.
|
|
|
|
th, tm = map(int, TARGET_RESERVATION_TIME.split(":"))
|
|
|
|
"""
|
|
|
|
target_time = datetime.now(tz).replace(hour=th, minute=tm, second=0, microsecond=0)
|
|
|
|
# Set up timezone
|
|
|
|
|
|
|
|
tz: pytz.timezone = pytz.timezone(TIMEZONE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Parse TARGET_RESERVATION_TIME to get the target hour and minute
|
|
|
|
|
|
|
|
target_hour, target_minute = map(int, TARGET_RESERVATION_TIME.split(":"))
|
|
|
|
|
|
|
|
target_time = datetime.now(tz).replace(hour=target_hour, minute=target_minute, second=0, microsecond=0)
|
|
|
|
|
|
|
|
booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
|
|
|
|
booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
|
|
|
|
|
|
|
|
|
|
|
|
# Initial login
|
|
|
|
|
|
|
|
if not self.login():
|
|
|
|
if not self.login():
|
|
|
|
logging.error("Authentication failed - exiting program")
|
|
|
|
logging.error("Authentication failed - exiting program"); return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
current_time: datetime = datetime.now(tz)
|
|
|
|
now = datetime.now(tz)
|
|
|
|
logging.info(f"Current time: {current_time}")
|
|
|
|
logging.info(f"Current time: {now}")
|
|
|
|
|
|
|
|
if target_time <= now <= booking_window_end:
|
|
|
|
# Only book sessions if current time is within the booking window
|
|
|
|
await self.run_booking_cycle(now); time.sleep(60)
|
|
|
|
if target_time <= current_time <= booking_window_end:
|
|
|
|
|
|
|
|
# Run booking cycle to check for preferred sessions and book
|
|
|
|
|
|
|
|
await self.run_booking_cycle(current_time)
|
|
|
|
|
|
|
|
# Wait for a short time before next check
|
|
|
|
|
|
|
|
time.sleep(60)
|
|
|
|
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
# Check again in 5 minutes if outside booking window
|
|
|
|
|
|
|
|
time.sleep(300)
|
|
|
|
time.sleep(300)
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
|
logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}")
|
|
|
|
logging.error(f"Unexpected error in booking cycle: {e} - Traceback: {traceback.format_exc()}"); time.sleep(60)
|
|
|
|
time.sleep(60) # Wait before retrying after error
|
|
|
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
self.quit()
|
|
|
|
self.quit()
|
|
|
|
|
|
|
|
|
|
|
|
def quit(self) -> None:
|
|
|
|
def quit(self) -> None:
|
|
|
|
"""
|
|
|
|
logging.info("Script interrupted by user. Quitting..."); exit(0)
|
|
|
|
Clean up resources and exit the script.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
logging.info("Script interrupted by user. Quitting...")
|
|
|
|
|
|
|
|
exit(0)
|
|
|
|
|
|
|
|
|