parent
90e7f4216e
commit
e8d92d1216
@ -0,0 +1,194 @@ |
||||
from django.conf import settings |
||||
|
||||
from typing import List, Dict |
||||
from .registry import model_registry |
||||
|
||||
import logging |
||||
|
||||
logger = logging.getLogger(__name__) |
||||
|
||||
class SyncModelChildrenManager: |
||||
""" |
||||
Manager class for handling model children sharing configuration. |
||||
Reads the SYNC_MODEL_CHILDREN_SHARING setting once and builds a bidirectional |
||||
relationship graph for efficient lookup. |
||||
""" |
||||
|
||||
def __init__(self): |
||||
"""Initialize the manager by reading the Django setting and building the relationship graph.""" |
||||
self._model_relationships = getattr( |
||||
settings, |
||||
'SYNC_MODEL_CHILDREN_SHARING', |
||||
{} |
||||
) |
||||
self._relationship_graph = {} |
||||
|
||||
def _build_relationship_graph(self) -> Dict[str, List[List[str]]]: |
||||
""" |
||||
Build a bidirectional relationship graph. |
||||
|
||||
Returns: |
||||
Dict[str, List[List[str]]]: Dictionary where keys are model names and values |
||||
are lists of relationship paths (arrays of relationship names). |
||||
""" |
||||
graph = {} |
||||
|
||||
# Add direct relationships (original models to their children) |
||||
for model_name, relationships in self._model_relationships.items(): |
||||
if model_name not in graph: |
||||
graph[model_name] = [] |
||||
graph[model_name].append(relationships) |
||||
|
||||
# Build reverse relationships (children back to original models) |
||||
for original_model_name, relationships in self._model_relationships.items(): |
||||
|
||||
try: |
||||
current_model = model_registry.get_model(original_model_name) |
||||
if current_model is None: |
||||
print(f'missing {original_model_name}') |
||||
continue |
||||
|
||||
current_reverse_path = [] |
||||
|
||||
for relationship_name in relationships: |
||||
# Get the related model through _meta |
||||
try: |
||||
field = None |
||||
# Try to find the field in the model's _meta |
||||
for f in current_model._meta.get_fields(): |
||||
if hasattr(f, 'related_name') and f.related_name == relationship_name: |
||||
field = f |
||||
break |
||||
elif hasattr(f, 'name') and f.name == relationship_name: |
||||
field = f |
||||
break |
||||
|
||||
if field is None: |
||||
continue |
||||
|
||||
# Get the related model |
||||
if hasattr(field, 'related_model'): |
||||
related_model = field.related_model |
||||
elif hasattr(field, 'model'): |
||||
related_model = field.model |
||||
else: |
||||
continue |
||||
|
||||
related_model_name = related_model.__name__ |
||||
|
||||
# Find the reverse relationship name |
||||
reverse_relationship_name = self._find_reverse_relationship( |
||||
relationship_name, current_model, related_model |
||||
) |
||||
|
||||
if reverse_relationship_name: |
||||
current_reverse_path.append(reverse_relationship_name) |
||||
# Add the reverse path |
||||
if related_model_name not in graph: |
||||
graph[related_model_name] = [] |
||||
|
||||
# The path back is just the reverse relationship name |
||||
graph[related_model_name] = current_reverse_path[::-1] # make a reverse copy |
||||
|
||||
current_model = related_model |
||||
|
||||
except Exception as e: |
||||
logger.info(f'error 2 > {e}') |
||||
# Skip problematic relationships |
||||
continue |
||||
|
||||
except Exception as e: |
||||
logger.info(f'error > {e}') |
||||
continue |
||||
|
||||
return graph |
||||
|
||||
def _find_reverse_relationship(self, original_relationship_name, in_model, for_model): |
||||
""" |
||||
Find the reverse relationship name from from_model to to_model. |
||||
|
||||
Args: |
||||
from_model: The model to search relationships from |
||||
to_model: The target model to find relationship to |
||||
original_relationship_name: The original relationship name for context |
||||
|
||||
Returns: |
||||
str or None: The reverse relationship name if found |
||||
""" |
||||
|
||||
print(f'reverse of {original_relationship_name} in = {in_model} for {for_model} ') |
||||
try: |
||||
for field in for_model._meta.get_fields(): |
||||
# Check ForeignKey, OneToOneField fields |
||||
# print(f'{for_model} : field name = {field.name} / field.related_model = {field.related_model == in_model}') |
||||
if hasattr(field, 'related_model') and field.related_model == in_model: |
||||
return field.name |
||||
### possible improvements to do here if multiple relationships of the same type |
||||
|
||||
|
||||
# Check if this field has a related_name that matches our original relationship |
||||
# if hasattr(field, 'related_name') and field.related_name == original_relationship_name: |
||||
# # This is the reverse of our original relationship |
||||
# return field.name |
||||
# elif not hasattr(field, 'related_name') or field.related_name is None: |
||||
# default_name = f"{in_model._meta.related_query_name}" |
||||
# print(f'no related name: {default_name} / {original_relationship_name.rstrip('s')} ') |
||||
# if default_name == original_relationship_name.rstrip('s'): # Simple heuristic |
||||
# return field.name |
||||
|
||||
# Check reverse relationships |
||||
if hasattr(field, 'field') and hasattr(field.field, 'model'): |
||||
if field.field.model == in_model: |
||||
if field.get_accessor_name() == original_relationship_name: |
||||
return field.field.name |
||||
|
||||
except Exception as e: |
||||
print(f'!! ERROR = {e}') |
||||
pass |
||||
|
||||
return None |
||||
|
||||
def get_relationships(self, model_name: str) -> List[str]: |
||||
""" |
||||
Get the list of direct relationships for a given model name. |
||||
|
||||
Args: |
||||
model_name (str): The name of the model to look up |
||||
|
||||
Returns: |
||||
List[str]: List of relationship names for the model. |
||||
Returns empty list if model is not found. |
||||
""" |
||||
|
||||
return self._model_relationships.get(model_name, []) |
||||
|
||||
def get_relationship_paths(self, model_name: str) -> List[List[str]]: |
||||
""" |
||||
Get all relationship paths for a given model name. |
||||
This includes both direct relationships and reverse paths. |
||||
|
||||
Args: |
||||
model_name (str): The name of the model to look up |
||||
|
||||
Returns: |
||||
List[List[str]]: List of relationship paths (each path is a list of relationship names). |
||||
Returns empty list if model is not found. |
||||
""" |
||||
if not self._relationship_graph: |
||||
self._relationship_graph = self._build_relationship_graph() |
||||
logger.info(f'self._relationship_graph = {self._relationship_graph}') |
||||
|
||||
return self._relationship_graph.get(model_name, []) |
||||
|
||||
def get_relationship_graph(self) -> Dict[str, List[List[str]]]: |
||||
""" |
||||
Get the complete relationship graph. |
||||
|
||||
Returns: |
||||
Dict[str, List[List[str]]]: The complete relationship graph |
||||
""" |
||||
return self._relationship_graph.copy() |
||||
|
||||
|
||||
# Create a singleton instance to use throughout the application |
||||
sync_model_manager = SyncModelChildrenManager() |
||||
Loading…
Reference in new issue