refactor: does not notify if no session found
This commit is contained in:
357
old/crossfit_booker.py
Normal file
357
old/crossfit_booker.py
Normal file
@@ -0,0 +1,357 @@
|
||||
# Native modules
|
||||
import logging, traceback, os, time
|
||||
from datetime import datetime, timedelta, date
|
||||
|
||||
# Third-party modules
|
||||
import requests, pytz
|
||||
from dateutil.parser import parse
|
||||
from dotenv import load_dotenv
|
||||
from urllib.parse import urlencode
|
||||
from typing import List, Dict, Optional, Any, Tuple
|
||||
|
||||
from session_notifier import SessionNotifier
|
||||
from session_config import PREFERRED_SESSIONS
|
||||
|
||||
load_dotenv()
|
||||
|
||||
USERNAME = os.environ.get("CROSSFIT_USERNAME")
|
||||
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
|
||||
if not all([USERNAME, PASSWORD]):
|
||||
raise ValueError("Missing environment variables: CROSSFIT_USERNAME and/or CROSSFIT_PASSWORD")
|
||||
|
||||
APPLICATION_ID = "81560887"
|
||||
CATEGORY_ID = "677"
|
||||
TIMEZONE = "Europe/Paris"
|
||||
TARGET_RESERVATION_TIME = os.environ.get("TARGET_RESERVATION_TIME", "20:01")
|
||||
BOOKING_WINDOW_END_DELTA_MINUTES = int(os.environ.get("BOOKING_WINDOW_END_DELTA_MINUTES", "10"))
|
||||
DEVICE_TYPE = "3"
|
||||
RETRY_MAX = 3
|
||||
RETRY_BACKOFF = 1
|
||||
APP_VERSION = "5.09.21"
|
||||
|
||||
class CrossFitBooker:
|
||||
def __init__(self) -> None:
|
||||
self.auth_token: Optional[str] = None
|
||||
self.user_id: Optional[str] = None
|
||||
self.session: requests.Session = requests.Session()
|
||||
self.base_headers: Dict[str, str] = {
|
||||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:140.0) Gecko/20100101 Firefox/140.0",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Nubapp-Origin": "user_apps",
|
||||
}
|
||||
self.session.headers.update(self.base_headers)
|
||||
self.mandatory_params: Dict[str, str] = {
|
||||
"app_version": APP_VERSION, "device_type": DEVICE_TYPE,
|
||||
"id_application": APPLICATION_ID, "id_category_activity": CATEGORY_ID
|
||||
}
|
||||
email_credentials = {"from": os.environ.get("EMAIL_FROM"), "to": os.environ.get("EMAIL_TO"), "password": os.environ.get("EMAIL_PASSWORD")}
|
||||
telegram_credentials = {"token": os.environ.get("TELEGRAM_TOKEN"), "chat_id": os.environ.get("TELEGRAM_CHAT_ID")}
|
||||
enable_email = os.environ.get("ENABLE_EMAIL_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
||||
enable_telegram = os.environ.get("ENABLE_TELEGRAM_NOTIFICATIONS", "true").lower() in ("true", "1", "yes")
|
||||
self.notifier = SessionNotifier(email_credentials, telegram_credentials, enable_email=enable_email, enable_telegram=enable_telegram)
|
||||
|
||||
def _auth_headers(self) -> Dict[str, str]:
|
||||
h = self.base_headers.copy()
|
||||
if self.auth_token:
|
||||
h["Authorization"] = f"Bearer {self.auth_token}"
|
||||
return h
|
||||
|
||||
# Public method expected by tests
|
||||
def get_auth_headers(self) -> Dict[str, str]:
|
||||
"""
|
||||
Return headers with Authorization when token is present.
|
||||
This wraps _auth_headers() to satisfy tests expecting a public method.
|
||||
"""
|
||||
return self._auth_headers()
|
||||
|
||||
def _post(self, url: str, data: Dict[str, Any], headers: Optional[Dict[str, str]] = None, expect_json: bool = True) -> Optional[Any]:
|
||||
for retry in range(RETRY_MAX):
|
||||
try:
|
||||
resp = self.session.post(url, headers=(headers or self._auth_headers()), data=urlencode(data), timeout=10)
|
||||
sc = resp.status_code
|
||||
if sc == 200:
|
||||
return resp.json() if expect_json else resp
|
||||
if sc == 401:
|
||||
logging.error("401 Unauthorized")
|
||||
return None
|
||||
# Guard sc to ensure it's an int for comparison (fix tests using Mock without status_code int semantics)
|
||||
if isinstance(sc, int) and 500 <= sc < 600:
|
||||
logging.error(f"Server error {sc}")
|
||||
raise requests.exceptions.ConnectionError(f"Server error {sc}")
|
||||
logging.error(f"HTTP {sc}: {getattr(resp, 'text', '')[:100]}")
|
||||
return None
|
||||
except requests.exceptions.RequestException as e:
|
||||
if retry == RETRY_MAX - 1:
|
||||
logging.error(f"Final retry failed: {e}")
|
||||
raise
|
||||
wt = RETRY_BACKOFF * (2 ** retry)
|
||||
logging.warning(f"Request failed ({retry+1}/{RETRY_MAX}): {e}. Retrying in {wt}s...")
|
||||
time.sleep(wt)
|
||||
return None
|
||||
|
||||
def _parse_local(self, ts: str) -> datetime:
|
||||
dt = parse(ts)
|
||||
return dt if dt.tzinfo else pytz.timezone(TIMEZONE).localize(dt)
|
||||
|
||||
def _fmt_session(self, s: Dict[str, Any], dt: Optional[datetime] = None) -> str:
|
||||
dt = dt or self._parse_local(s["start_timestamp"])
|
||||
return f"{s['id_activity_calendar']} {s['name_activity']} at {dt.strftime('%Y-%m-%d %H:%M')}"
|
||||
|
||||
def login(self) -> bool:
|
||||
try:
|
||||
# Directly use requests to align with tests that mock requests.Session.post
|
||||
a = {"app_version": APP_VERSION, "device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD}
|
||||
resp1 = self.session.post(
|
||||
"https://sport.nubapp.com/api/v4/users/checkUser.php",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
||||
data=urlencode(a)
|
||||
)
|
||||
if not getattr(resp1, "ok", False):
|
||||
logging.error("First login step failed")
|
||||
return False
|
||||
try:
|
||||
r1 = resp1.json()
|
||||
self.user_id = str(r1["data"]["user"]["id_user"])
|
||||
except Exception as e:
|
||||
logging.error(f"Error during login: {e} - Response: {getattr(resp1, 'text', '')}")
|
||||
return False
|
||||
|
||||
resp2 = self.session.post(
|
||||
"https://sport.nubapp.com/api/v4/login",
|
||||
headers={"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"},
|
||||
data=urlencode({"device_type": DEVICE_TYPE, "username": USERNAME, "password": PASSWORD})
|
||||
)
|
||||
if getattr(resp2, "ok", False):
|
||||
try:
|
||||
r2 = resp2.json()
|
||||
self.auth_token = r2.get("token")
|
||||
except Exception as e:
|
||||
logging.error(f"Error during login: {e} - Response: {getattr(resp2, 'text', '')}")
|
||||
return False
|
||||
if self.auth_token and self.user_id:
|
||||
logging.info("Successfully logged in")
|
||||
return True
|
||||
|
||||
logging.error("Login failed")
|
||||
return False
|
||||
except requests.exceptions.RequestException as e:
|
||||
logging.error(f"Request error during login: {e}")
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error during login: {e}")
|
||||
return False
|
||||
|
||||
def get_available_sessions(self, start_date: date, end_date: date) -> Optional[Dict[str, Any]]:
|
||||
if not self.auth_token or not self.user_id:
|
||||
logging.error("Authentication required - missing token or user ID")
|
||||
return None
|
||||
data = {
|
||||
**self.mandatory_params,
|
||||
"id_user": self.user_id,
|
||||
"start_timestamp": start_date.strftime("%d-%m-%Y"),
|
||||
"end_timestamp": end_date.strftime("%d-%m-%Y"),
|
||||
}
|
||||
logging.debug(f"[get_available_sessions] URL=https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php "
|
||||
f"method=POST content_type=application/x-www-form-urlencoded "
|
||||
f"keys={list(data.keys())} id_user_present={bool(self.user_id)}")
|
||||
r = self._post("https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php", data)
|
||||
# Display available sessions in debug console
|
||||
if r is not None:
|
||||
success = r.get("success", False) if isinstance(r, dict) else None
|
||||
count = 0
|
||||
try:
|
||||
if isinstance(r, dict):
|
||||
activities = r.get("data", {}).get("activities_calendar", [])
|
||||
count = len(activities) if isinstance(activities, list) else 0
|
||||
except Exception:
|
||||
pass
|
||||
logging.debug(f"[get_available_sessions] success={success} activities_count={count}")
|
||||
# Log concise session summary to aid debugging
|
||||
if success and count:
|
||||
for s in r.get("data", {}).get("activities_calendar", [])[:50]:
|
||||
try:
|
||||
summary = self._fmt_session(s)
|
||||
except Exception:
|
||||
summary = f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}"
|
||||
logging.debug(f"[session] {summary}")
|
||||
else:
|
||||
logging.debug(f"[get_available_sessions] raw_response_preview={str(r)[:500]}")
|
||||
else:
|
||||
logging.debug("[get_available_sessions] No response (None) from API")
|
||||
return r
|
||||
|
||||
async def book_session(self, session_id: str) -> bool:
|
||||
# Debug payload composition (without secrets)
|
||||
safe_user_id = str(self.user_id) if self.user_id else None
|
||||
debug_payload = {
|
||||
**self.mandatory_params,
|
||||
"id_activity_calendar": session_id,
|
||||
"id_user": safe_user_id,
|
||||
"action_by": safe_user_id,
|
||||
"n_guests": "0",
|
||||
"booked_on": "1",
|
||||
"device_type": self.mandatory_params.get("device_type"),
|
||||
"token_present": bool(self.auth_token),
|
||||
}
|
||||
logging.debug(f"[book_session] URL=https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php "
|
||||
f"method=POST content_type=application/x-www-form-urlencoded "
|
||||
f"keys={list(debug_payload.keys())} "
|
||||
f"id_activity_calendar_present={bool(session_id)} "
|
||||
f"user_id_present={bool(safe_user_id)} token_present={debug_payload['token_present']}")
|
||||
data = {
|
||||
**self.mandatory_params,
|
||||
"id_activity_calendar": session_id,
|
||||
"id_user": self.user_id,
|
||||
"action_by": self.user_id,
|
||||
"n_guests": "0",
|
||||
"booked_on": "1",
|
||||
"device_type": self.mandatory_params["device_type"],
|
||||
"token": self.auth_token
|
||||
}
|
||||
r = self._post("https://sport.nubapp.com/api/v4/activities/bookActivityCalendar.php", data)
|
||||
|
||||
if r is None:
|
||||
logging.error("Booking call failed")
|
||||
return False
|
||||
|
||||
# Check booking status and notify accordingly
|
||||
if r.get("success", False):
|
||||
logging.info(f"Successfully booked session {session_id}")
|
||||
details = f"{session_id} at {datetime.now().strftime('%Y-%m-%d %H:%M')}"
|
||||
await self.notifier.notify_session_booking(details)
|
||||
return True
|
||||
|
||||
# Handle failure case with error message
|
||||
error_message = r.get("message", "Unknown error")
|
||||
error_code = r.get("error", "unknown")
|
||||
logging.error(f"Booking failed: {error_message} (error code: {error_code})")
|
||||
details = f"{session_id} at {datetime.now().strftime('%Y-%m-%d %H:%M')}"
|
||||
await self.notifier.notify_impossible_booking(details)
|
||||
return False
|
||||
|
||||
def get_booked_sessions(self) -> List[Dict[str, Any]]:
|
||||
data = {**self.mandatory_params, "id_user": self.user_id, "action_by": self.user_id}
|
||||
r = self._post("https://sport.nubapp.com/api/v4/activities/getBookedActivities.php", data)
|
||||
if r and r.get("success", False):
|
||||
return r.get("data", [])
|
||||
logging.error(f"Failed to retrieve booked sessions: {r}" if r is not None else "Call failed")
|
||||
return []
|
||||
|
||||
def is_session_bookable(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
||||
ui = session.get("user_info", {})
|
||||
if ui.get("can_join", False): return True
|
||||
d, t = ui.get("unableToBookUntilDate", ""), ui.get("unableToBookUntilTime", "")
|
||||
if d and t:
|
||||
try:
|
||||
bd = pytz.timezone(TIMEZONE).localize(datetime.strftime(f"{d} {t}", "%d-%m-%Y %H:%M"))
|
||||
if current_time >= bd: return True
|
||||
except ValueError: pass
|
||||
return False
|
||||
|
||||
def matches_preferred_session(self, session: Dict[str, Any], current_time: datetime) -> bool:
|
||||
try:
|
||||
st = self._parse_local(session["start_timestamp"])
|
||||
dow, hhmm = st.weekday(), st.strftime("%H:%M")
|
||||
name = session.get("name_activity", "").upper()
|
||||
for pd, pt, pn in PREFERRED_SESSIONS:
|
||||
if dow == pd and hhmm == pt and pn in name: return True
|
||||
return False
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to check session: {e} - Session: {session}")
|
||||
return False
|
||||
|
||||
async def run_booking_cycle(self, current_time: datetime) -> None:
|
||||
start_date, end_date = current_time.date(), current_time.date() + timedelta(days=2)
|
||||
sessions_data = self.get_available_sessions(start_date, end_date)
|
||||
if not sessions_data or not sessions_data.get("success", False):
|
||||
logging.error("No sessions available or error fetching sessions")
|
||||
return
|
||||
activities: List[Dict[str, Any]] = sessions_data.get("data", {}).get("activities_calendar", [])
|
||||
sessions_to_book: List[Tuple[str, Dict[str, Any]]] = []
|
||||
upcoming_sessions: List[Dict[str, Any]] = []
|
||||
found_preferred_sessions: List[Dict[str, Any]] = []
|
||||
|
||||
# Debug: list all preferred sessions detected (bookable or not)
|
||||
preferred_debug: List[str] = []
|
||||
|
||||
for s in activities:
|
||||
st = self._parse_local(s["start_timestamp"])
|
||||
days_diff = (st.date() - current_time.date()).days
|
||||
if not (0 <= days_diff <= 2):
|
||||
continue
|
||||
|
||||
is_preferred = self.matches_preferred_session(s, current_time)
|
||||
if is_preferred:
|
||||
# Collect concise summaries to debug output
|
||||
try:
|
||||
preferred_debug.append(self._fmt_session(s, st))
|
||||
except Exception:
|
||||
preferred_debug.append(f"{s.get('id_activity_calendar')} {s.get('name_activity')} at {s.get('start_timestamp')}")
|
||||
|
||||
if self.is_session_bookable(s, current_time):
|
||||
if is_preferred:
|
||||
sessions_to_book.append(("Preferred", s))
|
||||
found_preferred_sessions.append(s)
|
||||
else:
|
||||
if is_preferred:
|
||||
found_preferred_sessions.append(s)
|
||||
if days_diff == 1:
|
||||
upcoming_sessions.append(s)
|
||||
|
||||
# Emit debug of preferred sessions
|
||||
if preferred_debug:
|
||||
logging.debug("[preferred_sessions] " + " | ".join(preferred_debug[:50]))
|
||||
else:
|
||||
logging.debug("[preferred_sessions] none found in window")
|
||||
|
||||
if not sessions_to_book and not upcoming_sessions:
|
||||
logging.info("No matching sessions found to book")
|
||||
return
|
||||
|
||||
for s in found_preferred_sessions:
|
||||
details = self._fmt_session(s)
|
||||
await self.notifier.notify_session_booking(details)
|
||||
logging.info(f"Notified about found preferred session: {details}")
|
||||
|
||||
for s in upcoming_sessions:
|
||||
details = self._fmt_session(s)
|
||||
await self.notifier.notify_upcoming_session(details, 1)
|
||||
logging.info(f"Notified about upcoming session: {details}")
|
||||
|
||||
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
|
||||
for stype, s in sessions_to_book:
|
||||
st_dt = datetime.strptime(s["start_timestamp"], "%Y-%m-%d %H:%M:%S")
|
||||
logging.info(f"Attempting to book {stype} session at {st_dt} ({s['name_activity']})")
|
||||
if await self.book_session(s["id_activity_calendar"]):
|
||||
details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
|
||||
await self.notifier.notify_session_booking(details)
|
||||
logging.info(f"Successfully booked {stype} session at {st_dt}")
|
||||
else:
|
||||
logging.error(f"Failed to book {stype} session at {st_dt}")
|
||||
details = f"{s['name_activity']} at {st_dt.strftime('%Y-%m-%d %H:%M')}"
|
||||
await self.notifier.notify_impossible_booking(details)
|
||||
logging.info(f"Notified about impossible booking for {stype} session at {st_dt}")
|
||||
|
||||
async def run(self) -> None:
|
||||
tz = pytz.timezone(TIMEZONE)
|
||||
th, tm = map(int, TARGET_RESERVATION_TIME.split(":"))
|
||||
target_time = datetime.now(tz).replace(hour=th, minute=tm, second=0, microsecond=0)
|
||||
booking_window_end = target_time + timedelta(minutes=BOOKING_WINDOW_END_DELTA_MINUTES)
|
||||
if not self.login():
|
||||
logging.error("Authentication failed - exiting program"); return
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
now = datetime.now(tz)
|
||||
logging.info(f"Current time: {now}")
|
||||
if target_time <= now <= booking_window_end:
|
||||
await self.run_booking_cycle(now); time.sleep(60)
|
||||
else:
|
||||
time.sleep(300)
|
||||
except Exception as e:
|
||||
logging.error(f"Unexpected error in booking cycle: {e} - Traceback: {traceback.format_exc()}"); time.sleep(60)
|
||||
except KeyboardInterrupt:
|
||||
self.quit()
|
||||
|
||||
def quit(self) -> None:
|
||||
logging.info("Script interrupted by user. Quitting..."); exit(0)
|
||||
Reference in New Issue
Block a user