import logging
from typing import List, Optional, Set

from django.conf import settings
from django.db import models
from django.utils.timezone import now

from ..model_manager import sync_model_manager

logger = logging.getLogger(__name__)


class BaseModel(models.Model):
    """Abstract base model with audit fields and data-access bookkeeping.

    Besides the timestamp and user fields, every instance carries
    ``data_access_ids``: a JSON list of identifiers collected from related
    instances (see ``update_data_access_list``).  The class also offers
    helpers to walk reverse ("children") and forward ("parents") relations.
    """

    creation_date = models.DateTimeField(default=now, editable=False)
    last_update = models.DateTimeField(default=now)
    related_user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        related_name='+',
    )
    last_updated_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        blank=True,
        null=True,
        on_delete=models.SET_NULL,
        related_name='+',
    )
    data_access_ids = models.JSONField(default=list)

    # Class-level switch: when falsy, the data-access helpers below are no-ops.
    sharable = True

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Fill in derived fields, then save.

        ``related_user`` is looked up through the parents when unset, and
        ``data_access_ids`` is computed only while the instance is being
        added (``self._state.adding``).
        """
        if self.related_user is None:
            self.related_user = self.find_related_user()
        if self._state.adding:
            self.update_data_access_list()
        super().save(*args, **kwargs)

    def get_store_id(self):
        """Return ``store_id`` for ``SideStoreModel`` instances, else ``None``."""
        if isinstance(self, SideStoreModel):
            return self.store_id
        return None

    def data_identifier_dict(self):
        """Return the identifying dict for this instance (no store here)."""
        return {
            'model_id': self.id,
            'store_id': None,
        }

    def update_data_access_list(self):
        """Rebuild ``data_access_ids`` as the union of the ids carried by
        every ``BaseModel`` returned by ``sharing_related_instances``.

        Does nothing when ``sharable`` is falsy.  The resulting list has no
        guaranteed order (it is built from a set).
        """
        if not self.sharable:
            return
        collected = set()
        for instance in self.sharing_related_instances():
            if isinstance(instance, BaseModel) and instance.data_access_ids:
                collected.update(instance.data_access_ids)
        self.data_access_ids = list(collected)

    def add_data_access_relation(self, data_access):
        """Append ``str(data_access.id)`` to ``data_access_ids`` if absent.

        Only mutates the in-memory list; the caller is responsible for saving.
        """
        if not self.sharable:
            return
        str_id = str(data_access.id)
        if str_id not in self.data_access_ids:
            self.data_access_ids.append(str_id)

    def remove_data_access_relation(self, data_access):
        """Remove ``str(data_access.id)`` from ``data_access_ids`` if present.

        Only mutates the in-memory list; the caller is responsible for saving.
        """
        if not self.sharable:
            return
        try:
            self.data_access_ids.remove(str(data_access.id))
        except ValueError:
            # The id was not in the list: nothing to remove.
            pass

    def _visit_key(self):
        """Return the key used to mark this instance as visited.

        The model class is part of the key because primary keys are only
        unique per model: two instances of different models may share a pk.
        """
        return (type(self), self.pk)

    def get_children_by_model(self):
        """
        Returns a dictionary where:
        - keys are the model names
        - values are QuerySets of related objects

        Only auto-created reverse relations (one-to-many and one-to-one) are
        considered.  Because the key is the related model's class name, two
        reverse relations coming from the same model overwrite each other.
        """
        related_objects = {}
        relations = self._meta.get_fields(include_parents=False, include_hidden=False)
        for relation in relations:
            # ``not concrete`` excludes the auto-created forward ``*_ptr``
            # field of multi-table inheritance, which is not a reverse relation.
            if not relation.auto_created or relation.concrete:
                continue
            model_name = relation.related_model.__name__
            if relation.one_to_many:
                # The attribute on the instance is the accessor name, which
                # differs from ``relation.name`` when no related_name is set.
                manager = getattr(self, relation.get_accessor_name())
                related_objects[model_name] = manager.all()
            elif relation.one_to_one:
                # A reverse one-to-one accessor yields an instance (or raises
                # when there is none), so build an equivalent QuerySet instead.
                related_objects[model_name] = relation.related_model._default_manager.filter(
                    **{relation.field.name: self}
                )
        return related_objects

    def get_parents_by_model(self):
        """
        Returns a dictionary where:
        - keys are the model names
        - values are the parent model instances

        Parents are the targets of every ``ForeignKey`` on the model (this
        includes ``OneToOneField``) plus ``OneToOneRel`` relations flagged as
        ``parent_link``.  Relations without a related object are skipped.
        """
        parents = {}
        # Get all fields including parent links
        for field in self._meta.get_fields(include_parents=True, include_hidden=True):
            if isinstance(field, models.OneToOneRel) and field.parent_link:
                # The "does not exist" error raised by this accessor is an
                # AttributeError subclass, so a default of None means "no parent".
                parent_instance = getattr(self, field.get_accessor_name(), None)
            elif isinstance(field, models.ForeignKey):
                parent_instance = getattr(self, field.name)
            else:
                continue
            if parent_instance:
                parents[field.related_model.__name__] = parent_instance
        return parents

    def get_recursive_children(self, processed_objects: Optional[Set] = None) -> List:
        """
        Recursively get all children objects through the hierarchy

        ``processed_objects`` records the instances already expanded so that
        cycles terminate; it is mutated in place.  A child reachable through
        several routes may appear more than once in the result.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return []
        processed_objects.add(key)

        children = []
        for queryset in self.get_children_by_model().values():
            for child in queryset:
                if isinstance(child, BaseModel):
                    children.append(child)
                    children.extend(child.get_recursive_children(processed_objects))
        return children

    def get_recursive_parents(self, processed_objects: Optional[Set] = None) -> List:
        """
        Recursively get all parent objects through the hierarchy

        Every direct parent is included; only ``BaseModel`` parents are
        expanded further.  ``processed_objects`` is mutated in place.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return []
        processed_objects.add(key)

        parents = []
        for parent in self.get_parents_by_model().values():
            parents.append(parent)
            # Recursively get parents of parents
            if isinstance(parent, BaseModel):
                parents.extend(parent.get_recursive_parents(processed_objects))
        return parents

    def related_instances(self):
        """
        Get all related instances (both children and parents) recursively
        """
        # Each walk gets its own visited set: both start by marking ``self``.
        instances = []
        instances.extend(self.get_recursive_children(set()))
        instances.extend(self.get_recursive_parents(set()))
        return instances

    def find_related_user(self, processed_objects: Optional[Set] = None):
        """Return the first ``related_user`` found on a ``BaseModel`` ancestor.

        Parents are examined in the order given by ``get_parents_by_model``;
        for each one its own ``related_user`` is tried first, then its
        ancestors.  Returns ``None`` when no ancestor has a user.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return None
        processed_objects.add(key)

        for parent in self.get_parents_by_model().values():
            if not isinstance(parent, BaseModel):
                continue
            if parent.related_user:
                logger.debug('related_user found in %s', parent)
                return parent.related_user
            # Keep scanning the remaining parents when this branch has no user.
            user = parent.find_related_user(processed_objects)
            if user is not None:
                return user
        return None

    def sharing_related_instances(self):
        """
        Get the instances used to compute data access: the shared children
        (see ``get_shared_children``) followed by all recursive parents.
        """
        # Each walk gets its own visited set, as in ``related_instances``.
        instances = []
        instances.extend(self.get_shared_children(set()))
        instances.extend(self.get_recursive_parents(set()))
        return instances

    def get_shared_children(self, processed_objects):
        """Return the children relevant for sharing.

        When ``sync_model_manager.get_relationship_paths`` returns paths for
        this class name, only those paths are followed; otherwise every
        recursive child is returned.
        NOTE(review): each path is presumably a sequence of attribute names --
        confirm against ``sync_model_manager``.
        """
        relationship_paths = sync_model_manager.get_relationship_paths(self.__class__.__name__)
        if not relationship_paths:
            return self.get_recursive_children(processed_objects)

        instances = []
        for relationships in relationship_paths:
            instances.extend(
                self.get_shared_children_from_relationships(relationships, processed_objects)
            )
        return instances

    def get_shared_children_from_relationships(self, relationships, processed_objects):
        """Follow one chain of attribute names starting from ``self``.

        Each name in ``relationships`` is read on every object reached by the
        previous one; values exposing a callable ``all`` are expanded through
        it, and ``None`` values (empty relations) are skipped.

        Returns the set of objects met along the chain that were not already
        in ``processed_objects``; that set is also updated in place, so an
        object reached by several chains is reported only once.
        """
        logger.debug('>>> %s : relationships = %s', self.__class__.__name__, relationships)
        discovered = set()
        current = [self]
        for relationship in relationships:
            next_level = []
            for item in current:
                value = getattr(item, relationship)
                if value is None:
                    # Empty nullable relation: nothing to follow on this branch.
                    continue
                if hasattr(value, 'all') and callable(value.all):
                    # This is a queryset from a reverse relationship;
                    # evaluate it a single time.
                    next_level.extend(value.all())
                else:
                    next_level.append(value)
            for related_obj in next_level:
                if related_obj not in processed_objects:
                    processed_objects.add(related_obj)
                    discovered.add(related_obj)
            current = next_level
        return discovered


class SideStoreModel(BaseModel):
    """Abstract ``BaseModel`` variant that additionally carries a ``store_id``."""

    # a value matching LeStorage directory sub-stores. Matches the name of the directory.
    store_id = models.CharField(max_length=100, default="")

    class Meta:
        abstract = True

    def data_identifier_dict(self):
        """Return the identifying dict for this instance, including its store."""
        return {
            'model_id': self.id,
            'store_id': self.store_id,
        }