"""
Keyword opportunity analyzer for SEO optimization.
Identifies high-potential keywords ranking at positions 11-30.
"""

import argparse
import csv
import json
import time
from pathlib import Path

from openai import OpenAI

from config import Config


class OpportunityAnalyzer:
    """Analyze keyword opportunities for SEO optimization.

    Workflow (see ``run``): load posts from a CSV, keep those in the target
    position range (or with traffic when no position is known), score them,
    optionally ask an AI model for recommendations on the best ones, and
    export the result as CSV together with a text log.
    """

    # Expected CTR (as a fraction) keyed by integer search position.
    # Positions outside this table fall back to _DEFAULT_EXPECTED_CTR.
    _EXPECTED_CTR_BY_POSITION = {
        11: 0.02, 12: 0.02, 13: 0.015, 14: 0.015, 15: 0.013,
        16: 0.012, 17: 0.011, 18: 0.01, 19: 0.009, 20: 0.008,
        21: 0.008, 22: 0.007, 23: 0.007, 24: 0.006, 25: 0.006,
        26: 0.006, 27: 0.005, 28: 0.005, 29: 0.005, 30: 0.004,
    }
    _DEFAULT_EXPECTED_CTR = 0.005

    def __init__(self):
        """Initialize analyzer.

        Reads ``OUTPUT_DIR`` and ``OPENROUTER_API_KEY`` from ``Config``. The
        OpenRouter client is only created when an API key is set; otherwise
        ``self.client`` stays ``None`` and AI recommendations are skipped.
        """
        self.config = Config
        self.output_dir = self.config.OUTPUT_DIR
        self.logs = []
        self.client = None

        if self.config.OPENROUTER_API_KEY:
            self.client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=self.config.OPENROUTER_API_KEY,
            )

    def log(self, message):
        """Add message to the in-memory log and echo it to stdout."""
        self.logs.append(message)
        print(message)

    @staticmethod
    def _join_recommendations(items):
        """Flatten an AI recommendation value into a single '; '-joined string.

        The model's JSON is not guaranteed to match the requested schema, so
        this accepts a missing value, a bare string, a list of arbitrary
        items, or any other scalar without raising.
        """
        if items is None:
            return ''
        if isinstance(items, str):
            # Joining a bare string would interleave '; ' between characters.
            return items
        if isinstance(items, (list, tuple)):
            return '; '.join(str(item) for item in items)
        return str(items)

    def load_posts(self, posts_csv):
        """Load posts with analytics data.

        Args:
            posts_csv: Path to a CSV with ``ID``, ``Title``, ``URL`` and the
                numeric analytics columns read below.

        Returns:
            A list of post dicts. Empty when the file is missing or cannot be
            read. Rows whose numeric fields cannot be parsed are skipped and
            the number of skipped rows is logged.
        """
        posts = []
        posts_csv = Path(posts_csv)

        if not posts_csv.exists():
            self.log(f"❌ File not found: {posts_csv}")
            return posts

        skipped = 0
        try:
            # utf-8-sig transparently drops a BOM, which would otherwise
            # become part of the first header name ('ID'); newline='' is the
            # csv module's documented way to open files.
            with open(posts_csv, 'r', newline='', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    try:
                        posts.append({
                            'id': row.get('ID', ''),
                            'title': row.get('Title', ''),
                            'url': row.get('URL', ''),
                            'impressions': int(row.get('impressions', 0) or 0),
                            'clicks': int(row.get('clicks', 0) or 0),
                            'avg_position': float(row.get('avg_position', 0) or 0),
                            'ctr': float(row.get('ctr', 0) or 0),
                            'traffic': int(row.get('traffic', 0) or 0),
                            'bounce_rate': float(row.get('bounce_rate', 0) or 0),
                            'keywords_count': int(row.get('keywords_count', 0) or 0),
                            'top_keywords': row.get('top_keywords', ''),
                        })
                    except (ValueError, TypeError):
                        skipped += 1
                        continue

            self.log(f"✓ Loaded {len(posts)} posts")
            if skipped:
                # Surface dropped rows instead of losing them silently.
                self.log(f"⚠️ Skipped {skipped} rows with invalid numeric data")
        except Exception as e:
            self.log(f"❌ Error reading posts: {e}")

        return posts

    def filter_opportunities(self, posts, min_pos, max_pos, min_impressions):
        """Filter posts with keywords in opportunity range or high traffic for optimization.

        A post with a positive ``avg_position`` is kept only when the position
        lies in ``[min_pos, max_pos]`` and it has at least ``min_impressions``
        impressions. A post without a position (``avg_position <= 0``) is kept
        when it has any traffic.

        Returns:
            The list of selected post dicts (same objects as in ``posts``).
        """
        opportunities = []

        for post in posts:
            position = post.get('avg_position', 0)
            impressions = post.get('impressions', 0)
            traffic = post.get('traffic', 0)

            if position > 0:
                # Primary filter: position range (if data available)
                if min_pos <= position <= max_pos and impressions >= min_impressions:
                    opportunities.append(post)
            elif traffic > 0:
                # Fallback: no position data, include posts with any traffic
                opportunities.append(post)

        self.log(f"✓ Found {len(opportunities)} posts for optimization analysis")
        if opportunities:
            traffic_posts = [p for p in opportunities if p.get('traffic', 0) > 0]
            self.log(
                f" ({len(traffic_posts)} have traffic data, "
                f"{len(opportunities) - len(traffic_posts)} selected for analysis)"
            )
        return opportunities

    def calculate_opportunity_score(self, post):
        """Calculate opportunity score (0-100) for a post.

        The score is the sum of four capped components:

        * position (0-35): higher the closer the post is to page 1;
        * traffic potential (0-30): scaled from impressions, 1000+ is the max;
        * CTR gap (0-20): shortfall of actual CTR against the expected CTR
          for the post's position;
        * quality (0-15): existing traffic and inverse bounce rate.

        Returns:
            The total rounded to one decimal place.
        """
        position = post.get('avg_position', 50)
        impressions = post.get('impressions', 0)
        ctr = post.get('ctr', 0)
        traffic = post.get('traffic', 0)

        # Position score (35%): closer to page 1 = higher, over the 11-30 range.
        # A non-positive position means "no data" (see filter_opportunities)
        # and earns nothing; the cap keeps positions better than 11 from
        # pushing the total above 100.
        if position > 0:
            position_score = min(35, max(0, (30 - position) / 19 * 35))
        else:
            position_score = 0

        # Traffic potential (30%): based on impressions, normalized to 0-30
        traffic_potential = min(30, (impressions / 1000) * 30)

        # CTR improvement potential (20%): gap between current and expected CTR
        expected_ctr = self._EXPECTED_CTR_BY_POSITION.get(
            int(position), self._DEFAULT_EXPECTED_CTR
        )
        ctr_gap = max(0, expected_ctr - ctr)
        ctr_score = min(20, (ctr_gap / expected_ctr * 100 / 5) * 20)

        # Content quality (15%): existing traffic and engagement.
        # Floored at 0 so a bounce rate above 100 cannot subtract points.
        bounce_rate = post.get('bounce_rate', 50)
        quality_score = max(
            0,
            min(15, (traffic / 100) * 7.5 + (100 - bounce_rate) / 100 * 7.5),
        )

        return round(position_score + traffic_potential + ctr_score + quality_score, 1)

    def estimate_traffic_gain(self, post):
        """Estimate potential traffic gain from optimization.

        Models a one-position improvement as a 25% increase over the current
        estimated clicks (``impressions * ctr``).

        Returns:
            The estimated gain rounded to a whole number (as a float), or 0
            when ``avg_position`` is not greater than 11.
        """
        position = post.get('avg_position', 50)
        impressions = post.get('impressions', 0)
        ctr = post.get('ctr', 0)

        # Moving from position X to X-1 typically improves CTR by 20-30%
        current_traffic = impressions * ctr
        # NOTE(review): a post at exactly 11.0 gets no gain although 11 is
        # inside the default opportunity range - confirm whether >= is meant.
        if position > 11:
            improvement_factor = 1.25  # 25% improvement per position
            estimated_new_traffic = current_traffic * improvement_factor
            gain = estimated_new_traffic - current_traffic
        else:
            gain = 0

        return round(gain, 0)

    def generate_ai_recommendations(self, post):
        """Generate AI recommendations for top opportunities.

        Args:
            post: Post dict with ``title``, ``avg_position``, ``impressions``,
                ``ctr`` and optionally ``top_keywords``.

        Returns:
            The JSON object parsed from the model's reply, or ``None`` when no
            client is configured, the request fails, or the reply contains no
            parseable JSON object. Failures are logged, never raised.
        """
        if not self.client:
            return None

        try:
            keywords = post.get('top_keywords', '').split(',')[:5]
            keywords_str = ', '.join([k.strip() for k in keywords if k.strip()])

            # Kept flush-left so no source indentation leaks into the prompt.
            prompt = f"""Analyze keyword optimization opportunities for this blog post:

Post Title: {post['title']}
Current Position: {post['avg_position']:.1f}
Monthly Impressions: {post['impressions']}
Current CTR: {post['ctr']:.2%}
Top Keywords: {keywords_str}

Provide 2-3 specific, actionable recommendations to:
1. Improve the SEO title to increase CTR
2. Enhance the meta description
3. Target structural improvements (headers, content gaps)

Focus on moving this post from positions 11-20 to page 1 (positions 1-10).
Be specific and practical.

Return as JSON:
{{
    "title_recommendations": ["recommendation 1", "recommendation 2"],
    "description_recommendations": ["recommendation 1", "recommendation 2"],
    "content_recommendations": ["recommendation 1", "recommendation 2"],
    "estimated_effort_hours": number,
    "expected_position_improvement": number
}}"""

            response = self.client.chat.completions.create(
                model=self.config.AI_MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=500
            )

            # The content may be None; treat that like an empty reply.
            result_text = response.choices[0].message.content or ''

            # Extract the outermost {...} span; the model may wrap the JSON
            # in prose or code fences.
            start_idx = result_text.find('{')
            end_idx = result_text.rfind('}') + 1
            if start_idx < 0 or end_idx <= start_idx:
                self.log(f"⚠️ Could not parse AI response for {post['title']}")
                return None

            try:
                return json.loads(result_text[start_idx:end_idx])
            except json.JSONDecodeError:
                self.log(f"⚠️ Could not parse AI response for {post['title']}")
                return None

        except Exception as e:
            self.log(f"⚠️ AI generation failed for {post['title']}: {e}")
            return None

    def export_opportunities_csv(self, opportunities, output_csv):
        """Export opportunities to CSV.

        Rows are written in descending ``opportunity_score`` order. The parent
        directory of ``output_csv`` is created if needed. Errors are logged,
        not raised.
        """
        if not opportunities:
            self.log("⚠️ No opportunities to export")
            return

        try:
            fieldnames = [
                'ID', 'Title', 'URL', 'avg_position', 'impressions', 'clicks',
                'ctr', 'traffic', 'bounce_rate', 'keywords_count', 'top_keywords',
                'opportunity_score', 'estimated_traffic_gain',
                'title_recommendations', 'description_recommendations',
                'content_recommendations', 'estimated_effort_hours',
                'expected_position_improvement'
            ]

            # A custom --output location may point into a directory that
            # does not exist yet.
            Path(output_csv).parent.mkdir(parents=True, exist_ok=True)

            with open(output_csv, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
                writer.writeheader()

                ranked = sorted(
                    opportunities,
                    key=lambda x: x['opportunity_score'],
                    reverse=True,
                )
                for opp in ranked:
                    writer.writerow({
                        'ID': opp['id'],
                        'Title': opp['title'],
                        'URL': opp['url'],
                        'avg_position': opp['avg_position'],
                        'impressions': opp['impressions'],
                        'clicks': opp['clicks'],
                        'ctr': f"{opp['ctr']:.2%}",
                        'traffic': opp['traffic'],
                        'bounce_rate': opp['bounce_rate'],
                        'keywords_count': opp['keywords_count'],
                        'top_keywords': opp['top_keywords'],
                        'opportunity_score': opp['opportunity_score'],
                        'estimated_traffic_gain': opp['estimated_traffic_gain'],
                        'title_recommendations': opp.get('title_recommendations_str', ''),
                        'description_recommendations': opp.get('description_recommendations_str', ''),
                        'content_recommendations': opp.get('content_recommendations_str', ''),
                        'estimated_effort_hours': opp.get('estimated_effort_hours', ''),
                        'expected_position_improvement': opp.get('expected_position_improvement', ''),
                    })

            self.log(f"✓ Exported {len(opportunities)} opportunities to {output_csv}")
        except Exception as e:
            self.log(f"❌ Error exporting CSV: {e}")

    def export_log(self, log_file):
        """Export analysis log.

        Writes a report header followed by every message logged so far.
        Errors are logged, not raised.
        """
        try:
            with open(log_file, 'w', encoding='utf-8') as f:
                f.write("SEO Opportunity Analysis Report\n")
                f.write("=" * 60 + "\n\n")
                for msg in self.logs:
                    f.write(msg + "\n")

            self.log(f"✓ Exported log to {log_file}")
        except Exception as e:
            self.log(f"❌ Error exporting log: {e}")

    def run(self, posts_csv, output_csv, min_position=11, max_position=30,
            min_impressions=50, top_n=20):
        """Run complete analysis workflow.

        Args:
            posts_csv: Input CSV with posts and analytics.
            output_csv: Destination CSV for the scored opportunities.
            min_position: Start of the position range (inclusive).
            max_position: End of the position range (inclusive).
            min_impressions: Minimum impressions for a ranked post.
            top_n: How many of the best-scoring posts get AI recommendations;
                values below zero are treated as zero.

        Returns:
            None. Stops early (after logging) when no posts load or no
            opportunities match.
        """
        self.log("🔍 Starting keyword opportunity analysis...")
        self.log(f"Input: {posts_csv}")
        self.log(f"Position range: {min_position}-{max_position}")
        self.log(f"Min impressions: {min_impressions}")
        self.log(f"Top N for AI analysis: {top_n}\n")

        # Load posts
        posts = self.load_posts(posts_csv)
        if not posts:
            return

        # Filter opportunities
        opportunities = self.filter_opportunities(
            posts, min_position, max_position, min_impressions
        )
        if not opportunities:
            self.log("⚠️ No opportunities found in specified range")
            return

        # Calculate scores
        self.log("\n📊 Calculating opportunity scores...")
        for opp in opportunities:
            opp['opportunity_score'] = self.calculate_opportunity_score(opp)
            opp['estimated_traffic_gain'] = self.estimate_traffic_gain(opp)

        # Sort by score
        opportunities = sorted(
            opportunities, key=lambda x: x['opportunity_score'], reverse=True
        )

        # Get AI recommendations for top N. A negative top_n would otherwise
        # slice from the end of the list.
        ai_count = min(max(top_n, 0), len(opportunities))
        if not self.client:
            # Without a client every call returns None; skip the loop (and
            # its rate-limit sleeps) instead of pretending to generate.
            self.log("\n⚠️ No AI client configured; skipping AI recommendations")
        elif ai_count:
            self.log(f"\n🤖 Generating AI recommendations for top {ai_count} opportunities...")
            for i, opp in enumerate(opportunities[:ai_count], start=1):
                self.log(f" [{i}/{ai_count}] {opp['title'][:50]}...")
                recommendations = self.generate_ai_recommendations(opp)

                if recommendations:
                    opp['title_recommendations_str'] = self._join_recommendations(
                        recommendations.get('title_recommendations', [])
                    )
                    opp['description_recommendations_str'] = self._join_recommendations(
                        recommendations.get('description_recommendations', [])
                    )
                    opp['content_recommendations_str'] = self._join_recommendations(
                        recommendations.get('content_recommendations', [])
                    )
                    opp['estimated_effort_hours'] = recommendations.get(
                        'estimated_effort_hours', ''
                    )
                    opp['expected_position_improvement'] = recommendations.get(
                        'expected_position_improvement', ''
                    )

                time.sleep(0.2)  # Rate limiting

        # Export
        self.log("\n📁 Exporting results...")
        self.export_opportunities_csv(opportunities, output_csv)

        # Log the summary before writing the log file so the file contains it.
        self.log(f"\n✓ Analysis complete! {len(opportunities)} opportunities identified.")
        self.log(
            f" Top opportunity: {opportunities[0]['title'][:50]}... "
            f"(score: {opportunities[0]['opportunity_score']})"
        )

        # Export log
        log_dir = Path(self.output_dir) / 'logs'
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / 'opportunity_analysis_log.txt'
        self.export_log(log_file)


def main():
    """CLI entry point."""
    parser = argparse.ArgumentParser(description='Analyze keyword opportunities')
    parser.add_argument('--input', type=Path,
                        default=Path('output/results/posts_with_analytics.csv'),
                        help='Input posts CSV')
    parser.add_argument('--output', type=Path,
                        default=Path('output/results/keyword_opportunities.csv'),
                        help='Output opportunities CSV')
    parser.add_argument('--min-position', type=int, default=11,
                        help='Minimum position (start of range)')
    parser.add_argument('--max-position', type=int, default=30,
                        help='Maximum position (end of range)')
    parser.add_argument('--min-impressions', type=int, default=50,
                        help='Minimum impressions to consider')
    parser.add_argument('--top-n', type=int, default=20,
                        help='Top N for AI recommendations')

    args = parser.parse_args()

    analyzer = OpportunityAnalyzer()
    analyzer.run(args.input, args.output, args.min_position, args.max_position,
                 args.min_impressions, args.top_n)


if __name__ == '__main__':
    main()