You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
padelclub_backend/tournaments/management/commands/analyze_rankings.py

1057 lines
48 KiB

from django.core.management.base import BaseCommand, CommandError
import os
import csv
import collections
import re
from datetime import datetime
from django.conf import settings
import json
import tempfile
import shutil
class Command(BaseCommand):
help = 'Analyze a padel rankings CSV file and provide statistics'
def add_arguments(self, parser):
    """Register the command-line arguments understood by this command.

    Args:
        parser: Argument parser the options are added to (anything that
            exposes an ``add_argument`` method).
    """
    # (flag, keyword arguments) pairs, registered in declaration order.
    argument_specs = (
        ('file_path', {
            'nargs': '?',
            'type': str,
            'help': 'Relative path to the rankings file from the static/rankings directory',
        }),
        ('--full-path', {
            'type': str,
            'help': 'Full path to the rankings file (alternative to file_path)',
        }),
        ('--list-files', {
            'action': 'store_true',
            'help': 'List available ranking files',
        }),
        ('--top', {
            'type': int,
            'default': 10,
            'help': 'Number of top players to display',
        }),
        ('--clubs', {
            'type': int,
            'default': 10,
            'help': 'Number of top clubs to display',
        }),
        ('--leagues', {
            'type': int,
            'default': 10,
            'help': 'Number of top leagues to display',
        }),
        ('--find-anonymous', {
            'action': 'store_true',
            'help': 'Find and match anonymous players with previous month rankings',
        }),
        ('--confidence', {
            'type': float,
            'default': 0.7,
            'help': 'Confidence threshold for automatic matching (0-1)',
        }),
        ('--auto-match', {
            'action': 'store_true',
            'help': 'Automatically match anonymous players when confidence is high',
        }),
        ('--output', {
            'type': str,
            'help': 'Save results to output file',
        }),
        ('--verbose', {
            'action': 'store_true',
            'help': 'Show detailed matching information',
        }),
    )
    for flag, spec in argument_specs:
        parser.add_argument(flag, **spec)
def handle(self, *args, **options):
    """Entry point: list files, or analyze one rankings file.

    Resolves the rankings file from ``--full-path`` or the positional
    ``file_path`` (relative to the static rankings directory), parses it,
    prints statistics and, when ``--find-anonymous`` is set, runs either the
    iterative auto-matching or a single read-only matching pass.
    """
    # Directory that holds the bundled rankings files.
    rankings_dir = os.path.join(settings.BASE_DIR, 'tournaments', 'static', 'rankings')

    # Listing mode short-circuits everything else.
    if options['list_files']:
        self.list_available_files(rankings_dir)
        return

    # An explicit full path wins over the path relative to rankings_dir.
    file_path = options['full_path'] or None
    if file_path is None and options['file_path']:
        file_path = os.path.join(rankings_dir, options['file_path'])
    if file_path is None:
        self.stderr.write(self.style.ERROR('Please provide a file path or use --list-files to see available files'))
        return

    if not os.path.exists(file_path):
        self.stderr.write(self.style.ERROR(f'File not found: {file_path}'))
        return

    players, metadata = self.parse_rankings_file(file_path)
    if not players:
        # Nothing parsed: no statistics and no matching to run.
        return

    self.generate_statistics(players, options)

    if not options['find_anonymous']:
        return

    if options['auto_match']:
        # Iterative approach: keep matching until no more changes can be made.
        self.iterative_match_anonymous_players(file_path, rankings_dir, options)
    else:
        # Single pass analysis without making changes.
        self.find_anonymous_players(players, metadata, rankings_dir, options, file_path)
def list_available_files(self, rankings_dir):
    """Print the names of the ``.csv`` files found in ``rankings_dir``, sorted.

    Writes an error to stderr and returns when the directory does not exist.
    """
    if not os.path.exists(rankings_dir):
        self.stderr.write(self.style.ERROR(f'Rankings directory not found: {rankings_dir}'))
        return

    csv_names = sorted(entry for entry in os.listdir(rankings_dir) if entry.endswith('.csv'))
    self.stdout.write(self.style.SUCCESS(f'Found {len(csv_names)} ranking files:'))
    for csv_name in csv_names:
        self.stdout.write(f' - {csv_name}')
def parse_rankings_file(self, file_path):
    """Parse a rankings file and return player data and metadata.

    The first two lines of the file are read as title and period. Player
    rows start after the line containing ``;RANG;NOM;PRENOM;`` (or at the
    top of the file when no such line exists) and are ``;``-separated.

    Args:
        file_path: Path of the rankings CSV file to read (UTF-8).

    Returns:
        ``(players, metadata)``. ``players`` is a list of dicts of stripped
        strings; missing or empty columns become ``'N/A'`` (``'0'`` for
        ``progression``). ``metadata`` holds ``title``, ``period``,
        ``filename``, ``month``, ``year``, ``gender`` and ``tranche``.
        On any error a message is written to stderr and ``([], {})`` is
        returned.
    """
    import unicodedata

    # French month names, unaccented, in calendar order.
    month_names = ["JANVIER", "FEVRIER", "MARS", "AVRIL", "MAI", "JUIN",
                   "JUILLET", "AOUT", "SEPTEMBRE", "OCTOBRE", "NOVEMBRE", "DECEMBRE"]

    # (key, column index, default) following the exported line format:
    # ;rank;lastName;firstName;country;license;points;assimilation;
    # tournamentCount;ligue;clubCode;club;progression;bestRank;birthYear;
    player_fields = (
        ('rank', 1, 'N/A'),
        ('name', 2, 'N/A'),
        ('first_name', 3, 'N/A'),
        ('nationality', 4, 'N/A'),
        ('license', 5, 'N/A'),
        ('points', 6, 'N/A'),
        ('assimilation', 7, 'N/A'),
        ('tournaments_played', 8, 'N/A'),
        ('league', 9, 'N/A'),
        ('club_code', 10, 'N/A'),
        ('club', 11, 'N/A'),
        ('progression', 12, '0'),
        ('best_rank', 13, 'N/A'),
        ('birth_year', 14, 'N/A'),
    )

    try:
        self.stdout.write(f"Loading file: {file_path}...")
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        self.stdout.write(f"File loaded. Found {len(lines)} lines, processing...")

        # File metadata lives on the first two lines.
        title = lines[0].strip().strip('"')
        period = lines[1].strip().strip('"')

        # Month/year: prefer the MM-YYYY part of the filename, then the
        # "<month name> <year>" text of the period line, then today's date.
        filename = os.path.basename(file_path)
        match = re.search(r'(\d{2})-(\d{4})', filename)
        if match:
            month = int(match.group(1))
            year = int(match.group(2))
        else:
            match = re.search(r'(\w+)\s+(\d{4})', period)
            if match:
                # Drop accents so that "Février"/"Août"/"Décembre" match the table.
                decomposed = unicodedata.normalize('NFD', match.group(1).upper())
                month_name = ''.join(ch for ch in decomposed if not unicodedata.combining(ch))
                if month_name in month_names:
                    month = month_names.index(month_name) + 1
                else:
                    month = datetime.now().month
                year = int(match.group(2))
            else:
                month = datetime.now().month
                year = datetime.now().year

        # Gender from the filename.
        gender = "UNKNOWN"
        if "MESSIEURS" in filename:
            gender = "MESSIEURS"
        elif "DAMES" in filename:
            gender = "DAMES"

        # Tranche/series from the filename (e.g. MESSIEURS-2 or MESSIEURS-3).
        # The digit must stand alone: in "MESSIEURS-12-2024" the "1" is the
        # first digit of the month, not a tranche.
        tranche = None
        tranche_match = re.search(r'MESSIEURS-(\d)(?!\d)', filename)
        if tranche_match:
            tranche = int(tranche_match.group(1))

        metadata = {
            'title': title,
            'period': period,
            'filename': filename,
            'month': month,
            'year': year,
            'gender': gender,
            'tranche': tranche
        }
        self.stdout.write(self.style.SUCCESS(f'Analyzing: {title} - {period}'))

        # Player rows begin right after the column-header line.
        data_start = 0
        for i, line in enumerate(lines):
            if ';RANG;NOM;PRENOM;' in line:
                data_start = i + 1
                break

        self.stdout.write(f"Parsing player data from line {data_start}...")
        players = []
        line_count = 0
        total_lines = len(lines) - data_start
        progress_interval = max(1, total_lines // 10)  # Report progress at 10% intervals

        for line in lines[data_start:]:
            stripped = line.strip()
            if not stripped:
                continue
            values = stripped.split(';')
            if len(values) < 5:  # Skip malformed lines
                continue

            player = {}
            for key, index, default in player_fields:
                cell = values[index].strip() if index < len(values) else ''
                player[key] = cell or default
            players.append(player)

            # Show progress periodically
            line_count += 1
            if line_count % progress_interval == 0:
                self.stdout.write(f" Progress: {line_count}/{total_lines} lines processed ({(line_count/total_lines)*100:.1f}%)")

        return players, metadata
    except Exception as e:
        # Best-effort contract: callers test for an empty player list.
        self.stderr.write(self.style.ERROR(f'Error parsing file: {str(e)}'))
        return [], {}
def generate_statistics(self, players, options):
    """Print summary statistics for the parsed ranking data.

    Output covers the total player count, the first ``options["top"]``
    players, league and club distributions, points statistics (only when
    every present points value is numeric) and the anonymous-player count.
    """
    emit = self.stdout.write
    heading = self.style.SUCCESS
    player_total = len(players)
    emit(f'Total players: {player_total}')

    # Leading players, in file order.
    emit(heading(f'\nTop {options["top"]} players:'))
    for position, entry in enumerate(players[:options["top"]], start=1):
        suffix = "" if entry['progression'] == 'N/A' else f" ({entry['progression']})"
        emit(f'{position}. {entry["rank"]} - {entry["name"]} {entry["first_name"]} - {entry["points"]} points{suffix}')

    def report_distribution(field, limit):
        # Count players per value of `field`, ignoring the 'N/A' placeholder.
        tally = collections.Counter(p[field] for p in players if p[field] != 'N/A')
        emit(heading(f'\nPlayers by {field} (top {limit}):'))
        for value, count in tally.most_common(limit):
            share = (count / player_total) * 100
            emit(f'{value}: {count} players ({share:.1f}%)')

    report_distribution('league', options["leagues"])
    report_distribution('club', options["clubs"])

    # Points statistics are skipped entirely if any value is non-numeric.
    try:
        numeric_points = [float(p["points"]) for p in players if p["points"] not in ('N/A', '')]
    except ValueError:
        numeric_points = []
    if numeric_points:
        emit(heading('\nPoints statistics:'))
        emit(f'Min: {min(numeric_points)}')
        emit(f'Max: {max(numeric_points)}')
        emit(f'Average: {sum(numeric_points) / len(numeric_points):.2f}')
        emit(f'Players with points: {len(numeric_points)} ({(len(numeric_points) / player_total) * 100:.1f}%)')

    anonymous_total = sum(1 for p in players if self.is_anonymous_player(p))
    if anonymous_total:
        emit(heading(f'\nAnonymous players: {anonymous_total} ({(anonymous_total / player_total) * 100:.1f}%)'))
def is_anonymous_player(self, player):
    """Return True when the player's last name or first name is missing.

    A name counts as missing when it is the ``'N/A'`` placeholder or empty.
    """
    missing_markers = ('N/A', '')
    return player['name'] in missing_markers or player['first_name'] in missing_markers
def player_exists_in_current_month(self, prev_player, current_players_indexes):
    """
    Tell whether a previous-month player is already present in the current month.

    Only the license number is consulted, through the ``'license_index'``
    entry of the pre-built indexes.

    Args:
        prev_player: Player from previous month
        current_players_indexes: Dictionary of indexes for fast lookup

    Returns:
        (exists, matching_player) tuple; ``(False, None)`` when the player
        has no usable license or the license is not indexed.
    """
    license_number = prev_player['license']
    if license_number == 'N/A' or not license_number:
        return False, None

    by_license = current_players_indexes.get('license_index', {})
    if license_number not in by_license:
        return False, None
    return True, by_license[license_number]
def build_current_players_indexes(self, current_players):
    """
    Build lookup dictionaries over the non-anonymous current-month players.

    Returns a dictionary with three indexes:
        - ``license_index``: license -> player
        - ``name_index``: "<name>_<first name>" (lowercased) -> player
        - ``name_club_league_index``: "<name>_<club>" and "<name>_<league>"
          (lowercased) -> player
    When several players share a key, the last one in the list wins.
    """
    self.stdout.write("Building player indexes for fast lookup...")
    began = datetime.now()

    by_license = {}
    by_full_name = {}
    by_name_affiliation = {}

    for entry in current_players:
        # Anonymous players carry no identity worth indexing.
        if self.is_anonymous_player(entry):
            continue

        if entry['license'] != 'N/A' and entry['license']:
            by_license[entry['license']] = entry

        if entry['name'] == 'N/A':
            continue
        surname = entry['name'].lower()

        if entry['first_name'] != 'N/A':
            by_full_name[f"{surname}_{entry['first_name'].lower()}"] = entry

        # Name + club, then name + league, share a single dictionary.
        if entry['club'] != 'N/A':
            by_name_affiliation[f"{surname}_{entry['club'].lower()}"] = entry
        if entry['league'] != 'N/A':
            by_name_affiliation[f"{surname}_{entry['league'].lower()}"] = entry

    elapsed = (datetime.now() - began).total_seconds()
    self.stdout.write(f"Indexes built in {elapsed:.2f} seconds. License keys: {len(by_license)}, Name keys: {len(by_full_name)}")
    return {
        'license_index': by_license,
        'name_index': by_full_name,
        'name_club_league_index': by_name_affiliation,
    }
def find_previous_month_file(self, current_metadata, rankings_dir):
    """Find the rankings file for the month preceding ``current_metadata``.

    Lookup order:
        1. The exact name
           ``CLASSEMENT-PADEL-<gender>[-<tranche>]-<MM>-<YYYY>.csv``.
        2. Any ``.csv`` whose name starts with the same prefix up to the
           month (tranche included).
        3. Any ``.csv`` starting with ``CLASSEMENT-PADEL-<gender>-<MM>``.
    For steps 2 and 3, names that also mention the expected year are
    preferred over other years, and candidates are examined in sorted order
    so the result does not depend on directory listing order.

    Args:
        current_metadata: Dict with ``month``, ``year``, ``gender`` and
            ``tranche`` keys describing the current file.
        rankings_dir: Directory containing the ranking files.

    Returns:
        Path of the selected file, or ``None`` when nothing matches or the
        directory does not exist.
    """
    current_month = current_metadata['month']
    current_year = current_metadata['year']
    gender = current_metadata['gender']
    tranche = current_metadata['tranche']

    # Step back one month, wrapping January to December of the prior year.
    prev_month = current_month - 1
    prev_year = current_year
    if prev_month == 0:
        prev_month = 12
        prev_year = current_year - 1

    tranche_part = f"-{tranche}" if tranche else ""
    exact_name = f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}-{prev_year}.csv"
    exact_path = os.path.join(rankings_dir, exact_name)
    if os.path.exists(exact_path):
        return exact_path

    # A missing directory simply means there is nothing to fall back on.
    if not os.path.isdir(rankings_dir):
        return None

    candidates = sorted(name for name in os.listdir(rankings_dir) if name.endswith(".csv"))
    prefixes = (
        f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}",
        f"CLASSEMENT-PADEL-{gender}-{prev_month:02d}",
    )
    year_tag = f"-{prev_year}"

    # First pass insists on the expected year; the second accepts any year,
    # which keeps the historical "any file from that month" fallback.
    for require_year in (True, False):
        for prefix in prefixes:
            for name in candidates:
                if not name.startswith(prefix):
                    continue
                if require_year and year_tag not in name[len(prefix):]:
                    continue
                return os.path.join(rankings_dir, name)
    return None
def find_anonymous_players(self, current_players, current_metadata, rankings_dir, options, file_path=None, return_count=False):
    """
    Look up likely identities for this month's anonymous players.

    Candidates come from the previous month's rankings file. Every match is
    printed; results are optionally saved (``options['output']``) and, when
    ``options['auto_match']`` is set and ``file_path`` is given, applied to
    the rankings file.

    Args:
        current_players: List of current month players
        current_metadata: Metadata about current month file
        rankings_dir: Directory containing ranking files
        options: Command options
        file_path: Path to current month file (for auto-match)
        return_count: Whether to return the count of matched players

    Returns:
        Number of matched players if return_count is True, otherwise None
    """
    clock_start = datetime.now()
    # Value handed back on every early exit.
    early_result = 0 if return_count else None
    applied_total = 0

    unnamed = [entry for entry in current_players if self.is_anonymous_player(entry)]
    if not unnamed:
        self.stdout.write(self.style.SUCCESS('No anonymous players found!'))
        return early_result
    unnamed_total = len(unnamed)
    self.stdout.write(self.style.SUCCESS(f'\nFound {unnamed_total} anonymous players. Looking for matches...'))

    previous_file = self.find_previous_month_file(current_metadata, rankings_dir)
    if not previous_file:
        self.stderr.write(self.style.ERROR('Previous month rankings file not found!'))
        return early_result
    self.stdout.write(f'Using previous month file: {os.path.basename(previous_file)}')

    self.stdout.write('Loading previous month data...')
    previous_players, _ = self.parse_rankings_file(previous_file)
    if not previous_players:
        self.stderr.write(self.style.ERROR('Could not load previous month data!'))
        return early_result

    # Pre-built indexes keep the per-candidate duplicate check cheap.
    lookup = self.build_current_players_indexes(current_players)

    results = []
    confident_total = 0
    already_present_total = 0  # reported in the summary; nothing here increments it

    self.stdout.write(f'Analyzing {unnamed_total} anonymous players...')
    report_every = max(1, unnamed_total // 10)  # roughly every 10%

    for done, unnamed_player in enumerate(unnamed, start=1):
        if done == 1 or done % report_every == 0:
            self.stdout.write(f' Processing anonymous player {done}/{unnamed_total} ({(done/unnamed_total)*100:.1f}%)')

        candidates = self.find_potential_matches(unnamed_player, previous_players, lookup, options)

        if not candidates:
            if options['verbose']:
                self.stdout.write(f"\nNo matches found for anonymous player: Rank {unnamed_player['rank']}, League: {unnamed_player['league']}")
        else:
            top_candidate = candidates[0]  # list is sorted best-first
            results.append({
                'anonymous_player': unnamed_player,
                'potential_matches': candidates,
                'best_match': top_candidate
            })

            progression = ""
            if unnamed_player['progression'] != 'N/A':
                progression = f", Progression: {unnamed_player['progression']}"
            assimilation = ""
            if unnamed_player['assimilation'] != 'N/A':
                assimilation = f", Assimilation: {unnamed_player['assimilation']}"
            self.stdout.write(f"\nAnonymous player: Rank {unnamed_player['rank']}, League: {unnamed_player['league']}{progression}{assimilation}")

            # Only the three best candidates are displayed.
            for number, candidate in enumerate(candidates[:3], start=1):
                person = candidate['player']
                score = candidate['confidence']
                reasons = candidate['match_reasons']
                self.stdout.write(f" Match {number}: {person['name']} {person['first_name']} (Rank: {person['rank']}, League: {person['league']})")
                self.stdout.write(f" Confidence: {score:.2f}, Match reasons: {reasons}")

            if top_candidate['confidence'] >= options['confidence']:
                confident_total += 1

        # Timing estimate every 100 players.
        if done % 100 == 0:
            elapsed = (datetime.now() - clock_start).total_seconds()
            per_player = elapsed / done
            remaining = (unnamed_total - done) * per_player
            self.stdout.write(f" Processed {done}/{unnamed_total} players in {elapsed:.1f}s")
            self.stdout.write(f" Estimated time remaining: {remaining:.1f}s ({per_player:.3f}s per player)")

    total_elapsed = (datetime.now() - clock_start).total_seconds()
    self.stdout.write(f"Analysis completed in {total_elapsed:.2f} seconds ({total_elapsed/unnamed_total:.3f}s per player)")

    matched_players = len(results)
    self.stdout.write(self.style.SUCCESS(f'\nMatching summary:'))
    self.stdout.write(f'Total anonymous players: {unnamed_total}')
    self.stdout.write(f'Players with potential matches: {matched_players}')
    self.stdout.write(f'High confidence matches (≥{options["confidence"]}): {confident_total}')
    self.stdout.write(f'Skipped players already in current month: {already_present_total}')

    if options['output']:
        self.stdout.write(f'Saving results to {options["output"]}...')
        self.save_results(results, options['output'])

    if options['auto_match'] and matched_players > 0 and file_path:
        applied_total = self.update_rankings_with_matches(file_path, unnamed, results,
                                                          options['confidence'], options)
    elif options['auto_match'] and file_path is None:
        self.stderr.write(self.style.ERROR("Auto-match was requested but file path is not available. No changes were made."))

    return applied_total if return_count else None
def find_potential_matches(self, anon_player, prev_players, current_players_indexes, options):
    """Find potential matches for an anonymous player from previous month data.

    Each non-anonymous previous-month player that is not already present in
    the current month (same license) is scored:
        - previous rank vs. expected previous rank
          (current rank + progression): exact 0.7, within 3 -> 0.4,
          within 10 -> 0.2 (sets the base confidence);
        - points similarity above 0.9: +0.2;
        - same league: +0.25;
        - same assimilation: +0.1.
    Candidates with a confidence below 0.1 are dropped.

    Args:
        anon_player: Anonymous player record from the current month.
        prev_players: Player records from the previous month.
        current_players_indexes: Indexes passed to
            ``player_exists_in_current_month``.
        options: Command options; only ``options['verbose']`` is read.

    Returns:
        List of match dicts (``player``, ``rank_match_type``, ``rank_diff``,
        ``has_league_match``, ``has_assimilation_match``,
        ``points_similarity``, ``match_reasons`` as a comma-joined string,
        ``confidence``), sorted best-first.
    """
    if options['verbose']:
        progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else ""
        self.stdout.write(f" Finding matches for anonymous player: Rank {anon_player['rank']}{progression}, League: {anon_player['league']}")

    # Current rank as an integer, when it is one.
    try:
        anon_rank = int(anon_player['rank']) if anon_player['rank'] != 'N/A' else None
    except ValueError:
        anon_rank = None

    # Progression is a signed or unsigned integer ("+5", "-10", "12");
    # "=", "NEW" and anything else count as 0.
    prog_value = 0
    if anon_player['progression'] != 'N/A' and anon_player['progression']:
        prog_str = anon_player['progression'].strip()
        if prog_str[:1] in ('+', '-') or prog_str.isdigit():
            try:
                prog_value = int(prog_str)
            except ValueError:
                prog_value = 0

    # Expected previous rank = current rank + progression.
    prev_rank_from_progression = None
    if anon_rank is not None:
        prev_rank_from_progression = anon_rank + prog_value
        if options['verbose']:
            self.stdout.write(f" Target previous rank: {prev_rank_from_progression} (current rank {anon_rank} + progression {prog_value})")

    # The anonymous player's points do not change per candidate: parse once.
    anon_points = None
    if anon_player['points'] != 'N/A':
        try:
            anon_points = float(anon_player['points'])
        except ValueError:
            anon_points = None

    potential_matches = []
    for prev_player in prev_players:
        # Anonymous previous-month players cannot provide an identity.
        if self.is_anonymous_player(prev_player):
            continue

        # A player already present this month (same license) is not a candidate.
        exists, existing_player = self.player_exists_in_current_month(prev_player, current_players_indexes)
        if exists and existing_player['license'] == prev_player['license']:
            continue

        match_data = {
            'player': prev_player,
            'rank_match_type': None,
            'rank_diff': None,
            'has_league_match': False,
            'has_assimilation_match': False,
            'points_similarity': 0.0,
            'match_reasons': [],
            'confidence': 0.0
        }

        # 1. PRIMARY MATCHER: previous rank match
        if prev_rank_from_progression is not None:
            try:
                prev_rank_value = int(prev_player['rank'])
            except ValueError:
                prev_rank_value = None
            if prev_rank_value is not None:
                rank_diff = abs(prev_rank_value - prev_rank_from_progression)
                match_data['rank_diff'] = rank_diff
                if rank_diff == 0:
                    match_data['rank_match_type'] = 'exact'
                    match_data['match_reasons'].append(f"exact previous rank match ({prev_rank_value})")
                    match_data['confidence'] = 0.7
                elif rank_diff <= 3:
                    match_data['rank_match_type'] = 'close'
                    match_data['match_reasons'].append(f"close previous rank match ({prev_rank_value} vs {prev_rank_from_progression})")
                    match_data['confidence'] = 0.4
                elif rank_diff <= 10:
                    match_data['rank_match_type'] = 'approximate'
                    match_data['match_reasons'].append(f"approximate previous rank match ({prev_rank_value} vs {prev_rank_from_progression})")
                    match_data['confidence'] = 0.2

        # 2. Points similarity
        if anon_points is not None and prev_player['points'] != 'N/A':
            try:
                prev_points = float(prev_player['points'])
            except ValueError:
                prev_points = None
            if prev_points is not None:
                points_diff = abs(anon_points - prev_points)
                scale = max(abs(anon_points), abs(prev_points))
                if scale == 0:
                    # Both values are zero: identical, and nothing to divide by.
                    points_similarity = 1.0
                else:
                    points_similarity = max(0, 1 - (points_diff / scale))
                if points_similarity > 0.9:
                    match_data['points_similarity'] = points_similarity
                    match_data['match_reasons'].append(f"similar points ({prev_points} vs {anon_points})")
                    match_data['confidence'] += 0.2

        # 3. League match
        if anon_player['league'] != 'N/A' and prev_player['league'] != 'N/A':
            if anon_player['league'] == prev_player['league']:
                match_data['has_league_match'] = True
                match_data['match_reasons'].append("league match")
                match_data['confidence'] += 0.25

        # 4. Assimilation match
        if anon_player['assimilation'] != 'N/A' and prev_player['assimilation'] != 'N/A':
            if anon_player['assimilation'] == prev_player['assimilation']:
                match_data['has_assimilation_match'] = True
                match_data['match_reasons'].append("assimilation match")
                match_data['confidence'] += 0.1

        # Only keep candidates with a minimum confidence.
        if match_data['confidence'] >= 0.1:
            match_data['match_reasons'] = ", ".join(match_data['match_reasons'])
            potential_matches.append(match_data)

    def match_sort_key(match):
        # Rank match quality dominates, then points, league, assimilation
        # and finally the raw confidence.
        rank_score = {
            'exact': 1000,
            'close': 100,
            'approximate': 10,
            None: 1
        }.get(match['rank_match_type'], 0)
        points_score = int(match.get('points_similarity', 0) * 100)
        league_value = 2 if match['has_league_match'] else 1
        assimilation_value = 2 if match['has_assimilation_match'] else 1
        return (rank_score, points_score, league_value, assimilation_value, match['confidence'])

    potential_matches.sort(key=match_sort_key, reverse=True)
    return potential_matches
def save_results(self, results, output_path):
    """Write a plain-text report of the best match for each anonymous player.

    Args:
        results: Match info dicts carrying ``anonymous_player`` and
            ``best_match`` entries.
        output_path: Destination file, overwritten (UTF-8).

    Any exception is reported on stderr instead of being raised.
    """
    def report_lines():
        # Lines are produced lazily, in the order they are written.
        yield "Anonymous Player Matching Results\n"
        yield "================================\n\n"
        for entry in results:
            unnamed = entry['anonymous_player']
            chosen = entry['best_match']
            progression = ""
            if unnamed['progression'] != 'N/A':
                progression = f", Progression: {unnamed['progression']}"
            assimilation = ""
            if unnamed['assimilation'] != 'N/A':
                assimilation = f", Assimilation: {unnamed['assimilation']}"
            yield f"Anonymous Player (Rank: {unnamed['rank']}, League: {unnamed['league']}{progression}{assimilation})\n"
            yield f"Best Match: {chosen['player']['name']} {chosen['player']['first_name']}\n"
            yield f" Confidence: {chosen['confidence']:.2f}\n"
            yield f" Match reasons: {chosen['match_reasons']}\n"
            yield f" Previous Rank: {chosen['player']['rank']}\n"
            yield f" League: {chosen['player']['league']}\n"
            yield f" Club: {chosen['player']['club']}\n\n"

    try:
        with open(output_path, 'w', encoding='utf-8') as report:
            report.writelines(report_lines())
        self.stdout.write(self.style.SUCCESS(f'Results saved to {output_path}'))
    except Exception as e:
        self.stderr.write(self.style.ERROR(f'Error saving results: {str(e)}'))
def update_rankings_with_matches(self, file_path, anonymous_players, matches, confidence_threshold, options):
    """
    Update the rankings file with matched player information.

    A copy of the file is first saved as ``<file_path>.bak``. For every rank
    whose best match reaches ``confidence_threshold``, the first data line
    with that rank and an empty last or first name receives the matched
    player's name, first name and, when known, nationality, license, club
    code, club and birth year. Lines with fewer than 5 fields are left
    untouched, like the parser does, so a truncated line can never be
    written past its end.

    Args:
        file_path: Path to the current month's rankings file
        anonymous_players: List of anonymous players (not read here; kept
            for call compatibility)
        matches: List of match info dictionaries
        confidence_threshold: Minimum confidence to apply auto-matching
        options: Command options (not read here; kept for call compatibility)

    Returns:
        Number of players that were updated
    """
    # (matched-player key, column index) for the optional columns.
    optional_columns = (
        ('nationality', 4),
        ('license', 5),
        ('club_code', 10),
        ('club', 11),
        ('birth_year', 14),
    )

    self.stdout.write(self.style.SUCCESS(f"\nAuto-matching players with confidence ≥ {confidence_threshold}..."))

    # Keep a copy of the file as it is before this update.
    backup_path = f"{file_path}.bak"
    shutil.copy2(file_path, backup_path)
    self.stdout.write(f"Created backup of original file at: {backup_path}")

    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # One update per rank: the first sufficiently confident match wins.
    players_to_update = {}
    for match_info in matches:
        anon_player = match_info['anonymous_player']
        best_match = match_info['best_match']
        rank = anon_player['rank']
        if best_match['confidence'] >= confidence_threshold and rank not in players_to_update:
            players_to_update[rank] = {
                'anonymous_player': anon_player,
                'match': best_match
            }

    if not players_to_update:
        self.stdout.write("No players met the confidence threshold for auto-matching.")
        return 0

    self.stdout.write(f"Found {len(players_to_update)} players to update.")

    # Header lines (everything up to and including the column header) are
    # copied verbatim.
    data_start_line = 0
    for i, line in enumerate(lines):
        if ';RANG;NOM;PRENOM;' in line:
            data_start_line = i + 1
            break

    updated_count = 0
    updated_lines = list(lines[:data_start_line])
    already_updated_ranks = set()  # ranks that have received their match

    for line in lines[data_start_line:]:
        stripped = line.strip()
        if not stripped:
            updated_lines.append(line)
            continue

        values = stripped.split(';')
        if len(values) < 5:
            # Malformed line: the parser never treats it as a player.
            updated_lines.append(line)
            continue

        rank = values[1].strip()
        name = values[2].strip()
        first_name = values[3].strip()

        # Each rank is filled in at most once.
        if rank in already_updated_ranks or rank not in players_to_update:
            updated_lines.append(line)
            continue

        # Only lines that really lack a name may be overwritten.
        if name and first_name:
            updated_lines.append(line)
            self.stdout.write(self.style.WARNING(
                f"WARNING: Skipped rank {rank} because it already contains a non-anonymous player: {name} {first_name}"
            ))
            continue

        matched_player = players_to_update[rank]['match']['player']
        self.stdout.write(f"Updating anonymous player at rank {rank}. Current values: Name='{name}', First name='{first_name}'")

        values[2] = matched_player['name']
        values[3] = matched_player['first_name']
        for key, column in optional_columns:
            if matched_player[key] != 'N/A' and len(values) > column:
                values[column] = matched_player[key]

        updated_lines.append(';'.join(values) + '\n')
        updated_count += 1
        already_updated_ranks.add(rank)
        self.stdout.write(f"Updated player rank {rank}: {matched_player['name']} {matched_player['first_name']}")

    with open(file_path, 'w', encoding='utf-8') as f:
        f.writelines(updated_lines)

    self.stdout.write(self.style.SUCCESS(f"\nUpdated {updated_count} players in the rankings file."))
    self.stdout.write(f"Original file backed up to: {backup_path}")
    return updated_count
def iterative_match_anonymous_players(self, file_path, rankings_dir, options):
    """
    Run the anonymous-player matching pass repeatedly until it stalls.

    Every round matches the anonymous players still present in the
    rankings file against the previous month's players, using JSON
    working files stored in a temporary directory. After a productive
    round the rankings file is parsed again and the working files are
    rebuilt. The process ends when a round matches nobody, when no
    anonymous player is left, or once 10 rounds have run.

    Args:
        file_path: Rankings file to process; parsed at the start, after
            each productive round and once more for the final summary.
        rankings_dir: Directory passed to ``find_previous_month_file``.
        options: Command options, forwarded untouched to
            ``match_players_from_temp_files``.

    Returns:
        None. Progress and the final summary go to stdout; a missing
        previous-month file is reported on stderr.
    """
    self.stdout.write(self.style.SUCCESS("\n=== Starting optimized iterative matching process ==="))

    # Snapshot of the current month before any matching happens
    current_players, current_metadata = self.parse_rankings_file(file_path)
    initial_anonymous_count = sum(1 for entry in current_players if self.is_anonymous_player(entry))
    if initial_anonymous_count == 0:
        self.stdout.write(self.style.SUCCESS("No anonymous players found. Process complete!"))
        return
    self.stdout.write(f"Initial anonymous players: {initial_anonymous_count}")

    # The previous month is the only source of candidate identities
    prev_month_file = self.find_previous_month_file(current_metadata, rankings_dir)
    if not prev_month_file:
        self.stderr.write(self.style.ERROR('Previous month rankings file not found!'))
        return
    self.stdout.write(f'Using previous month file: {os.path.basename(prev_month_file)}')
    prev_players, _ = self.parse_rankings_file(prev_month_file)

    iteration = 1  # number of the round about to run
    total_matched = 0

    with tempfile.TemporaryDirectory() as temp_dir:
        self.stdout.write(f"Created temporary directory for working files: {temp_dir}")
        anon_file, prev_players_file, matches_file = (
            os.path.join(temp_dir, filename)
            for filename in ("anonymous_players.json", "prev_month_players.json", "matches.json")
        )

        def rebuild_working_files(players):
            # Regenerate both JSON working files and report how many entries each holds
            counts = self.create_filtered_working_files(players, prev_players, anon_file, prev_players_file)
            return counts['anon_count'], counts['prev_count']

        self.stdout.write("Creating initial working files...")
        anon_count, prev_count = rebuild_working_files(current_players)
        self.stdout.write(f"Extracted {anon_count} anonymous players and {prev_count} eligible previous month players")

        while anon_count > 0:
            self.stdout.write(self.style.SUCCESS(f"\n--- Iteration {iteration} ---"))
            self.stdout.write(f"Anonymous players remaining: {anon_count}")
            self.stdout.write(f"Previous month players to check: {prev_count}")

            matched_count = self.match_players_from_temp_files(
                anon_file, prev_players_file, matches_file,
                file_path, current_metadata, options
            )
            made_progress = matched_count > 0

            if made_progress:
                total_matched += matched_count
                self.stdout.write(self.style.SUCCESS(
                    f"Iteration {iteration} complete: Matched {matched_count} players"
                ))
                # The rankings file changed on disk: reload it and rebuild the working files
                current_players, _ = self.parse_rankings_file(file_path)
                anon_count, prev_count = rebuild_working_files(current_players)
                self.stdout.write(f"Updated working files: {anon_count} anonymous players and {prev_count} eligible previous month players")
            else:
                self.stdout.write(self.style.SUCCESS(f"Iteration {iteration} complete: No new matches found"))

            iteration += 1
            # Safety cap: checked after every round, whatever its outcome
            if iteration > 10:
                self.stdout.write(self.style.WARNING("Maximum iterations reached (10). Stopping process."))
                break
            if not made_progress:
                break

    # Summary of the whole run
    self.stdout.write(self.style.SUCCESS("\n=== Iterative matching process complete ==="))
    self.stdout.write(f"Total iterations: {iteration - 1}")
    self.stdout.write(f"Total players matched: {total_matched}")

    final_players, _ = self.parse_rankings_file(file_path)
    final_anonymous_count = sum(1 for entry in final_players if self.is_anonymous_player(entry))
    self.stdout.write(f"Anonymous players remaining: {final_anonymous_count}")

    # initial_anonymous_count is non-zero here (the zero case returned above),
    # so the division is always safe
    improvement = ((initial_anonymous_count - final_anonymous_count) / initial_anonymous_count) * 100
    self.stdout.write(f"Data completeness improved by {improvement:.1f}%")
def create_filtered_working_files(self, current_players, prev_players, anon_file, prev_players_file):
    """
    Write the two JSON working files used by the matching pass.

    ``anon_file`` receives the anonymous players of the current month.
    ``prev_players_file`` receives the non-anonymous players of the
    previous month whose license does not belong to a non-anonymous
    player of the current month. Previous-month players without a usable
    license (empty or 'N/A') cannot be looked up and are always kept.

    Args:
        current_players: Player dicts parsed from the current month.
        prev_players: Player dicts parsed from the previous month.
        anon_file: Destination path for the anonymous players.
        prev_players_file: Destination path for the filtered previous players.

    Returns:
        dict with 'anon_count' and 'prev_count', the number of players
        written to each file.
    """
    still_anonymous = list(filter(self.is_anonymous_player, current_players))

    # Licenses held by identified players of the current month
    licenses_this_month = {
        f"license_{player['license']}"
        for player in current_players
        if not self.is_anonymous_player(player)
        and player['license'] != 'N/A'
        and player['license']
    }

    def already_listed(player):
        # A player can only be recognised through a usable license number
        if player['license'] == 'N/A' or not player['license']:
            return False
        return f"license_{player['license']}" in licenses_this_month

    # Anonymous entries of the previous month carry no identity worth matching
    carried_over = [
        player
        for player in prev_players
        if not self.is_anonymous_player(player) and not already_listed(player)
    ]

    for destination, payload in ((anon_file, still_anonymous), (prev_players_file, carried_over)):
        with open(destination, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, ensure_ascii=False)

    return {
        'anon_count': len(still_anonymous),
        'prev_count': len(carried_over)
    }
def match_players_from_temp_files(self, anon_file, prev_players_file, matches_file,
                                  original_file, current_metadata, options):
    """
    Match the players stored in the working files and apply the result.

    Reads the anonymous players and the previous-month candidates from
    their JSON files, asks ``find_potential_matches`` for candidates for
    each anonymous player, records every best match in ``matches_file``
    and finally hands the matches to ``update_rankings_with_matches`` so
    the original rankings file gets updated.

    Args:
        anon_file: JSON file holding the anonymous players.
        prev_players_file: JSON file holding the previous-month candidates.
        matches_file: JSON file that receives the best match per player.
        original_file: Rankings file passed to ``update_rankings_with_matches``.
        current_metadata: Accepted for interface compatibility; not used here.
        options: Command options; ``options['confidence']`` is forwarded
            as the confidence threshold.

    Returns:
        Whatever ``update_rankings_with_matches`` returns when at least one
        match was found, otherwise 0 (also when either input file is empty).
    """
    with open(anon_file, 'r', encoding='utf-8') as handle:
        anonymous_players = json.load(handle)
    with open(prev_players_file, 'r', encoding='utf-8') as handle:
        prev_players = json.load(handle)

    # Nothing to match, or nothing to match against
    if not (anonymous_players and prev_players):
        return 0

    # The matcher expects index structures; they are handed over empty
    current_players_indexes = {
        'license_index': {},
        'name_index': {},
        'name_club_league_index': {}
    }

    results = []
    for anon_player in anonymous_players:
        potential_matches = self.find_potential_matches(
            anon_player, prev_players, current_players_indexes, options
        )
        if not potential_matches:
            continue
        results.append({
            'anonymous_player': anon_player,
            'potential_matches': potential_matches,
            # The first candidate is treated as the highest-confidence one
            'best_match': potential_matches[0]
        })

    # Only the key fields of each best match are persisted
    with open(matches_file, 'w', encoding='utf-8') as handle:
        json.dump(
            [
                {
                    'anonymous_player': found['anonymous_player'],
                    'best_match': {
                        'player': found['best_match']['player'],
                        'confidence': found['best_match']['confidence'],
                        'match_reasons': found['best_match']['match_reasons']
                    }
                }
                for found in results
            ],
            handle,
            ensure_ascii=False
        )

    if not results:
        return 0

    # Write the accepted matches back into the original rankings file
    return self.update_rankings_with_matches(
        original_file, anonymous_players, results, options['confidence'], options
    )