You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
194 lines
7.7 KiB
194 lines
7.7 KiB
from django.conf import settings
|
|
|
|
from typing import List, Dict
|
|
from .registry import model_registry
|
|
|
|
import logging
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
class SyncModelChildrenManager:
    """
    Manager for the model children sharing configuration.

    Reads the ``SYNC_MODEL_CHILDREN_SHARING`` Django setting once, at
    construction time, and lazily derives a bidirectional relationship graph
    from it: every configured model maps to its configured relationship
    chain, and every model reached while walking that chain maps to the path
    that leads back to the configured model.
    """

    # Class-level default so the "already built" state is tracked separately
    # from the graph's content: an empty graph is a valid, cacheable result.
    _graph_built = False

    def __init__(self):
        """Read the Django setting; the graph itself is built on first lookup."""
        self._model_relationships = getattr(
            settings,
            'SYNC_MODEL_CHILDREN_SHARING',
            {}
        )
        self._relationship_graph = {}
        self._graph_built = False

    def _ensure_graph(self) -> Dict[str, List[List[str]]]:
        """Build the relationship graph on first use and return the cached graph."""
        if not self._graph_built:
            self._relationship_graph = self._build_relationship_graph()
            self._graph_built = True
        return self._relationship_graph

    def _build_relationship_graph(self) -> Dict[str, List[List[str]]]:
        """
        Build a bidirectional relationship graph.

        Building is best-effort: a configured model or relationship that
        cannot be resolved is logged and skipped instead of aborting the
        whole build.

        Returns:
            Dict[str, List[List[str]]]: Dictionary where keys are model names and values
            are lists of relationship paths (lists of relationship names).
        """
        graph = {}

        # Direct relationships (configured models to their children). The
        # chain is copied so the graph never aliases the settings object.
        for model_name, relationships in self._model_relationships.items():
            graph.setdefault(model_name, []).append(list(relationships))

        # Reverse relationships (children back to the configured models).
        for original_model_name, relationships in self._model_relationships.items():
            try:
                self._add_reverse_paths(graph, original_model_name, relationships)
            except Exception:
                # Deliberately broad: one broken entry must not prevent the
                # other configured models from being processed.
                logger.warning(
                    'Could not build reverse paths for %s',
                    original_model_name,
                    exc_info=True,
                )

        return graph

    def _add_reverse_paths(self, graph, original_model_name, relationships):
        """
        Walk ``relationships`` from the configured model and record the way back.

        Args:
            graph: Graph being built; mutated in place.
            original_model_name: Name of the configured model the chain starts from.
            relationships: Relationship names, each one resolved on the model
                reached by the previous one.
        """
        current_model = model_registry.get_model(original_model_name)
        if current_model is None:
            logger.warning('missing %s', original_model_name)
            return

        # Reverse relationship names collected so far, in walking order.
        current_reverse_path = []

        for relationship_name in relationships:
            try:
                field = self._find_relation_field(current_model, relationship_name)
                if field is None:
                    # Unknown name: skip it and keep resolving the remaining
                    # names against the same model.
                    continue

                related_model = self._related_model_of(field)
                if related_model is None:
                    # The name matched a field that does not lead to a model.
                    continue

                reverse_relationship_name = self._find_reverse_relationship(
                    relationship_name, current_model, related_model
                )

                if reverse_relationship_name:
                    current_reverse_path.append(reverse_relationship_name)
                    # Stored reversed (as a copy) so the path reads from the
                    # related model back towards the configured model.
                    graph.setdefault(related_model.__name__, []).append(
                        current_reverse_path[::-1]
                    )

                # Advance even when no reverse name was found, so the next
                # relationship is resolved on the model just reached.
                current_model = related_model

            except Exception:
                # Skip problematic relationships but keep walking the chain.
                logger.warning(
                    'Skipping relationship %s of %s',
                    relationship_name,
                    original_model_name,
                    exc_info=True,
                )
                continue

    @staticmethod
    def _find_relation_field(model, relationship_name):
        """Return the first ``_meta`` field whose ``related_name`` or ``name`` matches, else None."""
        for candidate in model._meta.get_fields():
            if hasattr(candidate, 'related_name') and candidate.related_name == relationship_name:
                return candidate
            if hasattr(candidate, 'name') and candidate.name == relationship_name:
                return candidate
        return None

    @staticmethod
    def _related_model_of(field):
        """Return the model ``field`` leads to (``related_model``, else ``model``), or None."""
        if hasattr(field, 'related_model'):
            return field.related_model
        return getattr(field, 'model', None)

    def _find_reverse_relationship(self, original_relationship_name, in_model, for_model):
        """
        Find the name of the relationship on ``for_model`` that leads back to ``in_model``.

        Args:
            original_relationship_name: Name of the relationship that was followed
                from ``in_model`` to ``for_model``.
            in_model: The model the original relationship was followed from.
            for_model: The model whose fields are searched for the way back.

        Returns:
            str or None: The reverse relationship name if found
        """
        try:
            for field in for_model._meta.get_fields():
                # Field on for_model whose related model is in_model.
                # TODO: when for_model has several relations to in_model the
                # first one wins; use original_relationship_name to
                # disambiguate.
                if hasattr(field, 'related_model') and field.related_model == in_model:
                    return field.name

                # Reverse relation object whose underlying field is declared
                # on in_model and whose accessor is the original name.
                if hasattr(field, 'field') and hasattr(field.field, 'model'):
                    if (
                        field.field.model == in_model
                        and field.get_accessor_name() == original_relationship_name
                    ):
                        return field.field.name

        except Exception:
            # Best-effort lookup: report the failure and fall through to None.
            logger.warning(
                'Could not find the reverse of %s on %s',
                original_relationship_name,
                for_model,
                exc_info=True,
            )

        return None

    def get_relationships(self, model_name: str) -> List[str]:
        """
        Get the list of direct relationships for a given model name.

        Args:
            model_name (str): The name of the model to look up

        Returns:
            List[str]: Relationship names configured for the model.
            Returns empty list if model is not found.
        """
        return self._model_relationships.get(model_name, [])

    def get_relationship_paths(self, model_name: str) -> List[List[str]]:
        """
        Get all relationship paths for a given model name.
        This includes both direct relationships and reverse paths.

        Args:
            model_name (str): The name of the model to look up

        Returns:
            List[List[str]]: List of relationship paths (each path is a list of relationship names).
            Returns empty list if model is not found.
        """
        return self._ensure_graph().get(model_name, [])

    def get_relationship_graph(self) -> Dict[str, List[List[str]]]:
        """
        Get the complete relationship graph, building it first if needed.

        Returns:
            Dict[str, List[List[str]]]: A copy of the complete relationship
            graph; the path lists are copied too, so callers can modify the
            result without affecting the cached graph.
        """
        return {
            model_name: [list(path) for path in paths]
            for model_name, paths in self._ensure_graph().items()
        }
|
|
|
|
|
|
# Create a singleton instance to use throughout the application
|
|
# Constructed at import time: this reads settings.SYNC_MODEL_CHILDREN_SHARING
# immediately, while the relationship graph is only built on first lookup.
sync_model_manager = SyncModelChildrenManager()
|
|
|