You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
padelclub_backend/sync/model_manager.py

194 lines
7.7 KiB

from django.conf import settings
from typing import List, Dict
from .registry import model_registry
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SyncModelChildrenManager:
    """
    Manager class for handling model children sharing configuration.

    Reads the SYNC_MODEL_CHILDREN_SHARING setting once and lazily builds a
    bidirectional relationship graph for efficient lookup.

    The setting is consumed as a mapping whose keys are model names and whose
    values are ordered sequences of relationship names. The names form a
    chain: each one is resolved on the model reached through the previous one.
    """

    def __init__(self):
        """Read the Django setting; the relationship graph is built on first lookup."""
        self._model_relationships = getattr(
            settings,
            'SYNC_MODEL_CHILDREN_SHARING',
            {}
        )
        # Cache for the graph; stays empty until a lookup method needs it.
        self._relationship_graph = {}

    def _ensure_relationship_graph(self) -> Dict[str, List[List[str]]]:
        """Build the relationship graph if it is still empty and return the cached dict."""
        if not self._relationship_graph:
            self._relationship_graph = self._build_relationship_graph()
        return self._relationship_graph

    @staticmethod
    def _find_relationship_field(model, relationship_name):
        """Return the first field of ``model`` whose related_name or name equals ``relationship_name``, else None."""
        for candidate in model._meta.get_fields():
            if hasattr(candidate, 'related_name') and candidate.related_name == relationship_name:
                return candidate
            if hasattr(candidate, 'name') and candidate.name == relationship_name:
                return candidate
        return None

    def _build_relationship_graph(self) -> Dict[str, List[List[str]]]:
        """
        Build a bidirectional relationship graph.

        For a configured chain ``[a, b]`` on model ``M`` the graph receives:
        - under ``M``: the chain itself;
        - under the model reached through ``a``: ``[reverse_of_a]``;
        - under the model reached through ``b``: ``[reverse_of_b, reverse_of_a]``.

        Relationships that cannot be resolved are logged and skipped
        (best effort); they never abort the build.

        Returns:
            Dict[str, List[List[str]]]: Dictionary where keys are model names and values
                are lists of relationship paths (arrays of relationship names).
        """
        graph: Dict[str, List[List[str]]] = {}

        # Add direct relationships (original models to their children)
        for model_name, relationships in self._model_relationships.items():
            graph.setdefault(model_name, []).append(relationships)

        # Build reverse relationships (children back to original models)
        for original_model_name, relationships in self._model_relationships.items():
            try:
                current_model = model_registry.get_model(original_model_name)
                if current_model is None:
                    logger.warning(f'missing {original_model_name}')
                    continue

                # Reverse names collected so far, in walking order.
                current_reverse_path = []

                for relationship_name in relationships:
                    try:
                        field = self._find_relationship_field(current_model, relationship_name)
                        if field is None:
                            continue

                        # Get the related model
                        if hasattr(field, 'related_model'):
                            related_model = field.related_model
                        elif hasattr(field, 'model'):
                            related_model = field.model
                        else:
                            continue

                        related_model_name = related_model.__name__

                        # Find the reverse relationship name
                        reverse_relationship_name = self._find_reverse_relationship(
                            relationship_name, current_model, related_model
                        )

                        if reverse_relationship_name:
                            current_reverse_path.append(reverse_relationship_name)
                            # The path back to the original model is the collected
                            # reverse names read backwards; [::-1] makes a fresh copy.
                            graph.setdefault(related_model_name, []).append(
                                current_reverse_path[::-1]
                            )

                        # Continue the chain from the model we just reached.
                        current_model = related_model

                    except Exception as e:
                        logger.info(f'error 2 > {e}')
                        # Skip problematic relationships
                        continue

            except Exception as e:
                logger.info(f'error > {e}')
                continue

        return graph

    def _find_reverse_relationship(self, original_relationship_name, in_model, for_model):
        """
        Find the name of the relationship on ``for_model`` that leads back to ``in_model``.

        Args:
            original_relationship_name: The relationship name used to go from
                ``in_model`` to ``for_model``; used to match reverse accessors.
            in_model: The model the original relationship starts from.
            for_model: The related model whose fields are searched.

        Returns:
            str or None: The reverse relationship name if found. Any exception
                raised while inspecting the fields is logged and results in None.
        """
        try:
            for field in for_model._meta.get_fields():
                # Field on for_model that points at in_model.
                # NOTE: if for_model has several relations to in_model, the first
                # one wins; original_relationship_name could be used to
                # disambiguate here.
                if hasattr(field, 'related_model') and field.related_model == in_model:
                    return field.name

                # Check reverse relationships
                if (
                    hasattr(field, 'field')
                    and hasattr(field.field, 'model')
                    and field.field.model == in_model
                    and field.get_accessor_name() == original_relationship_name
                ):
                    return field.field.name
        except Exception as e:
            logger.warning(f'!! ERROR = {e}')

        return None

    def get_relationships(self, model_name: str) -> List[str]:
        """
        Get the list of direct relationships for a given model name.

        Args:
            model_name (str): The name of the model to look up

        Returns:
            List[str]: List of relationship names for the model.
                Returns empty list if model is not found.
        """
        return self._model_relationships.get(model_name, [])

    def get_relationship_paths(self, model_name: str) -> List[List[str]]:
        """
        Get all relationship paths for a given model name.

        This includes both direct relationships and reverse paths. The graph
        is built on the first call.

        Args:
            model_name (str): The name of the model to look up

        Returns:
            List[List[str]]: List of relationship paths (each path is a list of relationship names).
                Returns empty list if model is not found.
        """
        return self._ensure_relationship_graph().get(model_name, [])

    def get_relationship_graph(self) -> Dict[str, List[List[str]]]:
        """
        Get the complete relationship graph.

        The graph is built first if needed, so the result does not depend on
        whether get_relationship_paths() was called before.

        Returns:
            Dict[str, List[List[str]]]: A shallow copy of the complete relationship graph
        """
        return self._ensure_relationship_graph().copy()
# Create a singleton instance to use throughout the application.
# Constructing it only reads the SYNC_MODEL_CHILDREN_SHARING setting; the
# relationship graph itself is built later, on the first lookup.
sync_model_manager = SyncModelChildrenManager()