Add enhanced analysis with selective field analysis and category proposer

New Features:
- Selective field analysis: Choose which fields to analyze (title, meta_description, categories, site)
- In-place CSV updates: Update input CSV with new columns (automatic backup created)
- Category proposer: Dedicated command for AI-powered category suggestions

New Commands:
- seo analyze -f title categories: Analyze specific fields only
- seo analyze -u: Update input CSV with recommendations
- seo category_propose: Propose categories based on content

New Scripts:
- enhanced_analyzer.py: Enhanced AI analyzer with selective analysis
- category_proposer.py: Dedicated category proposal tool

CLI Options:
- --fields, -f: Specify fields to analyze
- --update, -u: Update input CSV (creates backup)
- --output, -o: Custom output file path

Output Columns:
- proposed_title, title_reason (for title analysis)
- proposed_meta_description, meta_reason (for meta analysis)
- proposed_category, category_reason (for category analysis)
- proposed_site, site_reason (for site analysis)
- ai_confidence, ai_priority (common to all)

Documentation:
- ENHANCED_ANALYSIS_GUIDE.md: Complete guide with examples

Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
This commit is contained in:
Kevin Bataille
2026-02-16 14:57:42 +01:00
parent 9d0a2c77eb
commit 1744d8e7db
4 changed files with 992 additions and 5 deletions

View File

@@ -0,0 +1,239 @@
#!/usr/bin/env python3
"""
Category Proposer - AI-powered category suggestions
Analyzes posts and proposes optimal categories based on content.
"""
import csv
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Optional
import requests
from datetime import datetime
from config import Config
logger = logging.getLogger(__name__)
class CategoryProposer:
    """Propose categories for posts using AI.

    Workflow: load a CSV of posts, send them to an OpenRouter chat model
    in batches, collect the JSON proposals, and export one CSV row per
    post with the proposed category alongside the current one.
    """

    def __init__(self, csv_file: str):
        """Initialize proposer with CSV file."""
        self.csv_file = Path(csv_file)
        self.openrouter_api_key = Config.OPENROUTER_API_KEY
        self.ai_model = Config.AI_MODEL
        self.posts: List[Dict] = []
        self.proposed_categories: List[Dict] = []
        self.api_calls = 0
        self.ai_cost = 0.0

    def load_csv(self) -> bool:
        """Read every row of the input CSV into memory; True on success."""
        logger.info(f"Loading CSV: {self.csv_file}")
        if not self.csv_file.exists():
            logger.error(f"CSV file not found: {self.csv_file}")
            return False
        try:
            with self.csv_file.open('r', encoding='utf-8') as handle:
                self.posts = list(csv.DictReader(handle))
        except Exception as exc:
            logger.error(f"Error loading CSV: {exc}")
            return False
        logger.info(f"✓ Loaded {len(self.posts)} posts")
        return True

    def get_category_proposals(self, batch: List[Dict]) -> Optional[str]:
        """Ask the model for proposals covering one batch; return raw text.

        Returns None when the API key is missing or the request fails.
        """
        if not self.openrouter_api_key:
            logger.error("OPENROUTER_API_KEY not set")
            return None
        # Number the posts so the model can echo post_id back reliably.
        rendered = []
        for position, post in enumerate(batch, 1):
            pieces = [f"{position}. ID: {post['post_id']}\n"]
            pieces.append(f" Title: {post.get('title', '')}\n")
            pieces.append(f" Current Categories: {post.get('categories', '')}\n")
            if 'content_preview' in post:
                pieces.append(f" Content: {post['content_preview'][:300]}...\n")
            rendered.append("".join(pieces))
        posts_text = "\n".join(rendered)
        prompt = f"""Analyze these blog posts and propose optimal categories.
{posts_text}
For EACH post, provide:
{{
"post_id": <id>,
"current_categories": "<current>",
"proposed_category": "<best category>",
"alternative_categories": ["<alt1>", "<alt2>"],
"reason": "<brief explanation>",
"confidence": "<High|Medium|Low>"
}}
Return ONLY a JSON array with one object per post."""
        try:
            logger.info(" Getting category proposals...")
            payload = {
                "model": self.ai_model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.3,
            }
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.openrouter_api_key}",
                    "Content-Type": "application/json",
                },
                json=payload,
                timeout=60,
            )
            response.raise_for_status()
            result = response.json()
            self.api_calls += 1
            usage = result.get('usage', {})
            prompt_tokens = usage.get('prompt_tokens', 0)
            completion_tokens = usage.get('completion_tokens', 0)
            # Running cost estimate: the formula prices input at $3 and
            # output at $15 per million tokens.
            self.ai_cost += (prompt_tokens * 3 + completion_tokens * 15) / 1_000_000
            logger.info(f" ✓ Got proposals (tokens: {prompt_tokens}+{completion_tokens})")
            return result['choices'][0]['message']['content'].strip()
        except Exception as exc:
            logger.error(f"Error getting proposals: {exc}")
            return None

    def parse_proposals(self, proposals_json: str) -> List[Dict]:
        """Extract and decode the JSON array embedded in the model reply."""
        lo = proposals_json.find('[')
        hi = proposals_json.rfind(']') + 1
        if lo == -1 or hi == 0:
            return []
        try:
            return json.loads(proposals_json[lo:hi])
        except json.JSONDecodeError:
            return []

    def propose_categories(self, batch_size: int = 10) -> bool:
        """Run proposal batches over all loaded posts; always returns True."""
        logger.info("\n" + "=" * 70)
        logger.info("PROPOSING CATEGORIES WITH AI")
        logger.info("=" * 70 + "\n")
        chunks = []
        cursor = 0
        while cursor < len(self.posts):
            chunks.append(self.posts[cursor:cursor + batch_size])
            cursor += batch_size
        logger.info(f"Processing {len(self.posts)} posts in {len(chunks)} batches...\n")
        collected = {}
        total = len(chunks)
        for number, chunk in enumerate(chunks, 1):
            logger.info(f"Batch {number}/{total}...")
            raw = self.get_category_proposals(chunk)
            if not raw:
                continue
            parsed = self.parse_proposals(raw)
            for entry in parsed:
                collected[str(entry.get('post_id', ''))] = entry
            logger.info(f" ✓ Got {len(parsed)} proposals")
        logger.info(f"\n✓ Proposals complete!")
        logger.info(f" Total: {len(collected)}")
        logger.info(f" API calls: {self.api_calls}")
        logger.info(f" Cost: ${self.ai_cost:.4f}")
        # Attach the AI suggestion to every row, falling back to the
        # post's current categories when the model returned nothing.
        for post in self.posts:
            match = collected.get(str(post['post_id']), {})
            row = dict(post)
            row['proposed_category'] = match.get('proposed_category', post.get('categories', ''))
            row['alternative_categories'] = ', '.join(match.get('alternative_categories', []))
            row['category_reason'] = match.get('reason', '')
            row['category_confidence'] = match.get('confidence', 'Medium')
            row['current_categories'] = post.get('categories', '')
            self.proposed_categories.append(row)
        return True

    def export_proposals(self, output_file: Optional[str] = None) -> str:
        """Write the proposal rows to CSV and return the output path."""
        if output_file:
            target = Path(output_file)
        else:
            out_dir = Path(__file__).parent.parent / 'output'
            out_dir.mkdir(parents=True, exist_ok=True)
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            target = out_dir / f'category_proposals_{stamp}.csv'
        target.parent.mkdir(parents=True, exist_ok=True)
        columns = [
            'post_id', 'title', 'site', 'current_categories',
            'proposed_category', 'alternative_categories',
            'category_reason', 'category_confidence'
        ]
        logger.info(f"\nExporting to: {target}")
        # extrasaction='ignore' drops any source columns not listed above.
        with target.open('w', newline='', encoding='utf-8') as handle:
            writer = csv.DictWriter(handle, fieldnames=columns, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(self.proposed_categories)
        logger.info(f"✓ Exported {len(self.proposed_categories)} proposals")
        return str(target)

    def run(self, output_file: Optional[str] = None, batch_size: int = 10) -> str:
        """Load, propose, and export; exits the process on failure."""
        if not self.load_csv():
            sys.exit(1)
        if not self.propose_categories(batch_size=batch_size):
            logger.error("Failed to propose categories")
            sys.exit(1)
        return self.export_proposals(output_file)
def main():
    """Main entry point: parse CLI arguments and run the proposer."""
    import argparse
    # Configure a handler so the logger.info progress output actually
    # prints; an unconfigured logger only surfaces WARNING and above.
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    parser = argparse.ArgumentParser(
        description='AI-powered category proposer for blog posts'
    )
    parser.add_argument('csv_file', help='Input CSV file with posts')
    parser.add_argument('--output', '-o', help='Output CSV file')
    parser.add_argument('--batch-size', type=int, default=10, help='Batch size')
    args = parser.parse_args()
    proposer = CategoryProposer(args.csv_file)
    # Bug fix: --output was parsed but never forwarded to run(), so a
    # custom output path was silently ignored.
    output_file = proposer.run(output_file=args.output, batch_size=args.batch_size)
    logger.info(f"\n✓ Category proposals saved to: {output_file}")


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,375 @@
#!/usr/bin/env python3
"""
Enhanced AI Analyzer - Selective analysis with in-place updates
Analyzes posts and updates CSV with AI recommendations for:
- Title optimization
- Meta description optimization
- Category suggestions
- Site placement recommendations
"""
import csv
import json
import logging
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import requests
from datetime import datetime
from config import Config
logger = logging.getLogger(__name__)
class EnhancedPostAnalyzer:
    """Enhanced analyzer with selective column analysis and in-place updates.

    Loads posts from a CSV, requests per-field recommendations from an
    OpenRouter chat model in batches, then exports the rows with the new
    ``proposed_*`` columns (optionally rewriting the input CSV in place
    after taking a timestamped backup).
    """

    # Canonical set (and order) of fields the analyzer can process.
    SUPPORTED_FIELDS = ['title', 'meta_description', 'categories', 'site']

    def __init__(self, csv_file: str, analyze_fields: Optional[List[str]] = None):
        """
        Initialize analyzer.

        Args:
            csv_file: Path to input CSV.
            analyze_fields: Fields to analyze, drawn from
                ['title', 'meta_description', 'categories', 'site'].
                If None, analyzes all fields.
        """
        self.csv_file = Path(csv_file)
        self.openrouter_api_key = Config.OPENROUTER_API_KEY
        self.ai_model = Config.AI_MODEL
        self.posts: List[Dict] = []
        self.analyzed_posts: List[Dict] = []
        self.api_calls = 0
        self.ai_cost = 0.0  # running USD estimate derived from token usage
        # Copy the lists so neither the class default nor the caller's
        # argument can be mutated through self.analyze_fields.
        if analyze_fields is None:
            self.analyze_fields = list(self.SUPPORTED_FIELDS)
        else:
            self.analyze_fields = list(analyze_fields)
        logger.info(f"Fields to analyze: {', '.join(self.analyze_fields)}")

    def load_csv(self) -> bool:
        """Load posts from CSV file. Returns True on success."""
        logger.info(f"Loading CSV: {self.csv_file}")
        if not self.csv_file.exists():
            logger.error(f"CSV file not found: {self.csv_file}")
            return False
        try:
            with open(self.csv_file, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                self.posts = list(reader)
            logger.info(f"✓ Loaded {len(self.posts)} posts from CSV")
            return True
        except Exception as e:
            logger.error(f"Error loading CSV: {e}")
            return False

    def get_ai_recommendations(self, batch: List[Dict], fields: List[str]) -> Optional[str]:
        """Get AI recommendations for specific fields.

        Args:
            batch: Post rows to include in one prompt.
            fields: Which fields the prompt should ask about.

        Returns:
            Raw model response text, or None on missing key / API error.
        """
        if not self.openrouter_api_key:
            logger.error("OPENROUTER_API_KEY not set")
            return None
        # Format posts for AI, only including the fields under analysis.
        formatted_posts = []
        for i, post in enumerate(batch, 1):
            post_text = f"{i}. POST ID: {post['post_id']}\n"
            post_text += f" Site: {post.get('site', '')}\n"
            if 'title' in fields:
                post_text += f" Title: {post.get('title', '')}\n"
            if 'meta_description' in fields:
                post_text += f" Meta Description: {post.get('meta_description', '')}\n"
            if 'categories' in fields:
                post_text += f" Categories: {post.get('categories', '')}\n"
            if 'content_preview' in post:
                post_text += f" Content Preview: {post.get('content_preview', '')[:300]}...\n"
            formatted_posts.append(post_text)
        posts_text = "\n".join(formatted_posts)
        # Build the prompt incrementally so only the requested fields are
        # described in the expected JSON schema.
        prompt_parts = ["Analyze these blog posts and provide recommendations.\n\n"]
        if 'site' in fields:
            prompt_parts.append("""Website Strategy:
- mistergeek.net: High-value topics (VPN, Software, Gaming, General Tech, SEO, Content Marketing)
- webscroll.fr: Torrenting, File-Sharing, Tracker guides
- hellogeek.net: Low-traffic, experimental, off-brand content
""")
        prompt_parts.append(posts_text)
        prompt_parts.append("\nFor EACH post, provide a JSON object with:\n{\n")
        if 'title' in fields:
            prompt_parts.append(' "proposed_title": "<Improved SEO title>",\n')
            prompt_parts.append(' "title_reason": "<Reason for title change>",\n')
        if 'meta_description' in fields:
            prompt_parts.append(' "proposed_meta_description": "<Improved meta description (120-160 chars)>",\n')
            prompt_parts.append(' "meta_reason": "<Reason for meta description change>",\n')
        if 'categories' in fields:
            prompt_parts.append(' "proposed_category": "<Best category>",\n')
            prompt_parts.append(' "category_reason": "<Reason for category change>",\n')
        if 'site' in fields:
            prompt_parts.append(' "proposed_site": "<Best site for this post>",\n')
            prompt_parts.append(' "site_reason": "<Reason for site recommendation>",\n')
        prompt_parts.append(' "confidence": "<High|Medium|Low>",\n')
        prompt_parts.append(' "priority": "<High|Medium|Low>"\n}')
        prompt_parts.append("\nReturn ONLY a JSON array of objects, one per post.")
        prompt = "".join(prompt_parts)
        try:
            logger.info(" Sending batch to AI for analysis...")
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {self.openrouter_api_key}",
                    "Content-Type": "application/json",
                },
                json={
                    "model": self.ai_model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.3,
                },
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            self.api_calls += 1
            # Track cost: formula prices input at $3/M and output at $15/M tokens.
            usage = result.get('usage', {})
            input_tokens = usage.get('prompt_tokens', 0)
            output_tokens = usage.get('completion_tokens', 0)
            self.ai_cost += (input_tokens * 3 + output_tokens * 15) / 1_000_000
            recommendations_text = result['choices'][0]['message']['content'].strip()
            logger.info(f" ✓ Got recommendations (tokens: {input_tokens}+{output_tokens})")
            return recommendations_text
        except Exception as e:
            logger.error(f"Error getting AI recommendations: {e}")
            return None

    def parse_recommendations(self, recommendations_json: str) -> List[Dict]:
        """Parse the JSON array embedded in the model's response text."""
        try:
            start_idx = recommendations_json.find('[')
            end_idx = recommendations_json.rfind(']') + 1
            if start_idx == -1 or end_idx == 0:
                logger.error("Could not find JSON array in response")
                return []
            return json.loads(recommendations_json[start_idx:end_idx])
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing JSON recommendations: {e}")
            return []

    def analyze_posts(self, batch_size: int = 10) -> bool:
        """Analyze all posts in batches, mutating rows in place.

        Returns:
            True if at least one post ended up in ``analyzed_posts``.
        """
        logger.info("\n" + "="*70)
        logger.info("ANALYZING POSTS WITH AI")
        logger.info("="*70 + "\n")
        batches = [self.posts[i:i + batch_size] for i in range(0, len(self.posts), batch_size)]
        logger.info(f"Processing {len(self.posts)} posts in {len(batches)} batches...\n")
        all_recommendations = {}
        for batch_num, batch in enumerate(batches, 1):
            logger.info(f"Batch {batch_num}/{len(batches)}: Analyzing {len(batch)} posts...")
            recommendations_json = self.get_ai_recommendations(batch, self.analyze_fields)
            if not recommendations_json:
                logger.error(f" Failed to get recommendations for batch {batch_num}")
                continue
            recommendations = self.parse_recommendations(recommendations_json)
            # Key by stringified post_id so rows can be matched back up below.
            for rec in recommendations:
                all_recommendations[str(rec.get('post_id', ''))] = rec
            logger.info(f" ✓ Got {len(recommendations)} recommendations")
        logger.info(f"\n✓ Analysis complete!")
        logger.info(f" Total recommendations: {len(all_recommendations)}")
        logger.info(f" API calls: {self.api_calls}")
        logger.info(f" Estimated cost: ${self.ai_cost:.4f}")
        # Map recommendations onto the rows. Use .get() so a CSV without a
        # post_id column degrades to the fallback path instead of raising.
        for post in self.posts:
            rec = all_recommendations.get(str(post.get('post_id', '')))
            if rec is not None:
                self._apply_recommendation(post, rec)
            else:
                self._apply_fallback(post)
            self.analyzed_posts.append(post)
        return len(self.analyzed_posts) > 0

    def _apply_recommendation(self, post: Dict, rec: Dict) -> None:
        """Copy the AI values for each requested field onto the row."""
        if 'title' in self.analyze_fields:
            post['proposed_title'] = rec.get('proposed_title', post.get('title', ''))
            post['title_reason'] = rec.get('title_reason', '')
        if 'meta_description' in self.analyze_fields:
            post['proposed_meta_description'] = rec.get('proposed_meta_description', post.get('meta_description', ''))
            post['meta_reason'] = rec.get('meta_reason', '')
        if 'categories' in self.analyze_fields:
            post['proposed_category'] = rec.get('proposed_category', post.get('categories', ''))
            post['category_reason'] = rec.get('category_reason', '')
        if 'site' in self.analyze_fields:
            post['proposed_site'] = rec.get('proposed_site', post.get('site', ''))
            post['site_reason'] = rec.get('site_reason', '')
        post['ai_confidence'] = rec.get('confidence', 'Medium')
        post['ai_priority'] = rec.get('priority', 'Medium')

    def _apply_fallback(self, post: Dict) -> None:
        """Fill proposed_* columns with current values when no AI result exists."""
        if 'title' in self.analyze_fields:
            post['proposed_title'] = post.get('title', '')
            post['title_reason'] = 'No AI recommendation'
        if 'meta_description' in self.analyze_fields:
            post['proposed_meta_description'] = post.get('meta_description', '')
            post['meta_reason'] = 'No AI recommendation'
        if 'categories' in self.analyze_fields:
            post['proposed_category'] = post.get('categories', '')
            post['category_reason'] = 'No AI recommendation'
        if 'site' in self.analyze_fields:
            post['proposed_site'] = post.get('site', '')
            post['site_reason'] = 'No AI recommendation'
        post['ai_confidence'] = 'Unknown'
        post['ai_priority'] = 'Medium'

    @staticmethod
    def _merge_fieldnames(base: List[str], extra: List[str]) -> List[str]:
        """Concatenate two column lists, dropping duplicates, keeping order."""
        return list(dict.fromkeys(base + extra))

    def export_results(self, output_file: Optional[str] = None, update_input: bool = False) -> str:
        """
        Export results to CSV.

        Args:
            output_file: Custom output path.
            update_input: If True, update the input CSV file (creates backup).

        Returns:
            Path to exported file, or "" when there is nothing to export.
        """
        # Check for data BEFORE creating a backup, so a failed run does not
        # leave a stray backup file behind.
        if not self.analyzed_posts:
            logger.error("No analyzed posts to export")
            return ""
        if update_input:
            # Create backup of original file before overwriting it.
            backup_file = self.csv_file.parent / f"{self.csv_file.stem}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            import shutil
            shutil.copy2(self.csv_file, backup_file)
            logger.info(f"✓ Created backup: {backup_file}")
            output_file = self.csv_file
        elif not output_file:
            output_dir = Path(__file__).parent.parent / 'output'
            output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = output_dir / f'analyzed_posts_{timestamp}.csv'
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        # analyze_posts() mutated the rows in place, so the row keys ALREADY
        # include the proposed_* columns. The previous version appended them
        # a second time, producing duplicate CSV columns; merge without
        # duplicates instead (the explicit list still guarantees the columns
        # exist even for fields with no data).
        original_fields = list(self.analyzed_posts[0].keys())
        new_fields = []
        if 'title' in self.analyze_fields:
            new_fields.extend(['proposed_title', 'title_reason'])
        if 'meta_description' in self.analyze_fields:
            new_fields.extend(['proposed_meta_description', 'meta_reason'])
        if 'categories' in self.analyze_fields:
            new_fields.extend(['proposed_category', 'category_reason'])
        if 'site' in self.analyze_fields:
            new_fields.extend(['proposed_site', 'site_reason'])
        new_fields.extend(['ai_confidence', 'ai_priority'])
        fieldnames = self._merge_fieldnames(original_fields, new_fields)
        logger.info(f"\nExporting results to: {output_file}")
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.analyzed_posts)
        logger.info(f"✓ Exported {len(self.analyzed_posts)} posts")
        return str(output_file)

    def run(self, output_file: Optional[str] = None, update_input: bool = False, batch_size: int = 10) -> str:
        """Run complete analysis; exits the process on load/analyze failure."""
        if not self.load_csv():
            sys.exit(1)
        if not self.analyze_posts(batch_size=batch_size):
            logger.error("Failed to analyze posts")
            sys.exit(1)
        return self.export_results(output_file=output_file, update_input=update_input)
def main():
    """Main entry point: parse CLI arguments and run the analyzer."""
    import argparse
    # Without a configured handler the module-level logger drops INFO
    # records, so every progress message in this script would be invisible
    # when run directly.
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    parser = argparse.ArgumentParser(
        description='Enhanced AI analyzer with selective field analysis'
    )
    parser.add_argument('csv_file', help='Input CSV file')
    parser.add_argument('--output', '-o', help='Output CSV file (default: creates new file in output/)')
    parser.add_argument('--update', '-u', action='store_true', help='Update input CSV file (creates backup)')
    parser.add_argument('--fields', '-f', nargs='+',
                        choices=['title', 'meta_description', 'categories', 'site'],
                        help='Fields to analyze (default: all fields)')
    parser.add_argument('--batch-size', type=int, default=10, help='Batch size for AI analysis')
    args = parser.parse_args()
    analyzer = EnhancedPostAnalyzer(args.csv_file, analyze_fields=args.fields)
    output_file = analyzer.run(
        output_file=args.output,
        update_input=args.update,
        batch_size=args.batch_size
    )
    logger.info(f"\n✓ Analysis complete! Results saved to: {output_file}")


if __name__ == '__main__':
    main()