from django.core.management.base import BaseCommand, CommandError import os import csv import collections import re from datetime import datetime from django.conf import settings import json import tempfile import shutil class Command(BaseCommand): help = 'Analyze a padel rankings CSV file and provide statistics' def add_arguments(self, parser): parser.add_argument('file_path', nargs='?', type=str, help='Relative path to the rankings file from the static/rankings directory') parser.add_argument('--full-path', type=str, help='Full path to the rankings file (alternative to file_path)') parser.add_argument('--list-files', action='store_true', help='List available ranking files') parser.add_argument('--top', type=int, default=10, help='Number of top players to display') parser.add_argument('--clubs', type=int, default=10, help='Number of top clubs to display') parser.add_argument('--leagues', type=int, default=10, help='Number of top leagues to display') parser.add_argument('--find-anonymous', action='store_true', help='Find and match anonymous players with previous month rankings') parser.add_argument('--confidence', type=int, default=7, help='Confidence threshold for automatic matching (0-1)') parser.add_argument('--auto-match', action='store_true', help='Automatically match anonymous players when confidence is high') parser.add_argument('--output', type=str, help='Save results to output file') parser.add_argument('--verbose', action='store_true', help='Show detailed matching information') parser.add_argument('--named-only', action='store_true', help='Process only anonymous players WITH names (missing license)') parser.add_argument('--unnamed-only', action='store_true', help='Process only anonymous players WITHOUT names') def handle(self, *args, **options): # Base directory for rankings files rankings_dir = os.path.join(settings.BASE_DIR, 'tournaments', 'static', 'rankings') # Check if user wants to list available files if options['list_files']: self.list_available_files(rankings_dir) return # Get the file path if options['full_path']: file_path = options['full_path'] elif options['file_path']: file_path = os.path.join(rankings_dir, options['file_path']) else: self.stderr.write(self.style.ERROR('Please provide a file path or use --list-files to see available files')) return # Validate file exists if not os.path.exists(file_path): self.stderr.write(self.style.ERROR(f'File not found: {file_path}')) return # Process the file players, metadata = self.parse_rankings_file(file_path) # Generate statistics if players: # self.generate_statistics(players, options) # Find anonymous players if requested if options['find_anonymous']: if options['auto_match']: # Iterative approach: keep matching until no more changes can be made self.iterative_match_anonymous_players(file_path, rankings_dir, options) else: # Single pass analysis without making changes self.find_anonymous_players(players, metadata, rankings_dir, options, file_path) def list_available_files(self, rankings_dir): """List all available ranking files""" if not os.path.exists(rankings_dir): self.stderr.write(self.style.ERROR(f'Rankings directory not found: {rankings_dir}')) return files = [f for f in os.listdir(rankings_dir) if f.endswith('.csv')] files.sort() self.stdout.write(self.style.SUCCESS(f'Found {len(files)} ranking files:')) for f in files: self.stdout.write(f' - {f}') def parse_rankings_file(self, file_path): """Parse a rankings file and return player data and metadata""" try: self.stdout.write(f"Loading file: {file_path}...") # Read the file and parse data with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() self.stdout.write(f"File loaded. Found {len(lines)} lines, processing...") # Extract file metadata from first lines title = lines[0].strip().strip('"') period = lines[1].strip().strip('"') # Parse month and year from filename or content filename = os.path.basename(file_path) # Extract month-year from filename (format: CLASSEMENT-PADEL-MESSIEURS-MM-YYYY.csv) match = re.search(r'(\d{2})-(\d{4})', filename) if match: month = int(match.group(1)) year = int(match.group(2)) else: # Try to extract from period match = re.search(r'(\w+)\s+(\d{4})', period) if match: month_name = match.group(1) month_names = ["JANVIER", "FEVRIER", "MARS", "AVRIL", "MAI", "JUIN", "JUILLET", "AOUT", "SEPTEMBRE", "OCTOBRE", "NOVEMBRE", "DECEMBRE"] if month_name.upper() in month_names: month = month_names.index(month_name.upper()) + 1 else: month = datetime.now().month year = int(match.group(2)) else: # Default to current month = datetime.now().month year = datetime.now().year # Extract gender from filename gender = "UNKNOWN" if "MESSIEURS" in filename: gender = "MESSIEURS" elif "DAMES" in filename: gender = "DAMES" # Extract tranche/series from filename (e.g., MESSIEURS-2 or MESSIEURS-3) tranche = None tranche_match = re.search(r'MESSIEURS-(\d)', filename) if tranche_match: tranche = int(tranche_match.group(1)) metadata = { 'title': title, 'period': period, 'filename': filename, 'month': month, 'year': year, 'gender': gender, 'tranche': tranche } self.stdout.write(self.style.SUCCESS(f'Analyzing: {title} - {period}')) # Find the actual data start (after header rows) data_start = 0 for i, line in enumerate(lines): if ';RANG;NOM;PRENOM;' in line: data_start = i + 1 header = line.strip().split(';') break # Parse player data self.stdout.write(f"Parsing player data from line {data_start}...") players = [] line_count = 0 total_lines = len(lines[data_start:]) progress_interval = max(1, total_lines // 10) # Report progress at 10% intervals for line in lines[data_start:]: if not line.strip(): continue values = line.strip().split(';') if len(values) < 5: # Skip malformed lines continue # Create player record based on the Swift code line format # ";\(rank);\(lastName);\(firstName);\(country);\(strippedLicense);\(pointsString);\(assimilation); # \(tournamentCountString);\(ligue);\(formatNumbers(clubCode));\(club);\(progression.formattedAsRawString()); # \(bestRank?.formattedAsRawString() ?? "");\(birthYear?.formattedAsRawString() ?? "");" player = { 'rank': values[1].strip() if len(values) > 1 and values[1].strip() else 'N/A', 'name': values[2].strip() if len(values) > 2 and values[2].strip() else 'N/A', 'first_name': values[3].strip() if len(values) > 3 and values[3].strip() else 'N/A', 'nationality': values[4].strip() if len(values) > 4 and values[4].strip() else 'N/A', 'license': values[5].strip() if len(values) > 5 and values[5].strip() else 'N/A', 'points': values[6].strip() if len(values) > 6 and values[6].strip() else 'N/A', 'assimilation': values[7].strip() if len(values) > 7 and values[7].strip() else 'N/A', 'tournaments_played': values[8].strip() if len(values) > 8 and values[8].strip() else 'N/A', 'league': values[9].strip() if len(values) > 9 and values[9].strip() else 'N/A', 'club_code': values[10].strip() if len(values) > 10 and values[10].strip() else 'N/A', 'club': values[11].strip() if len(values) > 11 and values[11].strip() else 'N/A', 'progression': values[12].strip() if len(values) > 12 and values[12].strip() else '0', 'best_rank': values[13].strip() if len(values) > 13 and values[13].strip() else 'N/A', 'birth_year': values[14].strip() if len(values) > 14 and values[14].strip() else 'N/A', } players.append(player) # Show progress periodically line_count += 1 if line_count % progress_interval == 0: self.stdout.write(f" Progress: {line_count}/{total_lines} lines processed ({(line_count/total_lines)*100:.1f}%)") return players, metadata except Exception as e: self.stderr.write(self.style.ERROR(f'Error parsing file: {str(e)}')) return [], {} def generate_statistics(self, players, options): """Generate and display statistics about the ranking data""" total_players = len(players) self.stdout.write(f'Total players: {total_players}') # Top players self.stdout.write(self.style.SUCCESS(f'\nTop {options["top"]} players:')) for i, player in enumerate(players[:options["top"]]): progression = f" ({player['progression']})" if player['progression'] != 'N/A' else "" self.stdout.write(f'{i+1}. {player["rank"]} - {player["name"]} {player["first_name"]} - {player["points"]} points{progression}') # League distribution league_counter = collections.Counter([p["league"] for p in players if p["league"] != 'N/A']) self.stdout.write(self.style.SUCCESS(f'\nPlayers by league (top {options["leagues"]}):')) for league, count in league_counter.most_common(options["leagues"]): percentage = (count / total_players) * 100 self.stdout.write(f'{league}: {count} players ({percentage:.1f}%)') # Club distribution club_counter = collections.Counter([p["club"] for p in players if p["club"] != 'N/A']) self.stdout.write(self.style.SUCCESS(f'\nPlayers by club (top {options["clubs"]}):')) for club, count in club_counter.most_common(options["clubs"]): percentage = (count / total_players) * 100 self.stdout.write(f'{club}: {count} players ({percentage:.1f}%)') # Points statistics (if numeric) try: points = [float(p["points"]) for p in players if p["points"] not in ('N/A', '')] if points: self.stdout.write(self.style.SUCCESS('\nPoints statistics:')) self.stdout.write(f'Min: {min(points)}') self.stdout.write(f'Max: {max(points)}') self.stdout.write(f'Average: {sum(points) / len(points):.2f}') self.stdout.write(f'Players with points: {len(points)} ({(len(points) / total_players) * 100:.1f}%)') except ValueError: # Points might not be numeric pass # Anonymous players count anonymous_players = [p for p in players if self.is_anonymous_player(p)] if anonymous_players: self.stdout.write(self.style.SUCCESS(f'\nAnonymous players: {len(anonymous_players)} ({(len(anonymous_players) / total_players) * 100:.1f}%)')) def is_anonymous_player(self, player): """Check if a player is anonymous (missing name data or license)""" # Player is anonymous if they have no name data if (player['name'] == 'N/A' or player['name'] == '' or player['first_name'] == 'N/A' or player['first_name'] == ''): return True # Player is also anonymous if they have name but no license if (player['license'] == 'N/A' or player['license'] == ''): return True return False def player_exists_in_current_month(self, prev_player, current_players_indexes): """ Check if a player from the previous month already exists in the current month. Uses pre-built indexes for fast lookup. Args: prev_player: Player from previous month current_players_indexes: Dictionary of indexes for fast lookup Returns: (exists, matching_player) tuple """ # 1. Check by license number (fastest) if prev_player['license'] != 'N/A' and prev_player['license']: license_index = current_players_indexes.get('license_index', {}) if prev_player['license'] in license_index: return True, license_index[prev_player['license']] return False, None def build_current_players_indexes(self, current_players): """ Pre-process current players into lookup indexes for faster duplicate checking. Returns a dictionary of indexes. """ self.stdout.write("Building player indexes for fast lookup...") start_time = datetime.now() # Only index players that have BOTH name AND license players_to_index = [p for p in current_players if (p['license'] != 'N/A' and p['license'] != '') and (p['name'] != 'N/A' and p['name'] != '') and (p['first_name'] != 'N/A' and p['first_name'] != '')] # Create license index license_index = {} for player in players_to_index: if player['license'] != 'N/A' and player['license']: license_index[player['license']] = player # Create name index name_index = {} for player in players_to_index: if player['name'] != 'N/A' and player['first_name'] != 'N/A': name_key = f"{player['name'].lower()}_{player['first_name'].lower()}" name_index[name_key] = player # Create name+club/league index name_club_league_index = {} for player in players_to_index: if player['name'] != 'N/A': # Name + club if player['club'] != 'N/A': name_club_key = f"{player['name'].lower()}_{player['club'].lower()}" name_club_league_index[name_club_key] = player # Name + league if player['league'] != 'N/A': name_league_key = f"{player['name'].lower()}_{player['league'].lower()}" name_club_league_index[name_league_key] = player indexes = { 'license_index': license_index, 'name_index': name_index, 'name_club_league_index': name_club_league_index } elapsed = (datetime.now() - start_time).total_seconds() self.stdout.write(f"Indexes built in {elapsed:.2f} seconds. License keys: {len(license_index)}, Name keys: {len(name_index)}") return indexes def find_previous_month_file(self, current_metadata, rankings_dir): """Find the rankings file for the previous month""" current_month = current_metadata['month'] current_year = current_metadata['year'] gender = current_metadata['gender'] tranche = current_metadata['tranche'] # Calculate previous month and year prev_month = current_month - 1 prev_year = current_year if prev_month == 0: prev_month = 12 prev_year = current_year - 1 # Format for filename pattern tranche_part = f"-{tranche}" if tranche else "" pattern = f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}-{prev_year}.csv" # Look for exact match first exact_path = os.path.join(rankings_dir, pattern) if os.path.exists(exact_path): return exact_path # Otherwise, try more fuzzy matching pattern_base = f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}" for filename in os.listdir(rankings_dir): if filename.startswith(pattern_base) and filename.endswith(".csv"): return os.path.join(rankings_dir, filename) # If still not found, look for any file from previous month pattern_fallback = f"CLASSEMENT-PADEL-{gender}-{prev_month:02d}" for filename in os.listdir(rankings_dir): if filename.startswith(pattern_fallback) and filename.endswith(".csv"): return os.path.join(rankings_dir, filename) return None def find_anonymous_players(self, current_players, current_metadata, rankings_dir, options, file_path=None, return_count=False): """ Find anonymous players and try to match them with players from previous month. Args: current_players: List of current month players current_metadata: Metadata about current month file rankings_dir: Directory containing ranking files options: Command options file_path: Path to current month file (for auto-match) return_count: Whether to return the count of matched players Returns: Number of matched players if return_count is True, otherwise None """ start_time = datetime.now() # Initialize matched_count matched_count = 0 # Identify anonymous players all_anonymous_players = [p for p in current_players if self.is_anonymous_player(p)] if not all_anonymous_players: self.stdout.write(self.style.SUCCESS('No anonymous players found!')) if return_count: return 0 return # Check for conflicting options if options['named_only'] and options['unnamed_only']: self.stderr.write(self.style.ERROR('Cannot use both --named-only and --unnamed-only options together')) if return_count: return 0 return # Sort anonymous players by type anonymous_players_with_names = [] anonymous_players_without_names = [] for player in all_anonymous_players: if (player['name'] != 'N/A' and player['name'] != '' and player['first_name'] != 'N/A' and player['first_name'] != ''): anonymous_players_with_names.append(player) else: anonymous_players_without_names.append(player) # Select which players to process based on options if options['named_only']: anonymous_players = anonymous_players_with_names processing_type = "named anonymous players (with names but missing license)" elif options['unnamed_only']: anonymous_players = anonymous_players_without_names processing_type = "unnamed anonymous players (missing names)" else: # Default behavior: process named players first, then unnamed anonymous_players = anonymous_players_with_names + anonymous_players_without_names processing_type = "all anonymous players (named first, then unnamed)" if not anonymous_players: if options['named_only']: self.stdout.write(self.style.SUCCESS('No anonymous players with names found!')) elif options['unnamed_only']: self.stdout.write(self.style.SUCCESS('No anonymous players without names found!')) if return_count: return 0 return # Display summary self.stdout.write(self.style.SUCCESS(f'\nProcessing {processing_type}')) self.stdout.write(f'Anonymous players breakdown:') self.stdout.write(f' Total found: {len(all_anonymous_players)}') self.stdout.write(f' With names: {len(anonymous_players_with_names)}') self.stdout.write(f' Without names: {len(anonymous_players_without_names)}') self.stdout.write(f' Selected for processing: {len(anonymous_players)}') # Find previous month file prev_month_file = self.find_previous_month_file(current_metadata, rankings_dir) if not prev_month_file: self.stderr.write(self.style.ERROR('Previous month rankings file not found!')) if return_count: return 0 return self.stdout.write(f'Using previous month file: {os.path.basename(prev_month_file)}') # Load previous month data self.stdout.write('Loading previous month data...') prev_players, prev_metadata = self.parse_rankings_file(prev_month_file) if not prev_players: self.stderr.write(self.style.ERROR('Could not load previous month data!')) if return_count: return 0 return # Build fast lookup indexes for current players (major performance optimization) current_players_indexes = self.build_current_players_indexes(current_players) # Track potential matches matches_found = 0 high_confidence_matches = 0 skipped_existing_players = 0 results = [] # For each anonymous player, try to find matches self.stdout.write(f'Analyzing {len(anonymous_players)} anonymous players...') progress_counter = 0 progress_interval = max(1, len(anonymous_players) // 10) # Report progress at 10% intervals for anon_player in anonymous_players: # Show progress progress_counter += 1 if progress_counter % progress_interval == 0 or progress_counter == 1: # Determine which type of player we're processing if options['named_only']: player_type = "named" elif options['unnamed_only']: player_type = "unnamed" else: # Default behavior: check if we're still processing named players if progress_counter <= len(anonymous_players_with_names): player_type = "named" else: player_type = "unnamed" self.stdout.write(f' Processing {player_type} anonymous player {progress_counter}/{len(anonymous_players)} ({(progress_counter/len(anonymous_players))*100:.1f}%)') potential_matches = self.find_potential_matches(anon_player, prev_players, current_players_indexes, options) if potential_matches: matches_found += 1 best_match = potential_matches[0] # Highest confidence match # Record the match info match_info = { 'anonymous_player': anon_player, 'potential_matches': potential_matches, 'best_match': best_match } results.append(match_info) # Output match information progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else "" assimilation = f", Assimilation: {anon_player['assimilation']}" if anon_player['assimilation'] != 'N/A' else "" # Show if this is a named or unnamed anonymous player if (anon_player['name'] != 'N/A' and anon_player['name'] != '' and anon_player['first_name'] != 'N/A' and anon_player['first_name'] != ''): self.stdout.write(f"\nNamed anonymous player: {anon_player['name']} {anon_player['first_name']} - Rank {anon_player['rank']}, League: {anon_player['league']}{progression}{assimilation}") else: self.stdout.write(f"\nUnnamed anonymous player: Rank {anon_player['rank']}, League: {anon_player['league']}{progression}{assimilation}") for i, match in enumerate(potential_matches[:3]): # Show top 3 matches player = match['player'] confidence = match['confidence'] match_reasons = match['match_reasons'] self.stdout.write(f" Match {i+1}: {player['name']} {player['first_name']} (Rank: {player['rank']}, League: {player['league']})") self.stdout.write(f" Confidence: {confidence:.2f}, Match reasons: {match_reasons}") # Count high confidence matches if best_match['confidence'] >= options['confidence']: high_confidence_matches += 1 else: if options['verbose']: if (anon_player['name'] != 'N/A' and anon_player['name'] != '' and anon_player['first_name'] != 'N/A' and anon_player['first_name'] != ''): self.stdout.write(f"\nNo matches found for named anonymous player: {anon_player['name']} {anon_player['first_name']} - Rank {anon_player['rank']}, League: {anon_player['league']}") else: self.stdout.write(f"\nNo matches found for unnamed anonymous player: Rank {anon_player['rank']}, League: {anon_player['league']}") # Batch processing status update if progress_counter % 100 == 0 and progress_counter > 0: elapsed = (datetime.now() - start_time).total_seconds() per_player = elapsed / progress_counter remaining = (len(anonymous_players) - progress_counter) * per_player self.stdout.write(f" Processed {progress_counter}/{len(anonymous_players)} players in {elapsed:.1f}s") self.stdout.write(f" Estimated time remaining: {remaining:.1f}s ({per_player:.3f}s per player)") # Final timing total_elapsed = (datetime.now() - start_time).total_seconds() self.stdout.write(f"Analysis completed in {total_elapsed:.2f} seconds ({total_elapsed/len(anonymous_players):.3f}s per player)") # Summary self.stdout.write(self.style.SUCCESS(f'\nMatching summary:')) self.stdout.write(f'Processing mode: {processing_type}') self.stdout.write(f'Anonymous players processed: {len(anonymous_players)}') if not options['named_only'] and not options['unnamed_only']: self.stdout.write(f' Named: {len(anonymous_players_with_names)}') self.stdout.write(f' Unnamed: {len(anonymous_players_without_names)}') self.stdout.write(f'Players with potential matches: {matches_found}') self.stdout.write(f'High confidence matches (≥{options["confidence"]}): {high_confidence_matches}') self.stdout.write(f'Skipped players already in current month: {skipped_existing_players}') # Save results if requested if options['output']: self.stdout.write(f'Saving results to {options["output"]}...') self.save_results(results, options['output']) # Auto-match players if requested if options['auto_match'] and matches_found > 0 and file_path: # Note: We pass the selected anonymous_players for matching matched_count = self.update_rankings_with_matches(file_path, anonymous_players, results, options['confidence'], options) elif options['auto_match'] and file_path is None: self.stderr.write(self.style.ERROR("Auto-match was requested but file path is not available. No changes were made.")) # Return matched count if requested if return_count: return matched_count return None def find_potential_matches(self, anon_player, prev_players, current_players_indexes, options): """Find potential matches for an anonymous player from previous month data""" start_time = datetime.now() potential_matches = [] skipped_players = 0 # Show what we're matching if options['verbose']: progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else "" self.stdout.write(f" Finding matches for anonymous player: Rank {anon_player['rank']}{progression}, League: {anon_player['league']}") # Get ranking as integer if possible try: anon_rank = int(anon_player['rank']) if anon_player['rank'] != 'N/A' else None except ValueError: anon_rank = None # Parse progression to get previous rank if available prev_rank_from_progression = None prog_value = 0 # Default if no progression if anon_player['progression'] != 'N/A' and anon_player['progression']: try: # Progression can be like "+5", "-10", "=", etc. prog_str = anon_player['progression'].strip() if prog_str.startswith('+'): # CRITICAL FIX: If progression is positive (e.g., +96), player moved UP by 96 places # So previous rank is HIGHER (current rank + progression) prog_value = int(prog_str) elif prog_str.startswith('-'): # If progression is negative (e.g., -10), player moved DOWN by 10 places # So previous rank is LOWER (current rank + progression) prog_value = int(prog_str) elif prog_str == '=': prog_value = 0 # Handle pure numeric progression without sign elif prog_str.isdigit() or (prog_str.isdigit() and prog_str.startswith('-')): prog_value = int(prog_str) # Default to 0 for "NEW" or other special values except ValueError: prog_value = 0 # Calculate expected previous rank if anon_rank is not None: prev_rank_from_progression = anon_rank + prog_value # Add progression for previous rank if options['verbose']: self.stdout.write(f" Target previous rank: {prev_rank_from_progression} (current rank {anon_rank} + progression {prog_value})") # Show anonymous player details self.stdout.write("\n" + "="*80) self.stdout.write(f"Looking for matches for anonymous player at rank {anon_player['rank']}:") self.stdout.write(f" Points: {anon_player['points']}") self.stdout.write(f" Assimilation: {anon_player['assimilation']}") self.stdout.write(f" Tournaments: {anon_player['tournaments_played']}") self.stdout.write(f" League: {anon_player['league']}") if anon_player['name'] != 'N/A' and anon_player['first_name'] != 'N/A': self.stdout.write(f" Name: {anon_player['name']} {anon_player['first_name']}") self.stdout.write("-"*80) for prev_player in prev_players: # Skip anonymous players in previous month if self.is_anonymous_player(prev_player): continue # Initialize match data match_data = { 'player': prev_player, 'match_reasons': [], 'confidence': 0 } # Print candidate details self.stdout.write(f"\nChecking candidate: {prev_player['name']} {prev_player['first_name']}") self.stdout.write(f" Rank: {prev_player['rank']}") self.stdout.write(f" Points: {prev_player['points']}") self.stdout.write(f" Assimilation: {prev_player['assimilation']}") self.stdout.write(f" Tournaments: {prev_player['tournaments_played']}") self.stdout.write(f" League: {prev_player['league']}") # Start building confidence score confidence_details = [] # 1. PRIMARY MATCHER: Previous rank match if prev_rank_from_progression is not None: try: prev_rank_value = int(prev_player['rank']) if prev_rank_value is not None: rank_diff = abs(prev_rank_value - prev_rank_from_progression) match_data['rank_diff'] = rank_diff if rank_diff == 0: match_data['rank_match_type'] = 'exact' match_data['match_reasons'].append(f"exact previous rank match ({prev_rank_value})") match_data['confidence'] = 7 # Assimilation match if anon_player['assimilation'] == prev_player['assimilation']: match_data['confidence'] += 3 confidence_details.append(f"Assimilation match (+0.3)") match_data['match_reasons'].append(f"same assimilation ({anon_player['assimilation']})") # League match if (anon_player['league'] == prev_player['league'] and anon_player['league'] != 'N/A' and anon_player['league'] != ''): match_data['confidence'] += 7 confidence_details.append(f"League match (+0.5)") match_data['match_reasons'].append(f"same league ({anon_player['league']})") # Tournament count comparison try: anon_tournaments = int(anon_player['tournaments_played']) prev_tournaments = int(prev_player['tournaments_played']) tournaments_diff = abs(anon_tournaments - prev_tournaments) if tournaments_diff == 0: match_data['confidence'] += 4 confidence_details.append(f"Tournaments unchanged (+0.2)") match_data['match_reasons'].append(f"same tournaments played ({anon_tournaments})") else: # Calculate percentage difference max_tournaments = max(anon_tournaments, prev_tournaments) if max_tournaments > 0: percentage_diff = (tournaments_diff / max_tournaments) * 100 if percentage_diff <= 10: match_data['confidence'] += 3 confidence_details.append(f"Tournaments within 10% range (+0.15, diff: {percentage_diff:.1f}%)") match_data['match_reasons'].append(f"tournaments played: prev={prev_tournaments}, current={anon_tournaments}") elif percentage_diff <= 20: match_data['confidence'] += 2 confidence_details.append(f"Tournaments within 20% range (+0.1, diff: {percentage_diff:.1f}%)") match_data['match_reasons'].append(f"tournaments played: prev={prev_tournaments}, current={anon_tournaments}") else: confidence_details.append(f"Tournaments too different (diff: {percentage_diff:.1f}%)") match_data['match_reasons'].append(f"tournaments played: prev={prev_tournaments}, current={anon_tournaments}") else: # Handle edge case where both values are 0 match_data['confidence'] += 4 confidence_details.append(f"Both have 0 tournaments (+0.2)") match_data['match_reasons'].append(f"both have 0 tournaments played") except ValueError: confidence_details.append("Could not compare tournaments played") # Points comparison try: anon_points = float(anon_player['points']) prev_points = float(prev_player['points']) points_diff = abs(anon_points - prev_points) match_data['match_reasons'].append(f"points: prev={prev_points}, current={anon_points}, diff={points_diff}") if points_diff == 0: match_data['confidence'] += 4 confidence_details.append(f"Points unchanged (+0.3)") else: # Calculate percentage difference max_points = max(anon_points, prev_points) if max_points > 0: percentage_diff = (points_diff / max_points) * 100 if percentage_diff <= 10: match_data['confidence'] += 3 confidence_details.append(f"Points within 10% range (+0.25, diff: {percentage_diff:.1f}%)") elif percentage_diff <= 20: match_data['confidence'] += 2 confidence_details.append(f"Points within 20% range (+0.15, diff: {percentage_diff:.1f}%)") elif percentage_diff <= 30: match_data['confidence'] += 1 confidence_details.append(f"Points within 30% range (+0.1, diff: {percentage_diff:.1f}%)") else: confidence_details.append(f"Points too different (diff: {percentage_diff:.1f}%)") except ValueError: confidence_details.append("Could not compare points") elif rank_diff <= 3: match_data['rank_match_type'] = 'close' match_data['match_reasons'].append(f"close previous rank match ({prev_rank_value} vs {prev_rank_from_progression})") match_data['confidence'] = 4 elif rank_diff <= 10: match_data['rank_match_type'] = 'approximate' match_data['match_reasons'].append(f"approximate previous rank match ({prev_rank_value} vs {prev_rank_from_progression})") match_data['confidence'] = 2 except ValueError: pass # Name match check if (anon_player['name'] != 'N/A' and anon_player['name'] != '' and anon_player['first_name'] != 'N/A' and anon_player['first_name'] != ''): if (anon_player['name'].lower() == prev_player['name'].lower() and anon_player['first_name'].lower() == prev_player['first_name'].lower()): match_data['confidence'] += 25 confidence_details.append(f"Exact name match (+0.4)") match_data['match_reasons'].append("exact name match") # Birth year match if (anon_player['birth_year'] != 'N/A' and anon_player['birth_year'] != '' and prev_player['birth_year'] != 'N/A' and prev_player['birth_year'] != '' and anon_player['birth_year'] == prev_player['birth_year']): match_data['confidence'] += 1 confidence_details.append(f"Birth year match (+0.2)") match_data['match_reasons'].append(f"same birth year ({anon_player['birth_year']})") # Only consider matches with reasonable confidence if match_data['confidence'] >= 10: # Print confidence calculation details self.stdout.write("\n Confidence calculation:") for detail in confidence_details: self.stdout.write(f" {detail}") self.stdout.write(f" Total confidence: {match_data['confidence']:.2f}") match_data['match_reasons'] = ", ".join(match_data['match_reasons']) potential_matches.append(match_data) self.stdout.write(" → Considered as potential match") # else: # self.stdout.write(" → Rejected (confidence too low)") # self.stdout.write("-"*40) # Sort matches by confidence potential_matches.sort(key=lambda x: x['confidence'], reverse=True) # Summary of best matches if potential_matches: self.stdout.write("\nTop matches found:") for i, match in enumerate(potential_matches[:3]): # Show top 3 self.stdout.write(f"\n{i+1}. {match['player']['name']} {match['player']['first_name']}") self.stdout.write(f" Confidence: {match['confidence']:.2f}") self.stdout.write(f" Reasons: {match['match_reasons']}") else: self.stdout.write("\nNo matches found with sufficient confidence.") return potential_matches def save_results(self, results, output_path): """Save matching results to a file""" try: with open(output_path, 'w', encoding='utf-8') as f: f.write("Anonymous Player Matching Results\n") f.write("================================\n\n") for match_info in results: anon_player = match_info['anonymous_player'] best_match = match_info['best_match'] progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else "" assimilation = f", Assimilation: {anon_player['assimilation']}" if anon_player['assimilation'] != 'N/A' else "" f.write(f"Anonymous Player (Rank: {anon_player['rank']}, League: {anon_player['league']}{progression}{assimilation})\n") f.write(f"Best Match: {best_match['player']['name']} {best_match['player']['first_name']}\n") f.write(f" Confidence: {best_match['confidence']:.2f}\n") f.write(f" Match reasons: {best_match['match_reasons']}\n") f.write(f" Previous Rank: {best_match['player']['rank']}\n") f.write(f" League: {best_match['player']['league']}\n") f.write(f" Club: {best_match['player']['club']}\n\n") self.stdout.write(self.style.SUCCESS(f'Results saved to {output_path}')) except Exception as e: self.stderr.write(self.style.ERROR(f'Error saving results: {str(e)}')) def update_rankings_with_matches(self, file_path, anonymous_players, matches, confidence_threshold, options): """ Update the rankings file with matched player information Args: file_path: Path to the current month's rankings file anonymous_players: List of anonymous players (filtered based on command options) matches: List of match info dictionaries confidence_threshold: Minimum confidence to apply auto-matching options: Command options Returns: Number of players that were updated """ self.stdout.write(self.style.SUCCESS(f"\nAuto-matching players with confidence ≥ {confidence_threshold}...")) # Create a backup of the original file backup_path = f"{file_path}.bak" shutil.copy2(file_path, backup_path) self.stdout.write(f"Created backup of original file at: {backup_path}") # Read the original file with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() # Create a set of players that should be updated # Only include players that were in our filtered anonymous_players list AND have high confidence matches players_to_update = set() update_info = {} for match_info in matches: anon_player = match_info['anonymous_player'] best_match = match_info['best_match'] # Only update if this player was in our filtered list AND meets confidence threshold if anon_player in anonymous_players and best_match['confidence'] >= confidence_threshold: # Create a unique identifier for this player player_id = f"{anon_player['rank']}_{anon_player['points']}_{anon_player['assimilation']}_{anon_player['tournaments_played']}_{anon_player['league']}" # Add additional uniqueness based on name status if (anon_player['name'] != 'N/A' and anon_player['name'] != '' and anon_player['first_name'] != 'N/A' and anon_player['first_name'] != ''): player_id += f"_{anon_player['name']}_{anon_player['first_name']}" players_to_update.add(player_id) update_info[player_id] = { 'anonymous_player': anon_player, 'match': best_match } if not players_to_update: self.stdout.write("No players met the confidence threshold for auto-matching.") return 0 self.stdout.write(f"Found {len(players_to_update)} players to update.") # Process the file line by line updated_count = 0 updated_lines = [] # First, find the data start line data_start_line = 0 for i, line in enumerate(lines): if ';RANG;NOM;PRENOM;' in line: data_start_line = i + 1 break # Keep header lines unchanged updated_lines.extend(lines[:data_start_line]) # Process data lines for line in lines[data_start_line:]: if not line.strip(): updated_lines.append(line) continue # Parse the line values = line.strip().split(';') if len(values) < 3: updated_lines.append(line) continue # Extract player data from the line rank = values[1].strip() if len(values) > 1 else '' name = values[2].strip() if len(values) > 2 else '' first_name = values[3].strip() if len(values) > 3 else '' license_num = values[5].strip() if len(values) > 5 else '' points = values[6].strip() if len(values) > 6 else '' assimilation = values[7].strip() if len(values) > 7 else '' tournaments = values[8].strip() if len(values) > 8 else '' league = values[9].strip() if len(values) > 9 else '' # Create player identifier for this line line_player_id = f"{rank}_{points}_{assimilation}_{tournaments}_{league}" # Add name info if present if name and first_name and name != 'N/A' and first_name != 'N/A': line_player_id += f"_{name}_{first_name}" # Check if this player should be updated if line_player_id in players_to_update: # This player should be updated match_info = update_info[line_player_id] matched_player = match_info['match']['player'] # Update the line with matched player information # Keep the existing rank and points, but update name and license new_values = values.copy() new_values[2] = matched_player['name'] # Name new_values[3] = matched_player['first_name'] # First name new_values[4] = matched_player['nationality'] new_values[5] = matched_player['license'] new_values[10] = matched_player['club_code'] new_values[11] = matched_player['club'] new_values[14] = matched_player['birth_year'] new_line = ';'.join(new_values) + '\n' updated_lines.append(new_line) updated_count += 1 self.stdout.write(f"Updated player: {matched_player['name']} {matched_player['first_name']} (Rank: {rank})") else: # This player should NOT be updated - keep the line exactly as is updated_lines.append(line) # Write the updated file with open(file_path, 'w', encoding='utf-8') as f: f.writelines(updated_lines) self.stdout.write(self.style.SUCCESS(f"Successfully updated {updated_count} players in {file_path}")) return updated_count def iterative_match_anonymous_players(self, file_path, rankings_dir, options): """ Iteratively match anonymous players until no more matches can be found. Uses temporary files to optimize processing speed. """ iteration = 1 total_matched = 0 changes_made = True self.stdout.write(self.style.SUCCESS("\n=== Starting optimized iterative matching process ===")) # Load initial data current_players, current_metadata = self.parse_rankings_file(file_path) # Count anonymous players at the start anonymous_players = [p for p in current_players if self.is_anonymous_player(p)] initial_anonymous_count = len(anonymous_players) if initial_anonymous_count == 0: self.stdout.write(self.style.SUCCESS("No anonymous players found. Process complete!")) return self.stdout.write(f"Initial anonymous players: {initial_anonymous_count}") # Find previous month file prev_month_file = self.find_previous_month_file(current_metadata, rankings_dir) if not prev_month_file: self.stderr.write(self.style.ERROR('Previous month rankings file not found!')) return self.stdout.write(f'Using previous month file: {os.path.basename(prev_month_file)}') # Load previous month data prev_players, prev_metadata = self.parse_rankings_file(prev_month_file) # Create temp directory for our working files with tempfile.TemporaryDirectory() as temp_dir: self.stdout.write(f"Created temporary directory for working files: {temp_dir}") # Generate initial temp files anon_file = os.path.join(temp_dir, "anonymous_players.json") prev_players_file = os.path.join(temp_dir, "prev_month_players.json") matches_file = os.path.join(temp_dir, "matches.json") print(os.path.join(temp_dir)) # Extract anonymous players and filter previous month players self.stdout.write("Creating initial working files...") filtered_data = self.create_filtered_working_files( current_players, prev_players, anon_file, prev_players_file, options ) anon_count = filtered_data['anon_count'] prev_count = filtered_data['prev_count'] self.stdout.write(f"Extracted {anon_count} anonymous players and {prev_count} eligible previous month players") # Main iteration loop while changes_made and anon_count > 0: self.stdout.write(self.style.SUCCESS(f"\n--- Iteration {iteration} ---")) self.stdout.write(f"Anonymous players remaining: {anon_count}") self.stdout.write(f"Previous month players to check: {prev_count}") # Process the current state of temp files matched_count = self.match_players_from_temp_files( anon_file, prev_players_file, matches_file, file_path, current_metadata, options ) # Check if changes were made if matched_count > 0: total_matched += matched_count self.stdout.write(self.style.SUCCESS( f"Iteration {iteration} complete: Matched {matched_count} players" )) changes_made = True # Update current players from the main file current_players, _ = self.parse_rankings_file(file_path) # Remove matched players from prev_players for next iteration # Load the matches from the temp file to identify which prev players were used if os.path.exists(matches_file): with open(matches_file, 'r', encoding='utf-8') as f: matches = json.load(f) # Create a set of licenses that were matched matched_licenses = set() for match in matches: matched_player = match['best_match']['player'] if matched_player['license'] != 'N/A' and matched_player['license']: matched_licenses.add(matched_player['license']) # Remove matched players from prev_players prev_players = [p for p in prev_players if p['license'] not in matched_licenses] # Update temp files for next iteration filtered_data = self.create_filtered_working_files(current_players, prev_players, anon_file, prev_players_file, options) self.stdout.write(self.style.SUCCESS(f"Iteration {iteration} complete: No new matches found")) changes_made = False # Increment iteration counter iteration += 1 # Prevent infinite loops (optional safety check) if iteration > 1: # Cap at 10 iterations maximum self.stdout.write(self.style.WARNING("Maximum iterations reached (10). Stopping process.")) break # Final summary self.stdout.write(self.style.SUCCESS("\n=== Iterative matching process complete ===")) self.stdout.write(f"Total iterations: {iteration - 1}") self.stdout.write(f"Total players matched: {total_matched}") # Final statistics final_players, _ = self.parse_rankings_file(file_path) final_anonymous_count = len([p for p in final_players if self.is_anonymous_player(p)]) self.stdout.write(f"Anonymous players remaining: {final_anonymous_count}") # Calculate improvement percentage if initial_anonymous_count > 0: # Avoid division by zero improvement = ((initial_anonymous_count - final_anonymous_count) / initial_anonymous_count) * 100 self.stdout.write(f"Data completeness improved by {improvement:.1f}%") def create_filtered_working_files(self, current_players, prev_players, anon_file, prev_players_file, options): """ Create filtered working files: 1. anonymous_players.json - Contains only anonymous players from current month 2. prev_month_players.json - Contains only players from previous month not in current month Returns dictionary with counts of players in each file """ # Extract anonymous players from current month all_anonymous_players = [p for p in current_players if self.is_anonymous_player(p)] # Filter based on named/unnamed options if options['named_only']: anonymous_players = [p for p in all_anonymous_players if ( p['name'] != 'N/A' and p['name'] != '' and p['first_name'] != 'N/A' and p['first_name'] != '' )] self.stdout.write(self.style.SUCCESS(f"Filtering to only process named anonymous players ({len(anonymous_players)}/{len(all_anonymous_players)})")) elif options['unnamed_only']: anonymous_players = [p for p in all_anonymous_players if ( p['name'] == 'N/A' or p['name'] == '' or p['first_name'] == 'N/A' or p['first_name'] == '' )] self.stdout.write(self.style.SUCCESS(f"Filtering to only process unnamed anonymous players ({len(anonymous_players)}/{len(all_anonymous_players)})")) else: anonymous_players = all_anonymous_players # Create lookup for current non-anonymous players current_players_lookup = {} for player in current_players: if not self.is_anonymous_player(player): # License lookup if player['license'] != 'N/A' and player['license']: current_players_lookup[f"license_{player['license']}"] = True # Filter previous month players (only keep those not in current month) filtered_prev_players = [] for player in prev_players: if self.is_anonymous_player(player): continue # Skip anonymous players from previous month # Check if this player exists in current month exists_in_current = False # Check by license if player['license'] != 'N/A' and player['license']: if f"license_{player['license']}" in current_players_lookup: exists_in_current = True # Add to filtered list if not in current month if not exists_in_current: filtered_prev_players.append(player) # Write anonymous players to file with open(anon_file, 'w', encoding='utf-8') as f: json.dump(anonymous_players, f, ensure_ascii=False) # Write filtered previous players to file with open(prev_players_file, 'w', encoding='utf-8') as f: json.dump(filtered_prev_players, f, ensure_ascii=False) return { 'anon_count': len(anonymous_players), 'prev_count': len(filtered_prev_players) } def match_players_from_temp_files(self, anon_file, prev_players_file, matches_file, original_file, current_metadata, options): """ Match players between the anonymous and previous month temp files and update the original file with matches """ # Load anonymous players with open(anon_file, 'r', encoding='utf-8') as f: anonymous_players = json.load(f) # Load previous month players with open(prev_players_file, 'r', encoding='utf-8') as f: prev_players = json.load(f) if not anonymous_players or not prev_players: return 0 # Create indexes for efficient lookup current_players_indexes = { 'license_index': {}, 'name_index': {}, 'name_club_league_index': {} } # Find matches results = [] for anon_player in anonymous_players: potential_matches = self.find_potential_matches(anon_player, prev_players, current_players_indexes, options) if potential_matches: if len(potential_matches) == 1: best_match = potential_matches[0] # Highest confidence match elif len(potential_matches) > 1 and potential_matches[0]['confidence'] - potential_matches[1]['confidence'] > 2: # print(potential_matches[0]['confidence'], potential_matches[1]['match_reasons']) best_match = potential_matches[0] # Highest confidence match else: # for match in potential_matches: # print(match['player']['name'], match['confidence'], match['match_reasons']) continue # Record the match info match_info = { 'anonymous_player': anon_player, 'potential_matches': potential_matches, 'best_match': best_match } results.append(match_info) # Save matches to file with open(matches_file, 'w', encoding='utf-8') as f: # We can't directly serialize the complex match data, so extract key info serializable_results = [] for match_info in results: serializable_results.append({ 'anonymous_player': match_info['anonymous_player'], 'best_match': { 'player': match_info['best_match']['player'], 'confidence': match_info['best_match']['confidence'], 'match_reasons': match_info['best_match']['match_reasons'] } }) json.dump(serializable_results, f, ensure_ascii=False) # Apply matches to the original file if results: matched_count = self.update_rankings_with_matches( original_file, anonymous_players, results, options['confidence'], options ) return matched_count return 0