feat: More logging information in code

All methods functions are working properly. Now
includes logging and log file.
This commit is contained in:
kbe
2025-07-18 21:24:23 +02:00
parent e8cd2d3d96
commit 3a0af18b73
2 changed files with 116 additions and 112 deletions

View File

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta
import os
import sys
import time
import difflib
# Third-party modules
import requests
@@ -16,18 +17,9 @@ from dotenv import load_dotenv
from urllib.parse import urlencode
from typing import List, Dict, Optional
from datetime import datetime, timedelta
# Parse session time (handles timezones if present)
from dateutil.parser import parse
import pytz
from urllib.parse import urlencode
from typing import List, Dict, Optional
from dotenv import load_dotenv
load_dotenv()
load_dotenv()
# Configuration
USERNAME = os.environ.get("CROSSFIT_USERNAME")
PASSWORD = os.environ.get("CROSSFIT_PASSWORD")
@@ -57,13 +49,15 @@ PREFERRED_SESSIONS = [
# Configure logging once at script startup
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG, # Change to DEBUG for more detailed logs
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("log/crossfit_booking.log"),
logging.StreamHandler()
]
)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.info("Logging enhanced with request library noise reduction")
class CrossFitBooker:
@@ -110,11 +104,18 @@ class CrossFitBooker:
data=urlencode(login_params))
if not response.ok:
print(f"First login step failed: {response.status_code} - {response.text}")
logging.error(f"First login step failed: {response.status_code} - {response.text} - Response: {response.text[:100]}")
return False
login_data = response.json()
self.user_id = str(login_data["data"]["user"]["id_user"])
try:
login_data = response.json()
self.user_id = str(login_data["data"]["user"]["id_user"])
except KeyError as ke:
logging.error(f"Key error during login: {str(ke)} - Response: {response.text}")
return False
except ValueError as ve:
logging.error(f"Value error during login: {str(ve)} - Response: {response.text}")
return False
# Second login endpoint
response = self.session.post(
@@ -127,23 +128,37 @@ class CrossFitBooker:
}))
if response.ok:
login_data = response.json()
self.auth_token = login_data.get("token")
try:
login_data = response.json()
self.auth_token = login_data.get("token")
except KeyError as ke:
logging.error(f"Key error during login: {str(ke)} - Response: {response.text}")
return False
except ValueError as ve:
logging.error(f"Value error during login: {str(ve)} - Response: {response.text}")
return False
if self.auth_token and self.user_id:
print("Successfully logged in")
logging.info("Successfully logged in")
return True
print(f"Login failed: {response.status_code} - {response.text}")
return False
else:
logging.error(f"Login failed: {response.status_code} - {response.text} - Response: {response.text[:100]}")
return False
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response during login")
return False
except requests.exceptions.RequestException as e:
logging.error(f"Request error during login: {str(e)}")
return False
except Exception as e:
print(f"Login error: {str(e)}")
logging.error(f"Unexpected error during login: {str(e)}")
return False
def get_available_sessions(self, start_date: datetime, end_date: datetime) -> Optional[Dict]:
"""Fetch available sessions from the API with comprehensive error handling"""
if not self.auth_token or not self.user_id:
print("Authentication required - missing token or user ID")
logging.error("Authentication required - missing token or user ID")
return None
url = "https://sport.nubapp.com/api/v4/activities/getActivitiesCalendar.php"
@@ -156,58 +171,60 @@ class CrossFitBooker:
"end_timestamp": end_date.strftime("%d-%m-%Y")
})
# Debugging logs
# print(f"Request Data: {request_data}")
# print(f"Headers: {self.get_auth_headers()}")
# Add retry logic with exponential backoff
for retry in range(RETRY_MAX):
try:
response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(request_data),
timeout=10
)
try:
response = self.session.post(
url,
headers=self.get_auth_headers(),
data=urlencode(request_data),
timeout=10
)
except requests.exceptions.Timeout:
logging.error(f"Request timed out after 10 seconds for URL: {url}")
return None
except requests.exceptions.RequestException as e:
logging.error(f"Request failed for URL: {url} - Error: {str(e)}")
return None
break # Success, exit retry loop
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.ReadTimeout) as e:
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response")
return None
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
raise # Final retry failed, propagate error
logging.error(f"Final retry failed: {str(e)}")
raise # Propagate error
wait_time = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
else:
# All retries exhausted
print(f"Failed after {RETRY_MAX} attempts")
logging.error(f"Failed after {RETRY_MAX} attempts")
return None
# Debug raw response
# print(f"Response Status Code: {response.status_code}")
# print(f"Response Content: {response.text}")
# Handle response
if response.status_code == 200:
try:
json_response = response.json()
return json_response
except ValueError:
print("Failed to decode JSON response")
logging.error("Failed to decode JSON response")
return None
elif response.status_code == 400:
print("400 Bad Request - likely missing or invalid parameters")
print("Verify these parameters:")
for param, value in request_data.items():
print(f"- {param}: {value}")
logging.error("400 Bad Request - likely missing or invalid parameters")
logging.error(f"Request Data: {request_data}")
logging.error(f"Response: {response.text[:100]}")
return None
elif response.status_code == 401:
print("401 Unauthorized - token may be expired or invalid")
logging.error("401 Unauthorized - token may be expired or invalid")
logging.error(f"Response: {response.text[:100]}")
return None
elif 500 <= response.status_code < 600:
logging.error(f"Server error {response.status_code} - Response: {response.text[:100]}")
raise requests.exceptions.ConnectionError(f"Server error {response.status_code}")
else:
print(f"Unexpected status code: {response.status_code}")
logging.error(f"Unexpected status code: {response.status_code}")
return None
def book_session(self, session_id: str) -> bool:
@@ -248,15 +265,16 @@ class CrossFitBooker:
logging.error(f"API returned success:false: {json_response}")
return False
logging.error(f"HTTP {response.status_code}: {response.text}")
logging.error(f"HTTP {response.status_code}: {response.text[:100]}")
return False
except (requests.exceptions.ConnectionError,
requests.exceptions.Timeout,
requests.exceptions.ReadTimeout) as e:
except requests.exceptions.JSONDecodeError:
logging.error("Failed to decode JSON response")
return False
except requests.exceptions.RequestException as e:
if retry == RETRY_MAX - 1:
logging.error(f"All {RETRY_MAX} retry attempts failed")
raise
logging.error(f"Final retry failed: {str(e)}")
raise # Propagate error
wait_time = RETRY_BACKOFF * (2 ** retry)
logging.warning(f"Request failed (attempt {retry+1}/{RETRY_MAX}): {str(e)}. Retrying in {wait_time}s...")
time.sleep(wait_time)
@@ -270,6 +288,7 @@ class CrossFitBooker:
# First check if can_join is true (primary condition)
if user_info.get("can_join", False):
logging.debug("Session is bookable: can_join is True")
return True
# If can_join is False, check if there's a booking window
@@ -285,6 +304,7 @@ class CrossFitBooker:
booking_datetime = pytz.timezone(TIMEZONE).localize(booking_datetime)
if current_time >= booking_datetime:
logging.debug(f"Session is bookable: current_time {current_time} >= booking_datetime {booking_datetime}")
return True # Booking window is open
else:
return False # Still waiting for booking to open
@@ -295,27 +315,40 @@ class CrossFitBooker:
return False
def matches_preferred_session(self, session: Dict, current_time: datetime) -> bool:
"""Check if session matches one of your preferred sessions."""
"""Check if session matches one of your preferred sessions with fuzzy matching."""
try:
session_time = parse(session["start_timestamp"])
if not session_time.tzinfo:
session_time = pytz.timezone(TIMEZONE).localize(session_time)
# Get day of week (0=Monday, 6=Sunday) and time
day_of_week = session_time.weekday()
session_time_str = session_time.strftime("%H:%M")
session_name = session.get("name_activity", "").upper()
# Check against preferred sessions
for preferred_day, preferred_time, preferred_name in PREFERRED_SESSIONS:
# Exact match first
if (day_of_week == preferred_day and
session_time_str == preferred_time and
preferred_name in session_name): # Partial match
preferred_name in session_name):
return True
# Fuzzy match fallback (80% similarity)
ratio = difflib.SequenceMatcher(
None,
session_name.lower(),
preferred_name.lower()
).ratio()
if (day_of_week == preferred_day and
abs(session_time.hour - int(preferred_time.split(':')[0])) <= 1 and
ratio >= 0.8):
logging.debug(f"Fuzzy match: {session_name}{preferred_name} ({ratio:.2%})")
return True
return False
except Exception as e:
logging.error(f"Failed to check session: {str(e)}")
logging.error(f"Failed to check session: {str(e)} - Session: {session}")
return False
def run_booking_cycle(self, current_time: datetime):
@@ -327,7 +360,7 @@ class CrossFitBooker:
# Get available sessions
sessions_data = self.get_available_sessions(start_date, end_date)
if not sessions_data or not sessions_data.get("success", False):
print("No sessions available or error fetching sessions")
logging.error("No sessions available or error fetching sessions - Sessions Data: {sessions_data}")
return
activities = sessions_data.get("data", {}).get("activities_calendar", [])
@@ -345,18 +378,18 @@ class CrossFitBooker:
sessions_to_book.append(("Available", session))
if not sessions_to_book:
print("No matching sessions found to book")
logging.info("No matching sessions found to book")
return
# Book sessions (preferred first)
sessions_to_book.sort(key=lambda x: 0 if x[0] == "Preferred" else 1)
for session_type, session in sessions_to_book:
session_time = datetime.strptime(session["start_datetime"], "%Y-%m-%d %H:%M:%S")
print(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
logging.info(f"Attempting to book {session_type} session at {session_time} ({session['name_activity']})")
if self.book_session(session["id_activity_calendar"]):
print(f"Successfully booked {session_type} session at {session_time}")
logging.info(f"Successfully booked {session_type} session at {session_time}")
else:
print(f"Failed to book {session_type} session at {session_time}")
logging.error(f"Failed to book {session_type} session at {session_time} - Session: {session}")
def run(self):
"""Main execution loop"""
@@ -369,58 +402,28 @@ class CrossFitBooker:
return
while True:
current_time = datetime.now(tz)
print(f"\nCurrent time: {current_time}")
try:
current_time = datetime.now(tz)
logging.info(f"Current time: {current_time}")
# Run booking cycle at the target time or if it's a test
if current_time.strftime("%H:%M") == TARGET_RESERVATION_TIME:
self.run_booking_cycle(current_time)
# Wait a minute to avoid checking again immediately
time.sleep(60)
else:
# Check again in 30 seconds
time.sleep(30)
except Exception as e:
logging.error(f"Unexpected error in booking cycle: {str(e)} - Traceback: {traceback.format_exc()}")
time.sleep(60) # Wait before retrying after error
# Run booking cycle at the target time or if it's a test
if current_time.strftime("%H:%M") == TARGET_RESERVATION_TIME:
self.run_booking_cycle(current_time)
# Wait a minute to avoid checking again immediately
time.sleep(60)
else:
# Check again in 30 seconds
time.sleep(30)
if __name__ == "__main__":
booker = CrossFitBooker()
if not booker.login():
print("Failed to login")
logging.error("Failed to login - Traceback: {traceback.format_exc()}")
exit(1)
# Set timezone for current_time
tz = pytz.timezone(TIMEZONE)
current_time = datetime.now(tz)
# Get sessions for the next 7 days
start_date = datetime.now()
end_date = start_date + timedelta(days=3)
session_data = booker.get_available_sessions(start_date, end_date)
if not session_data or not session_data.get("success", False):
logging.error("Failed to get session data")
exit(1)
activities = session_data.get("data", {}).get("activities_calendar", [])
bookable_sessions = []
for session in activities:
# Assuming the string is stored in a variable named session_time_str
session_time_str = "2025-07-19 20:01:08.858174+02:00"
session_time = datetime.strptime(session_time_str, "%Y-%m-%d %H:%M:%S.%f%z")
if booker.is_session_bookable(session, current_time):
# if booker.is_session_bookable(session, session_time):
bookable_sessions.append(session)
# print(f"Bookable sessions: {json.dumps(bookable_sessions, indent=2)}")
print(f"Found {len(bookable_sessions)} sessions to book")
for session in bookable_sessions:
is_prefered_session = booker.matches_preferred_session(session, current_time)
# print(session.get("name_activity") + " / " + str(is_prefered_session))
if is_prefered_session:
booker.book_session(session.get("id_activity_calendar"))
# Start continuous booking loop
booker.run()