You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
padelclub_backend/tournaments/management/commands/analyze_rankings.py

1235 lines
61 KiB

from django.core.management.base import BaseCommand, CommandError
import os
import csv
import collections
import re
from datetime import datetime
from django.conf import settings
import json
import tempfile
import shutil
class Command(BaseCommand):
help = 'Analyze a padel rankings CSV file and provide statistics'
def add_arguments(self, parser):
    """Register the command-line arguments understood by this command.

    Args:
        parser: Argument parser supplied by the management framework; only
            its ``add_argument`` method is used.
    """
    # Input selection
    parser.add_argument('file_path', nargs='?', type=str, help='Relative path to the rankings file from the static/rankings directory')
    parser.add_argument('--full-path', type=str, help='Full path to the rankings file (alternative to file_path)')
    parser.add_argument('--list-files', action='store_true', help='List available ranking files')

    # Statistics display limits
    parser.add_argument('--top', type=int, default=10, help='Number of top players to display')
    parser.add_argument('--clubs', type=int, default=10, help='Number of top clubs to display')
    parser.add_argument('--leagues', type=int, default=10, help='Number of top leagues to display')

    # Anonymous-player matching
    parser.add_argument('--find-anonymous', action='store_true', help='Find and match anonymous players with previous month rankings')
    # The matcher accumulates integer points (an exact previous-rank hit alone
    # is worth 7), so the threshold is a point score and not a 0-1 ratio.
    parser.add_argument('--confidence', type=int, default=7, help='Minimum confidence score (integer points, not a 0-1 ratio) for automatic matching')
    parser.add_argument('--auto-match', action='store_true', help='Automatically match anonymous players when confidence is high')
    parser.add_argument('--output', type=str, help='Save results to output file')
    parser.add_argument('--verbose', action='store_true', help='Show detailed matching information')
    parser.add_argument('--named-only', action='store_true', help='Process only anonymous players WITH names (missing license)')
    parser.add_argument('--unnamed-only', action='store_true', help='Process only anonymous players WITHOUT names')
def handle(self, *args, **options):
    """Resolve the rankings file, parse it and run the iterative matcher.

    With ``--list-files`` the available files are listed and nothing else
    happens. Otherwise the file is taken from ``--full-path`` or, failing
    that, from the positional path relative to the rankings directory.
    Problems are reported on stderr and end the command early.
    """
    rankings_dir = os.path.join(settings.BASE_DIR, 'tournaments', 'static', 'rankings')

    # Listing mode short-circuits every other option.
    if options['list_files']:
        self.list_available_files(rankings_dir)
        return

    # An explicit full path takes precedence over the relative one.
    if options['full_path']:
        file_path = options['full_path']
    elif options['file_path']:
        file_path = os.path.join(rankings_dir, options['file_path'])
    else:
        self.stderr.write(self.style.ERROR('Please provide a file path or use --list-files to see available files'))
        return

    if not os.path.exists(file_path):
        self.stderr.write(self.style.ERROR(f'File not found: {file_path}'))
        return

    players, metadata = self.parse_rankings_file(file_path)
    if not players:
        return

    # NOTE: --find-anonymous and --auto-match are not consulted here; the
    # iterative matcher always runs once the file yields players, and the
    # statistics report (generate_statistics) is not invoked.
    self.iterative_match_anonymous_players(file_path, rankings_dir, options)
def list_available_files(self, rankings_dir):
    """Write the sorted names of the ``.csv`` files in ``rankings_dir`` to stdout.

    An error is written to stderr instead when the directory does not exist.
    """
    if not os.path.exists(rankings_dir):
        self.stderr.write(self.style.ERROR(f'Rankings directory not found: {rankings_dir}'))
        return

    csv_names = sorted(name for name in os.listdir(rankings_dir) if name.endswith('.csv'))
    self.stdout.write(self.style.SUCCESS(f'Found {len(csv_names)} ranking files:'))
    for name in csv_names:
        self.stdout.write(f' - {name}')
def parse_rankings_file(self, file_path):
    """Parse a semicolon-separated rankings file into players and metadata.

    The first two lines are read as title and period. Month and year come
    from an ``MM-YYYY`` fragment of the filename, else from a
    ``<month name> <year>`` fragment of the period line, else from the
    current date. Data rows start after the line containing
    ``;RANG;NOM;PRENOM;`` (or at the top of the file if no such line exists).

    Args:
        file_path: Path of the rankings file to read (UTF-8).

    Returns:
        ``(players, metadata)``. ``players`` is a list of dicts whose values
        are stripped strings, with ``'N/A'`` (``'0'`` for ``progression``)
        standing in for missing or blank columns. ``metadata`` holds
        ``title``, ``period``, ``filename``, ``month``, ``year``, ``gender``
        and ``tranche``. On any error a message is written to stderr and
        ``([], {})`` is returned.
    """
    # (record key, column index, placeholder for a missing/blank column).
    # Column 0 is empty because every data row starts with ';'.
    columns = (
        ('rank', 1, 'N/A'),
        ('name', 2, 'N/A'),
        ('first_name', 3, 'N/A'),
        ('nationality', 4, 'N/A'),
        ('license', 5, 'N/A'),
        ('points', 6, 'N/A'),
        ('assimilation', 7, 'N/A'),
        ('tournaments_played', 8, 'N/A'),
        ('league', 9, 'N/A'),
        ('club_code', 10, 'N/A'),
        ('club', 11, 'N/A'),
        ('progression', 12, '0'),
        ('best_rank', 13, 'N/A'),
        ('birth_year', 14, 'N/A'),
    )
    month_names = ["JANVIER", "FEVRIER", "MARS", "AVRIL", "MAI", "JUIN",
                   "JUILLET", "AOUT", "SEPTEMBRE", "OCTOBRE", "NOVEMBRE", "DECEMBRE"]

    try:
        self.stdout.write(f"Loading file: {file_path}...")
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        self.stdout.write(f"File loaded. Found {len(lines)} lines, processing...")

        # File metadata lives on the first two lines (quoted in the export).
        title = lines[0].strip().strip('"')
        period = lines[1].strip().strip('"')
        filename = os.path.basename(file_path)

        # Filename format: CLASSEMENT-PADEL-MESSIEURS-MM-YYYY.csv
        now = datetime.now()
        match = re.search(r'(\d{2})-(\d{4})', filename)
        if match:
            month = int(match.group(1))
            year = int(match.group(2))
        else:
            match = re.search(r'(\w+)\s+(\d{4})', period)
            if match:
                month_name = match.group(1).upper()
                if month_name in month_names:
                    month = month_names.index(month_name) + 1
                else:
                    month = now.month
                year = int(match.group(2))
            else:
                month = now.month
                year = now.year

        gender = "UNKNOWN"
        if "MESSIEURS" in filename:
            gender = "MESSIEURS"
        elif "DAMES" in filename:
            gender = "DAMES"

        # Tranche/series suffix, e.g. MESSIEURS-2 or MESSIEURS-3. The digit
        # must not be followed by another digit, otherwise the first digit
        # of a two-digit month (MESSIEURS-12-2024) is mistaken for a tranche.
        tranche = None
        tranche_match = re.search(r'MESSIEURS-(\d)(?!\d)', filename)
        if tranche_match:
            tranche = int(tranche_match.group(1))

        metadata = {
            'title': title,
            'period': period,
            'filename': filename,
            'month': month,
            'year': year,
            'gender': gender,
            'tranche': tranche,
        }
        self.stdout.write(self.style.SUCCESS(f'Analyzing: {title} - {period}'))

        # Player rows begin right after the column-header line.
        data_start = 0
        for i, line in enumerate(lines):
            if ';RANG;NOM;PRENOM;' in line:
                data_start = i + 1
                break

        self.stdout.write(f"Parsing player data from line {data_start}...")
        players = []
        line_count = 0
        data_lines = lines[data_start:]
        total_lines = len(data_lines)
        progress_interval = max(1, total_lines // 10)  # report roughly every 10%

        for line in data_lines:
            stripped = line.strip()
            if not stripped:
                continue
            values = stripped.split(';')
            if len(values) < 5:  # skip malformed lines
                continue

            player = {}
            for key, index, placeholder in columns:
                value = values[index].strip() if len(values) > index else ''
                player[key] = value if value else placeholder
            players.append(player)

            # Only parsed rows are counted, so the percentage can stay
            # below 100% when the file contains blank or malformed lines.
            line_count += 1
            if line_count % progress_interval == 0:
                self.stdout.write(f" Progress: {line_count}/{total_lines} lines processed ({(line_count/total_lines)*100:.1f}%)")
        return players, metadata
    except Exception as e:
        # Best-effort boundary: callers treat an empty player list as failure.
        self.stderr.write(self.style.ERROR(f'Error parsing file: {str(e)}'))
        return [], {}
def generate_statistics(self, players, options):
    """Write summary statistics for the parsed ranking data to stdout.

    Reports the total player count, the first ``options["top"]`` players in
    file order, the most common leagues and clubs, numeric point statistics
    (skipped entirely if any points value is not numeric) and the share of
    anonymous players.
    """
    write = self.stdout.write
    success = self.style.SUCCESS
    total_players = len(players)
    write(f'Total players: {total_players}')

    # Leading players, in file order
    top_n = options["top"]
    write(success(f'\nTop {top_n} players:'))
    for position, entry in enumerate(players[:top_n], start=1):
        suffix = "" if entry['progression'] == 'N/A' else f" ({entry['progression']})"
        write(f'{position}. {entry["rank"]} - {entry["name"]} {entry["first_name"]} - {entry["points"]} points{suffix}')

    # Distribution by league
    per_league = collections.Counter(p["league"] for p in players if p["league"] != 'N/A')
    write(success(f'\nPlayers by league (top {options["leagues"]}):'))
    for league_name, members in per_league.most_common(options["leagues"]):
        share = (members / total_players) * 100
        write(f'{league_name}: {members} players ({share:.1f}%)')

    # Distribution by club
    per_club = collections.Counter(p["club"] for p in players if p["club"] != 'N/A')
    write(success(f'\nPlayers by club (top {options["clubs"]}):'))
    for club_name, members in per_club.most_common(options["clubs"]):
        share = (members / total_players) * 100
        write(f'{club_name}: {members} players ({share:.1f}%)')

    # Point statistics; a single non-numeric value silently disables them.
    try:
        numeric_points = [float(p["points"]) for p in players if p["points"] not in ('N/A', '')]
        if numeric_points:
            write(success('\nPoints statistics:'))
            write(f'Min: {min(numeric_points)}')
            write(f'Max: {max(numeric_points)}')
            write(f'Average: {sum(numeric_points) / len(numeric_points):.2f}')
            write(f'Players with points: {len(numeric_points)} ({(len(numeric_points) / total_players) * 100:.1f}%)')
    except ValueError:
        pass

    # Anonymous players (only reported when there are any)
    anonymous_total = sum(1 for p in players if self.is_anonymous_player(p))
    if anonymous_total:
        write(success(f'\nAnonymous players: {anonymous_total} ({(anonymous_total / total_players) * 100:.1f}%)'))
def is_anonymous_player(self, player):
    """Return True when the player lacks a last name, a first name or a license.

    A field counts as missing when it is the ``'N/A'`` placeholder or empty.
    """
    missing = ('N/A', '')
    # Checked in this order so the license is only read when both names exist.
    return (player['name'] in missing
            or player['first_name'] in missing
            or player['license'] in missing)
def player_exists_in_current_month(self, prev_player, current_players_indexes):
    """Look up a previous-month player in the current month by license number.

    Args:
        prev_player: Player record from the previous month.
        current_players_indexes: Dict of lookup indexes; only the
            ``'license_index'`` entry is consulted.

    Returns:
        ``(True, matching_player)`` when the license is present in the
        index, otherwise ``(False, None)``.
    """
    license_number = prev_player['license']
    # Players without a usable license cannot be looked up.
    if license_number == 'N/A' or not license_number:
        return False, None

    by_license = current_players_indexes.get('license_index', {})
    if license_number not in by_license:
        return False, None
    return True, by_license[license_number]
def build_current_players_indexes(self, current_players):
    """Build lookup dictionaries over the fully identified current players.

    Only players with a license, a last name and a first name are indexed.
    When several players share a key, the last one in the list wins.

    Returns:
        Dict with ``'license_index'`` (license -> player), ``'name_index'``
        (``"<name>_<first_name>"`` lowercased -> player) and
        ``'name_club_league_index'`` (``"<name>_<club>"`` and
        ``"<name>_<league>"`` lowercased -> player).
    """
    self.stdout.write("Building player indexes for fast lookup...")
    started = datetime.now()
    missing = ('N/A', '')

    by_license = {}
    by_name = {}
    by_name_and_place = {}
    for entry in current_players:
        # Skip anyone who is not fully identified.
        if (entry['license'] in missing
                or entry['name'] in missing
                or entry['first_name'] in missing):
            continue

        surname = entry['name'].lower()
        by_license[entry['license']] = entry
        by_name[f"{surname}_{entry['first_name'].lower()}"] = entry

        # Club and league keys share one dictionary.
        if entry['club'] != 'N/A':
            by_name_and_place[f"{surname}_{entry['club'].lower()}"] = entry
        if entry['league'] != 'N/A':
            by_name_and_place[f"{surname}_{entry['league'].lower()}"] = entry

    elapsed = (datetime.now() - started).total_seconds()
    self.stdout.write(f"Indexes built in {elapsed:.2f} seconds. License keys: {len(by_license)}, Name keys: {len(by_name)}")
    return {
        'license_index': by_license,
        'name_index': by_name,
        'name_club_league_index': by_name_and_place,
    }
def find_previous_month_file(self, current_metadata, rankings_dir):
    """Locate the rankings file of the month preceding ``current_metadata``.

    The lookup tries, in order:
      1. the exact name ``CLASSEMENT-PADEL-<gender>[-<tranche>]-MM-YYYY.csv``;
      2. any ``.csv`` starting with the same prefix up to the month;
      3. any ``.csv`` of that gender and month, ignoring the tranche.
    Within steps 2 and 3 the directory listing is sorted so the result is
    deterministic, and a name containing the expected year is preferred
    over one from another year.

    Args:
        current_metadata: Dict with ``month``, ``year``, ``gender`` and
            ``tranche`` keys, as produced by ``parse_rankings_file``.
        rankings_dir: Directory that holds the ranking files.

    Returns:
        Path of the matching file, or ``None`` when nothing matches or the
        directory cannot be listed.
    """
    current_month = current_metadata['month']
    current_year = current_metadata['year']
    gender = current_metadata['gender']
    tranche = current_metadata['tranche']

    # January rolls back to December of the previous year.
    prev_month = current_month - 1
    prev_year = current_year
    if prev_month == 0:
        prev_month = 12
        prev_year = current_year - 1

    tranche_part = f"-{tranche}" if tranche else ""
    exact_path = os.path.join(
        rankings_dir,
        f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}-{prev_year}.csv",
    )
    if os.path.exists(exact_path):
        return exact_path

    try:
        filenames = sorted(os.listdir(rankings_dir))
    except OSError:
        # Missing or unreadable directory: nothing to fall back to.
        return None

    year_tag = f"-{prev_year}"
    prefixes = (
        f"CLASSEMENT-PADEL-{gender}{tranche_part}-{prev_month:02d}",
        f"CLASSEMENT-PADEL-{gender}-{prev_month:02d}",
    )
    for prefix in prefixes:
        candidates = [name for name in filenames
                      if name.startswith(prefix) and name.endswith(".csv")]
        if not candidates:
            continue
        # The prefix does not pin the year, so prefer the expected one and
        # only then fall back to whatever else shares the prefix.
        same_year = [name for name in candidates if year_tag in name]
        return os.path.join(rankings_dir, (same_year or candidates)[0])
    return None
def find_anonymous_players(self, current_players, current_metadata, rankings_dir, options, file_path=None, return_count=False):
    """Report candidate identities for the anonymous players of a rankings file.

    Anonymous players are split into "named" (names present, so the license
    is what is missing) and "unnamed" ones; ``--named-only`` and
    ``--unnamed-only`` restrict processing to one group. Each selected
    player is scored against the previous month's file, the findings are
    written to stdout, optionally saved (``--output``) and optionally
    applied to the file (``--auto-match``).

    Args:
        current_players: Player records of the current month.
        current_metadata: Metadata of the current month's file.
        rankings_dir: Directory containing the ranking files.
        options: Command options.
        file_path: Path of the current month's file; needed for auto-match.
        return_count: When True, return the number of matched players.

    Returns:
        The matched-player count (0 on every early exit) if ``return_count``
        is True, otherwise None.
    """
    write = self.stdout.write
    success = self.style.SUCCESS
    error = self.style.ERROR
    started = datetime.now()
    early_exit = 0 if return_count else None
    matched_count = 0

    def has_full_name(entry):
        # True when both last and first name carry real values.
        return (entry['name'] != 'N/A' and entry['name'] != '' and
                entry['first_name'] != 'N/A' and entry['first_name'] != '')

    all_anonymous_players = [p for p in current_players if self.is_anonymous_player(p)]
    if not all_anonymous_players:
        write(success('No anonymous players found!'))
        return early_exit

    named_only = options['named_only']
    unnamed_only = options['unnamed_only']
    if named_only and unnamed_only:
        self.stderr.write(error('Cannot use both --named-only and --unnamed-only options together'))
        return early_exit

    # Partition by whether a usable name is present.
    with_names = [p for p in all_anonymous_players if has_full_name(p)]
    without_names = [p for p in all_anonymous_players if not has_full_name(p)]

    if named_only:
        anonymous_players = with_names
        processing_type = "named anonymous players (with names but missing license)"
    elif unnamed_only:
        anonymous_players = without_names
        processing_type = "unnamed anonymous players (missing names)"
    else:
        # Default: named players first, unnamed ones afterwards.
        anonymous_players = with_names + without_names
        processing_type = "all anonymous players (named first, then unnamed)"

    selected_total = len(anonymous_players)
    if not anonymous_players:
        if named_only:
            write(success('No anonymous players with names found!'))
        elif unnamed_only:
            write(success('No anonymous players without names found!'))
        return early_exit

    write(success(f'\nProcessing {processing_type}'))
    write(f'Anonymous players breakdown:')
    write(f' Total found: {len(all_anonymous_players)}')
    write(f' With names: {len(with_names)}')
    write(f' Without names: {len(without_names)}')
    write(f' Selected for processing: {selected_total}')

    prev_month_file = self.find_previous_month_file(current_metadata, rankings_dir)
    if not prev_month_file:
        self.stderr.write(error('Previous month rankings file not found!'))
        return early_exit
    write(f'Using previous month file: {os.path.basename(prev_month_file)}')

    write('Loading previous month data...')
    prev_players, _prev_metadata = self.parse_rankings_file(prev_month_file)
    if not prev_players:
        self.stderr.write(error('Could not load previous month data!'))
        return early_exit

    current_players_indexes = self.build_current_players_indexes(current_players)

    matches_found = 0
    high_confidence_matches = 0
    skipped_existing_players = 0  # reported in the summary; never incremented here
    results = []

    write(f'Analyzing {selected_total} anonymous players...')
    progress_interval = max(1, selected_total // 10)  # roughly every 10%
    for progress_counter, anon_player in enumerate(anonymous_players, start=1):
        if progress_counter % progress_interval == 0 or progress_counter == 1:
            if named_only:
                player_type = "named"
            elif unnamed_only:
                player_type = "unnamed"
            else:
                # Named players come first in the combined list.
                player_type = "named" if progress_counter <= len(with_names) else "unnamed"
            write(f' Processing {player_type} anonymous player {progress_counter}/{selected_total} ({(progress_counter/selected_total)*100:.1f}%)')

        potential_matches = self.find_potential_matches(anon_player, prev_players, current_players_indexes, options)
        if potential_matches:
            matches_found += 1
            best_match = potential_matches[0]  # list is sorted, best first
            results.append({
                'anonymous_player': anon_player,
                'potential_matches': potential_matches,
                'best_match': best_match,
            })

            progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else ""
            assimilation = f", Assimilation: {anon_player['assimilation']}" if anon_player['assimilation'] != 'N/A' else ""
            if has_full_name(anon_player):
                write(f"\nNamed anonymous player: {anon_player['name']} {anon_player['first_name']} - Rank {anon_player['rank']}, League: {anon_player['league']}{progression}{assimilation}")
            else:
                write(f"\nUnnamed anonymous player: Rank {anon_player['rank']}, League: {anon_player['league']}{progression}{assimilation}")

            # Only the three best candidates are displayed.
            for shown, candidate in enumerate(potential_matches[:3], start=1):
                player = candidate['player']
                confidence = candidate['confidence']
                match_reasons = candidate['match_reasons']
                write(f" Match {shown}: {player['name']} {player['first_name']} (Rank: {player['rank']}, League: {player['league']})")
                write(f" Confidence: {confidence:.2f}, Match reasons: {match_reasons}")

            if best_match['confidence'] >= options['confidence']:
                high_confidence_matches += 1
        elif options['verbose']:
            if has_full_name(anon_player):
                write(f"\nNo matches found for named anonymous player: {anon_player['name']} {anon_player['first_name']} - Rank {anon_player['rank']}, League: {anon_player['league']}")
            else:
                write(f"\nNo matches found for unnamed anonymous player: Rank {anon_player['rank']}, League: {anon_player['league']}")

        # Throughput estimate every 100 players.
        if progress_counter % 100 == 0 and progress_counter > 0:
            elapsed = (datetime.now() - started).total_seconds()
            per_player = elapsed / progress_counter
            remaining = (selected_total - progress_counter) * per_player
            write(f" Processed {progress_counter}/{selected_total} players in {elapsed:.1f}s")
            write(f" Estimated time remaining: {remaining:.1f}s ({per_player:.3f}s per player)")

    total_elapsed = (datetime.now() - started).total_seconds()
    write(f"Analysis completed in {total_elapsed:.2f} seconds ({total_elapsed/selected_total:.3f}s per player)")

    write(success(f'\nMatching summary:'))
    write(f'Processing mode: {processing_type}')
    write(f'Anonymous players processed: {selected_total}')
    if not named_only and not unnamed_only:
        write(f' Named: {len(with_names)}')
        write(f' Unnamed: {len(without_names)}')
    write(f'Players with potential matches: {matches_found}')
    write(f'High confidence matches (≥{options["confidence"]}): {high_confidence_matches}')
    write(f'Skipped players already in current month: {skipped_existing_players}')

    if options['output']:
        write(f'Saving results to {options["output"]}...')
        self.save_results(results, options['output'])

    # Apply matches to the file only when asked and when the path is known.
    if options['auto_match'] and matches_found > 0 and file_path:
        matched_count = self.update_rankings_with_matches(file_path, anonymous_players, results,
                                                          options['confidence'], options)
    elif options['auto_match'] and file_path is None:
        self.stderr.write(error("Auto-match was requested but file path is not available. No changes were made."))

    return matched_count if return_count else None
def find_potential_matches(self, anon_player, prev_players, current_players_indexes, options):
    """Score previous-month players as candidate identities for one anonymous player.

    The expected previous rank is ``current rank + progression``. A
    candidate whose previous rank hits it exactly starts at 7 points and
    earns bonuses for equal assimilation, league, tournament count and
    points; a rank within 3 (resp. 10) places gives 4 (resp. 2) points
    with no bonuses. An exact name match and an equal birth year add
    points regardless of rank. Candidates below 10 points are dropped.

    Args:
        anon_player: Anonymous player record of the current month.
        prev_players: Player records of the previous month; anonymous ones
            are skipped.
        current_players_indexes: Lookup indexes of the current month.
            NOTE(review): accepted but not consulted by this method.
        options: Command options; only ``options['verbose']`` is read.

    Returns:
        List of dicts with ``player``, ``confidence``, ``match_reasons``
        (comma-joined string) and, when a rank comparison was possible,
        ``rank_diff`` / ``rank_match_type``; sorted by confidence, best
        first.
    """
    write = self.stdout.write

    # Scoring weights, in points. The detail lines printed below are built
    # from these values so the log always reports what was actually added.
    exact_rank_pts, close_rank_pts, approx_rank_pts = 7, 4, 2
    assimilation_pts = 3
    league_pts = 7
    unchanged_pts = 4            # same tournament count / same points
    tournament_tiers = ((10, 3), (20, 2))        # (max % diff, bonus)
    points_tiers = ((10, 3), (20, 2), (30, 1))   # (max % diff, bonus)
    name_pts = 25
    birth_year_pts = 1
    min_confidence = 10

    def compare_tournaments(prev_player, match_data, details):
        # Bonus for an identical or similar number of tournaments played.
        reasons = match_data['match_reasons']
        try:
            current = int(anon_player['tournaments_played'])
            previous = int(prev_player['tournaments_played'])
        except ValueError:
            details.append("Could not compare tournaments played")
            return
        diff = abs(current - previous)
        if diff == 0:
            match_data['confidence'] += unchanged_pts
            details.append(f"Tournaments unchanged (+{unchanged_pts})")
            reasons.append(f"same tournaments played ({current})")
            return
        highest = max(current, previous)
        if not highest > 0:
            # Counts differ but neither is positive; treated like "both 0".
            match_data['confidence'] += unchanged_pts
            details.append(f"Both have 0 tournaments (+{unchanged_pts})")
            reasons.append("both have 0 tournaments played")
            return
        percentage_diff = (diff / highest) * 100
        for limit, bonus in tournament_tiers:
            if percentage_diff <= limit:
                match_data['confidence'] += bonus
                details.append(f"Tournaments within {limit}% range (+{bonus}, diff: {percentage_diff:.1f}%)")
                break
        else:
            details.append(f"Tournaments too different (diff: {percentage_diff:.1f}%)")
        reasons.append(f"tournaments played: prev={previous}, current={current}")

    def compare_points(prev_player, match_data, details):
        # Bonus for identical or similar ranking points.
        try:
            current = float(anon_player['points'])
            previous = float(prev_player['points'])
        except ValueError:
            details.append("Could not compare points")
            return
        diff = abs(current - previous)
        match_data['match_reasons'].append(f"points: prev={previous}, current={current}, diff={diff}")
        if diff == 0:
            match_data['confidence'] += unchanged_pts
            details.append(f"Points unchanged (+{unchanged_pts})")
            return
        highest = max(current, previous)
        if not highest > 0:
            return
        percentage_diff = (diff / highest) * 100
        for limit, bonus in points_tiers:
            if percentage_diff <= limit:
                match_data['confidence'] += bonus
                details.append(f"Points within {limit}% range (+{bonus}, diff: {percentage_diff:.1f}%)")
                break
        else:
            details.append(f"Points too different (diff: {percentage_diff:.1f}%)")

    if options['verbose']:
        progression = f", Progression: {anon_player['progression']}" if anon_player['progression'] != 'N/A' else ""
        write(f" Finding matches for anonymous player: Rank {anon_player['rank']}{progression}, League: {anon_player['league']}")

    try:
        anon_rank = int(anon_player['rank']) if anon_player['rank'] != 'N/A' else None
    except ValueError:
        anon_rank = None

    # Progression looks like "+5", "-10", "5" or "="; anything else
    # (e.g. "NEW") counts as 0.
    prog_value = 0
    if anon_player['progression'] != 'N/A' and anon_player['progression']:
        prog_str = anon_player['progression'].strip()
        if prog_str.startswith(('+', '-')) or prog_str.isdigit():
            try:
                prog_value = int(prog_str)
            except ValueError:
                prog_value = 0

    # Positive progression means the player moved up, so the previous rank
    # is the current rank plus the progression (and vice versa).
    target_prev_rank = None
    if anon_rank is not None:
        target_prev_rank = anon_rank + prog_value
        if options['verbose']:
            write(f" Target previous rank: {target_prev_rank} (current rank {anon_rank} + progression {prog_value})")

    # NOTE(review): the dumps below (header plus one block per candidate)
    # are written regardless of --verbose.
    write("\n" + "="*80)
    write(f"Looking for matches for anonymous player at rank {anon_player['rank']}:")
    write(f" Points: {anon_player['points']}")
    write(f" Assimilation: {anon_player['assimilation']}")
    write(f" Tournaments: {anon_player['tournaments_played']}")
    write(f" League: {anon_player['league']}")
    if anon_player['name'] != 'N/A' and anon_player['first_name'] != 'N/A':
        write(f" Name: {anon_player['name']} {anon_player['first_name']}")
    write("-"*80)

    anon_has_name = (anon_player['name'] != 'N/A' and anon_player['name'] != '' and
                     anon_player['first_name'] != 'N/A' and anon_player['first_name'] != '')

    potential_matches = []
    for prev_player in prev_players:
        # Anonymous previous-month entries cannot identify anyone.
        if self.is_anonymous_player(prev_player):
            continue

        match_data = {'player': prev_player, 'match_reasons': [], 'confidence': 0}
        reasons = match_data['match_reasons']
        details = []

        write(f"\nChecking candidate: {prev_player['name']} {prev_player['first_name']}")
        write(f" Rank: {prev_player['rank']}")
        write(f" Points: {prev_player['points']}")
        write(f" Assimilation: {prev_player['assimilation']}")
        write(f" Tournaments: {prev_player['tournaments_played']}")
        write(f" League: {prev_player['league']}")

        # 1. Primary matcher: previous rank versus expected previous rank.
        candidate_rank = None
        if target_prev_rank is not None:
            try:
                candidate_rank = int(prev_player['rank'])
            except ValueError:
                candidate_rank = None
        if candidate_rank is not None:
            rank_diff = abs(candidate_rank - target_prev_rank)
            match_data['rank_diff'] = rank_diff
            if rank_diff == 0:
                match_data['rank_match_type'] = 'exact'
                reasons.append(f"exact previous rank match ({candidate_rank})")
                match_data['confidence'] = exact_rank_pts

                # Secondary bonuses only apply to exact rank hits.
                if anon_player['assimilation'] == prev_player['assimilation']:
                    match_data['confidence'] += assimilation_pts
                    details.append(f"Assimilation match (+{assimilation_pts})")
                    reasons.append(f"same assimilation ({anon_player['assimilation']})")
                if (anon_player['league'] == prev_player['league'] and
                        anon_player['league'] != 'N/A' and anon_player['league'] != ''):
                    match_data['confidence'] += league_pts
                    details.append(f"League match (+{league_pts})")
                    reasons.append(f"same league ({anon_player['league']})")
                compare_tournaments(prev_player, match_data, details)
                compare_points(prev_player, match_data, details)
            elif rank_diff <= 3:
                match_data['rank_match_type'] = 'close'
                reasons.append(f"close previous rank match ({candidate_rank} vs {target_prev_rank})")
                match_data['confidence'] = close_rank_pts
            elif rank_diff <= 10:
                match_data['rank_match_type'] = 'approximate'
                reasons.append(f"approximate previous rank match ({candidate_rank} vs {target_prev_rank})")
                match_data['confidence'] = approx_rank_pts

        # 2. Name match (case-insensitive), independent of rank.
        if (anon_has_name and
                anon_player['name'].lower() == prev_player['name'].lower() and
                anon_player['first_name'].lower() == prev_player['first_name'].lower()):
            match_data['confidence'] += name_pts
            details.append(f"Exact name match (+{name_pts})")
            reasons.append("exact name match")

        # 3. Birth year match, independent of rank.
        if (anon_player['birth_year'] != 'N/A' and anon_player['birth_year'] != '' and
                prev_player['birth_year'] != 'N/A' and prev_player['birth_year'] != '' and
                anon_player['birth_year'] == prev_player['birth_year']):
            match_data['confidence'] += birth_year_pts
            details.append(f"Birth year match (+{birth_year_pts})")
            reasons.append(f"same birth year ({anon_player['birth_year']})")

        if match_data['confidence'] >= min_confidence:
            write("\n Confidence calculation:")
            for detail in details:
                write(f" {detail}")
            write(f" Total confidence: {match_data['confidence']:.2f}")
            match_data['match_reasons'] = ", ".join(reasons)
            potential_matches.append(match_data)
            write(" → Considered as potential match")

    # Best candidates first; ties keep previous-month order (stable sort).
    potential_matches.sort(key=lambda x: x['confidence'], reverse=True)

    if potential_matches:
        write("\nTop matches found:")
        for i, match in enumerate(potential_matches[:3]):
            write(f"\n{i+1}. {match['player']['name']} {match['player']['first_name']}")
            write(f" Confidence: {match['confidence']:.2f}")
            write(f" Reasons: {match['match_reasons']}")
    else:
        write("\nNo matches found with sufficient confidence.")
    return potential_matches
def save_results(self, results, output_path):
    """Write a plain-text report of the matching results.

    Args:
        results: Iterable of match-info dicts; each one provides the
            'anonymous_player' and the 'best_match' selected for it.
        output_path: Path of the report file, opened in write mode.

    Any exception raised while producing the report is caught and
    reported on stderr instead of being propagated.
    """
    try:
        with open(output_path, 'w', encoding='utf-8') as report:
            emit = report.write
            emit("Anonymous Player Matching Results\n")
            emit("================================\n\n")

            for entry in results:
                anon = entry['anonymous_player']
                chosen = entry['best_match']

                # Optional fields appear in the heading only when they hold a real value.
                progression = ""
                if anon['progression'] != 'N/A':
                    progression = f", Progression: {anon['progression']}"
                assimilation = ""
                if anon['assimilation'] != 'N/A':
                    assimilation = f", Assimilation: {anon['assimilation']}"

                emit(f"Anonymous Player (Rank: {anon['rank']}, League: {anon['league']}{progression}{assimilation})\n")
                emit(f"Best Match: {chosen['player']['name']} {chosen['player']['first_name']}\n")
                emit(f" Confidence: {chosen['confidence']:.2f}\n")
                emit(f" Match reasons: {chosen['match_reasons']}\n")
                emit(f" Previous Rank: {chosen['player']['rank']}\n")
                emit(f" League: {chosen['player']['league']}\n")
                emit(f" Club: {chosen['player']['club']}\n\n")

        self.stdout.write(self.style.SUCCESS(f'Results saved to {output_path}'))
    except Exception as exc:
        self.stderr.write(self.style.ERROR(f'Error saving results: {str(exc)}'))
def update_rankings_with_matches(self, file_path, anonymous_players, matches, confidence_threshold, options):
    """
    Update the rankings file with matched player information.

    Rows are paired with anonymous players through an identifier built from
    rank, points, assimilation, tournaments played and league (plus name and
    first name when both are present). A paired row keeps its rank and
    points; name, first name, nationality, license, club code, club and
    birth year are replaced with the matched player's values.

    The original file is copied to "<file_path>.bak" before it is
    overwritten. When no match qualifies, the file is neither backed up nor
    rewritten.

    Args:
        file_path: Path to the current month's rankings file
        anonymous_players: List of anonymous players (filtered based on command options)
        matches: List of match info dictionaries
        confidence_threshold: Minimum confidence to apply auto-matching
        options: Command options (not used by this method)
    Returns:
        Number of players that were updated
    """
    # Highest column index written below is 14 (birth year), so a row needs
    # at least 15 fields before it can be updated.
    min_columns = 15
    # Column index -> key of the matched player copied into that column.
    identity_columns = {
        2: 'name',
        3: 'first_name',
        4: 'nationality',
        5: 'license',
        10: 'club_code',
        11: 'club',
        14: 'birth_year',
    }

    def build_player_id(rank, points, assimilation, tournaments, league, name, first_name):
        """Return the identifier shared by an anonymous player and its file row."""
        player_id = f"{rank}_{points}_{assimilation}_{tournaments}_{league}"
        # Named anonymous players (missing license only) get extra uniqueness.
        if name and first_name and name != 'N/A' and first_name != 'N/A':
            player_id += f"_{name}_{first_name}"
        return player_id

    self.stdout.write(self.style.SUCCESS(f"\nAuto-matching players with confidence ≥ {confidence_threshold}..."))

    # Only players from the filtered anonymous list whose best match meets
    # the confidence threshold are eligible for an update.
    update_info = {}
    for match_info in matches:
        anon_player = match_info['anonymous_player']
        best_match = match_info['best_match']
        if anon_player in anonymous_players and best_match['confidence'] >= confidence_threshold:
            player_id = build_player_id(
                anon_player['rank'], anon_player['points'], anon_player['assimilation'],
                anon_player['tournaments_played'], anon_player['league'],
                anon_player['name'], anon_player['first_name'],
            )
            update_info[player_id] = {
                'anonymous_player': anon_player,
                'match': best_match
            }

    if not update_info:
        self.stdout.write("No players met the confidence threshold for auto-matching.")
        return 0
    self.stdout.write(f"Found {len(update_info)} players to update.")

    # Back up and read the file only once we know it is going to change.
    backup_path = f"{file_path}.bak"
    shutil.copy2(file_path, backup_path)
    self.stdout.write(f"Created backup of original file at: {backup_path}")
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Data rows start right after the column header line.
    data_start_line = 0
    for i, line in enumerate(lines):
        if ';RANG;NOM;PRENOM;' in line:
            data_start_line = i + 1
            break

    # Header lines are kept unchanged.
    updated_lines = lines[:data_start_line]
    updated_count = 0

    for line in lines[data_start_line:]:
        values = line.strip().split(';')
        # Blank or malformed rows are passed through untouched.
        if len(values) < 3:
            updated_lines.append(line)
            continue

        # Pad short rows so every column read or written below exists;
        # the padded list is only written back for rows that are updated.
        values.extend([''] * (min_columns - len(values)))

        rank = values[1].strip()
        line_player_id = build_player_id(
            rank, values[6].strip(), values[7].strip(), values[8].strip(),
            values[9].strip(), values[2].strip(), values[3].strip(),
        )

        match_info = update_info.get(line_player_id)
        if match_info is None:
            # Not selected for update - keep the line exactly as is.
            updated_lines.append(line)
            continue

        matched_player = match_info['match']['player']
        for column, key in identity_columns.items():
            values[column] = matched_player[key]
        updated_lines.append(';'.join(values) + '\n')
        updated_count += 1
        self.stdout.write(f"Updated player: {matched_player['name']} {matched_player['first_name']} (Rank: {rank})")

    with open(file_path, 'w', encoding='utf-8') as f:
        f.writelines(updated_lines)
    self.stdout.write(self.style.SUCCESS(f"Successfully updated {updated_count} players in {file_path}"))
    return updated_count
def iterative_match_anonymous_players(self, file_path, rankings_dir, options, max_iterations=10):
    """
    Iteratively match anonymous players until no more matches can be found.

    Each iteration matches the remaining anonymous players against the
    remaining previous-month players through JSON working files kept in a
    temporary directory. After an iteration that produced matches, the
    rankings file is re-read, the previous-month players recorded in the
    matches file are removed from the candidate pool, and the working files
    and their counts are regenerated for the next pass.

    Args:
        file_path: Path to the current month's rankings file (updated in
            place by the matching step).
        rankings_dir: Directory searched for the previous month's file.
        options: Command options, forwarded to the helper methods.
        max_iterations: Safety cap on the number of matching passes.

    Returns:
        None. Progress and the final summary are written to stdout; a
        missing previous-month file is reported on stderr.
    """
    iteration = 1
    total_matched = 0
    changes_made = True
    self.stdout.write(self.style.SUCCESS("\n=== Starting optimized iterative matching process ==="))

    # Load initial data and count anonymous players at the start.
    current_players, current_metadata = self.parse_rankings_file(file_path)
    initial_anonymous_count = sum(1 for p in current_players if self.is_anonymous_player(p))
    if initial_anonymous_count == 0:
        self.stdout.write(self.style.SUCCESS("No anonymous players found. Process complete!"))
        return
    self.stdout.write(f"Initial anonymous players: {initial_anonymous_count}")

    # Find and load the previous month file.
    prev_month_file = self.find_previous_month_file(current_metadata, rankings_dir)
    if not prev_month_file:
        self.stderr.write(self.style.ERROR('Previous month rankings file not found!'))
        return
    self.stdout.write(f'Using previous month file: {os.path.basename(prev_month_file)}')
    prev_players, _ = self.parse_rankings_file(prev_month_file)

    # The working files only live for the duration of the matching loop.
    with tempfile.TemporaryDirectory() as temp_dir:
        self.stdout.write(f"Created temporary directory for working files: {temp_dir}")
        anon_file = os.path.join(temp_dir, "anonymous_players.json")
        prev_players_file = os.path.join(temp_dir, "prev_month_players.json")
        matches_file = os.path.join(temp_dir, "matches.json")

        # Extract anonymous players and filter previous month players.
        self.stdout.write("Creating initial working files...")
        filtered_data = self.create_filtered_working_files(
            current_players, prev_players, anon_file, prev_players_file, options
        )
        anon_count = filtered_data['anon_count']
        prev_count = filtered_data['prev_count']
        self.stdout.write(f"Extracted {anon_count} anonymous players and {prev_count} eligible previous month players")

        while changes_made and anon_count > 0:
            self.stdout.write(self.style.SUCCESS(f"\n--- Iteration {iteration} ---"))
            self.stdout.write(f"Anonymous players remaining: {anon_count}")
            self.stdout.write(f"Previous month players to check: {prev_count}")

            # Match from the current working files; matches are applied to file_path.
            matched_count = self.match_players_from_temp_files(
                anon_file, prev_players_file, matches_file,
                file_path, current_metadata, options
            )

            if matched_count > 0:
                total_matched += matched_count
                self.stdout.write(self.style.SUCCESS(
                    f"Iteration {iteration} complete: Matched {matched_count} players"
                ))
                changes_made = True

                # Pick up the rows that were just rewritten in the main file.
                current_players, _ = self.parse_rankings_file(file_path)

                # Drop previous-month players that were used so they cannot be
                # assigned again in a later iteration.
                # NOTE(review): matches.json lists every selected best match,
                # including ones below the confidence threshold that were not
                # written to the rankings file; their licenses are dropped from
                # the pool too - confirm this is intended.
                if os.path.exists(matches_file):
                    with open(matches_file, 'r', encoding='utf-8') as f:
                        matches = json.load(f)
                    matched_licenses = {
                        match['best_match']['player']['license']
                        for match in matches
                        if match['best_match']['player']['license']
                        and match['best_match']['player']['license'] != 'N/A'
                    }
                    prev_players = [p for p in prev_players if p['license'] not in matched_licenses]

                # Regenerate the working files and refresh the counts that
                # drive both the loop condition and the progress output.
                filtered_data = self.create_filtered_working_files(
                    current_players, prev_players, anon_file, prev_players_file, options
                )
                anon_count = filtered_data['anon_count']
                prev_count = filtered_data['prev_count']
            else:
                self.stdout.write(self.style.SUCCESS(f"Iteration {iteration} complete: No new matches found"))
                changes_made = False

            iteration += 1

            # Safety cap: only relevant when the loop would otherwise continue.
            if changes_made and anon_count > 0 and iteration > max_iterations:
                self.stdout.write(self.style.WARNING(
                    f"Maximum iterations reached ({max_iterations}). Stopping process."
                ))
                break

    # Final summary
    self.stdout.write(self.style.SUCCESS("\n=== Iterative matching process complete ==="))
    self.stdout.write(f"Total iterations: {iteration - 1}")
    self.stdout.write(f"Total players matched: {total_matched}")

    # Final statistics, recomputed from the file as it now stands.
    final_players, _ = self.parse_rankings_file(file_path)
    final_anonymous_count = sum(1 for p in final_players if self.is_anonymous_player(p))
    self.stdout.write(f"Anonymous players remaining: {final_anonymous_count}")

    # initial_anonymous_count is non-zero here (see the early return above).
    improvement = ((initial_anonymous_count - final_anonymous_count) / initial_anonymous_count) * 100
    self.stdout.write(f"Data completeness improved by {improvement:.1f}%")
def create_filtered_working_files(self, current_players, prev_players, anon_file, prev_players_file, options):
    """
    Build the two JSON working files used by the matching step.

    anon_file receives the anonymous players of the current month,
    optionally restricted by the 'named_only' / 'unnamed_only' options
    ('named_only' takes precedence). prev_players_file receives the
    non-anonymous previous-month players whose license is not held by a
    non-anonymous current-month player.

    Returns a dict with 'anon_count' and 'prev_count', the number of
    players written to each file.
    """
    def is_named(player):
        # Named means both name and first name carry a real value.
        return (player['name'] != 'N/A' and player['name'] != '' and
                player['first_name'] != 'N/A' and player['first_name'] != '')

    def has_license(player):
        return player['license'] != 'N/A' and bool(player['license'])

    anon_pool = [p for p in current_players if self.is_anonymous_player(p)]

    if options['named_only']:
        selected = [p for p in anon_pool if is_named(p)]
        self.stdout.write(self.style.SUCCESS(f"Filtering to only process named anonymous players ({len(selected)}/{len(anon_pool)})"))
    elif options['unnamed_only']:
        selected = [p for p in anon_pool if not is_named(p)]
        self.stdout.write(self.style.SUCCESS(f"Filtering to only process unnamed anonymous players ({len(selected)}/{len(anon_pool)})"))
    else:
        selected = anon_pool

    # Licenses already attached to an identified player this month.
    known_license_keys = {
        f"license_{p['license']}"
        for p in current_players
        if not self.is_anonymous_player(p) and has_license(p)
    }

    # Keep identified previous-month players that are absent from this month.
    carried_over = [
        p for p in prev_players
        if not self.is_anonymous_player(p)
        and not (has_license(p) and f"license_{p['license']}" in known_license_keys)
    ]

    for target, payload in ((anon_file, selected), (prev_players_file, carried_over)):
        with open(target, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, ensure_ascii=False)

    return {
        'anon_count': len(selected),
        'prev_count': len(carried_over)
    }
def match_players_from_temp_files(self, anon_file, prev_players_file, matches_file,
                                  original_file, current_metadata, options):
    """
    Match players between the anonymous and previous month temp files
    and update the original file with matches.

    A best match is selected for an anonymous player only when it is the
    sole candidate or beats the runner-up by more than 2 confidence points.
    When several anonymous players select the same previous-month player,
    only a strictly strongest claim is kept; tied claims are dropped as
    ambiguous so that one identity is never written onto several rows.

    Args:
        anon_file: JSON file holding the anonymous players to process.
        prev_players_file: JSON file holding the candidate previous-month players.
        matches_file: JSON file that receives the retained matches.
        original_file: Rankings file to update with the retained matches.
        current_metadata: Not used by this method.
        options: Command options; 'confidence' is the auto-match threshold.

    Returns:
        Number of players updated in the original file (0 when either
        input list is empty or no match is retained).
    """
    with open(anon_file, 'r', encoding='utf-8') as f:
        anonymous_players = json.load(f)
    with open(prev_players_file, 'r', encoding='utf-8') as f:
        prev_players = json.load(f)
    if not anonymous_players or not prev_players:
        return 0

    # Handed to find_potential_matches as-is; no entries are added here.
    current_players_indexes = {
        'license_index': {},
        'name_index': {},
        'name_club_league_index': {}
    }

    def claimed_player_key(player):
        """Return a hashable identity for a previous-month player."""
        license_num = player.get('license')
        if license_num and license_num != 'N/A':
            return ('license', license_num)
        return ('identity', player.get('name'), player.get('first_name'),
                player.get('birth_year'), player.get('club'))

    # Select an unambiguous best match per anonymous player.
    results = []
    for anon_player in anonymous_players:
        potential_matches = self.find_potential_matches(anon_player, prev_players, current_players_indexes, options)
        if not potential_matches:
            continue
        # Candidates are sorted by confidence; a lead of 2 or less is ambiguous.
        if (len(potential_matches) > 1 and
                potential_matches[0]['confidence'] - potential_matches[1]['confidence'] <= 2):
            continue
        results.append({
            'anonymous_player': anon_player,
            'potential_matches': potential_matches,
            'best_match': potential_matches[0]
        })

    # Collect the confidences of every claim made on each previous-month player.
    claims = {}
    for match_info in results:
        best_match = match_info['best_match']
        claims.setdefault(claimed_player_key(best_match['player']), []).append(best_match['confidence'])

    def wins_claim(match_info):
        """Return True when this match is the strictly strongest claim on its player."""
        best_match = match_info['best_match']
        confidences = claims[claimed_player_key(best_match['player'])]
        top = max(confidences)
        return best_match['confidence'] == top and confidences.count(top) == 1

    uncontested = [match_info for match_info in results if wins_claim(match_info)]
    dropped = len(results) - len(uncontested)
    if dropped:
        self.stdout.write(
            f"Skipped {dropped} conflicting matches (same previous month player selected more than once)"
        )
    results = uncontested

    # Save only the key info of each retained match; the full candidate
    # lists are not needed by the caller.
    serializable_results = [
        {
            'anonymous_player': match_info['anonymous_player'],
            'best_match': {
                'player': match_info['best_match']['player'],
                'confidence': match_info['best_match']['confidence'],
                'match_reasons': match_info['best_match']['match_reasons']
            }
        }
        for match_info in results
    ]
    with open(matches_file, 'w', encoding='utf-8') as f:
        json.dump(serializable_results, f, ensure_ascii=False)

    # Apply matches to the original file
    if not results:
        return 0
    return self.update_rankings_with_matches(
        original_file, anonymous_players, results, options['confidence'], options
    )