from django.conf import settings from typing import List, Dict from .registry import model_registry import logging logger = logging.getLogger(__name__) class SyncModelChildrenManager: """ Manager class for handling model children sharing configuration. Reads the SYNC_MODEL_CHILDREN_SHARING setting once and builds a bidirectional relationship graph for efficient lookup. """ def __init__(self): """Initialize the manager by reading the Django setting and building the relationship graph.""" self._model_relationships = getattr( settings, 'SYNC_MODEL_CHILDREN_SHARING', {} ) self._relationship_graph = {} def _build_relationship_graph(self) -> Dict[str, List[List[str]]]: """ Build a bidirectional relationship graph. Returns: Dict[str, List[List[str]]]: Dictionary where keys are model names and values are lists of relationship paths (arrays of relationship names). """ graph = {} # Add direct relationships (original models to their children) for model_name, relationships in self._model_relationships.items(): if model_name not in graph: graph[model_name] = [] graph[model_name].append(relationships) # Build reverse relationships (children back to original models) for original_model_name, relationships in self._model_relationships.items(): try: current_model = model_registry.get_model(original_model_name) if current_model is None: print(f'missing {original_model_name}') continue current_reverse_path = [] for relationship_name in relationships: # Get the related model through _meta try: field = None # Try to find the field in the model's _meta for f in current_model._meta.get_fields(): if hasattr(f, 'related_name') and f.related_name == relationship_name: field = f break elif hasattr(f, 'name') and f.name == relationship_name: field = f break if field is None: continue # Get the related model if hasattr(field, 'related_model'): related_model = field.related_model elif hasattr(field, 'model'): related_model = field.model else: continue related_model_name = related_model.__name__ # Find the reverse relationship name reverse_relationship_name = self._find_reverse_relationship( relationship_name, current_model, related_model ) if reverse_relationship_name: current_reverse_path.append(reverse_relationship_name) # Add the reverse path if related_model_name not in graph: graph[related_model_name] = [] # The path back is just the reverse relationship name graph[related_model_name].append(current_reverse_path[::-1]) # make a reverse copy current_model = related_model except Exception as e: logger.info(f'error 2 > {e}') # Skip problematic relationships continue except Exception as e: logger.info(f'error > {e}') continue return graph def _find_reverse_relationship(self, original_relationship_name, in_model, for_model): """ Find the reverse relationship name from from_model to to_model. Args: from_model: The model to search relationships from to_model: The target model to find relationship to original_relationship_name: The original relationship name for context Returns: str or None: The reverse relationship name if found """ # print(f'reverse of {original_relationship_name} in = {in_model} for {for_model} ') try: for field in for_model._meta.get_fields(): # Check ForeignKey, OneToOneField fields # print(f'{for_model} : field name = {field.name} / field.related_model = {field.related_model == in_model}') if hasattr(field, 'related_model') and field.related_model == in_model: return field.name ### possible improvements to do here if multiple relationships of the same type # Check if this field has a related_name that matches our original relationship # if hasattr(field, 'related_name') and field.related_name == original_relationship_name: # # This is the reverse of our original relationship # return field.name # elif not hasattr(field, 'related_name') or field.related_name is None: # default_name = f"{in_model._meta.related_query_name}" # print(f'no related name: {default_name} / {original_relationship_name.rstrip('s')} ') # if default_name == original_relationship_name.rstrip('s'): # Simple heuristic # return field.name # Check reverse relationships if hasattr(field, 'field') and hasattr(field.field, 'model'): if field.field.model == in_model: if field.get_accessor_name() == original_relationship_name: return field.field.name except Exception as e: print(f'!! ERROR = {e}') pass return None def get_relationships(self, model_name: str) -> List[str]: """ Get the list of direct relationships for a given model name. Args: model_name (str): The name of the model to look up Returns: List[str]: List of relationship names for the model. Returns empty list if model is not found. """ return self._model_relationships.get(model_name, []) def get_relationship_paths(self, model_name: str) -> List[List[str]]: """ Get all relationship paths for a given model name. This includes both direct relationships and reverse paths. Args: model_name (str): The name of the model to look up Returns: List[List[str]]: List of relationship paths (each path is a list of relationship names). Returns empty list if model is not found. """ if not self._relationship_graph: self._relationship_graph = self._build_relationship_graph() # logger.info(f'self._relationship_graph = {self._relationship_graph}') return self._relationship_graph.get(model_name, []) def get_relationship_graph(self) -> Dict[str, List[List[str]]]: """ Get the complete relationship graph. Returns: Dict[str, List[List[str]]]: The complete relationship graph """ return self._relationship_graph.copy() # Create a singleton instance to use throughout the application sync_model_manager = SyncModelChildrenManager()