You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
260 lines
9.1 KiB
260 lines
9.1 KiB
from django.db import models
|
|
from django.utils.timezone import now
|
|
from django.conf import settings
|
|
|
|
from typing import List, Set
|
|
from ..model_manager import sync_model_manager
|
|
|
|
import logging
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
class BaseModel(models.Model):
    """Abstract base model with audit fields and data-access propagation.

    Every concrete subclass gets creation/update timestamps, the owning user
    (``related_user``), the last editing user (``last_updated_by``) and
    ``data_access_ids``: a JSON list of data-access ids stored as strings.
    The helper methods walk the model's relations (children through reverse
    relations, parents through foreign keys) to resolve the owning user and
    to collect the data-access ids of related instances.
    """

    creation_date = models.DateTimeField(default=now, editable=False)
    # NOTE(review): Django documents ``default`` and ``auto_now`` as mutually
    # exclusive (system check fields.E160). The definition is left untouched
    # because changing it requires a migration -- confirm and clean up.
    last_update = models.DateTimeField(default=now, auto_now=True)
    related_user = models.ForeignKey(settings.AUTH_USER_MODEL, blank=True, null=True, on_delete=models.SET_NULL, related_name='+')
    last_updated_by = models.ForeignKey(settings.AUTH_USER_MODEL, blank=True, null=True, on_delete=models.SET_NULL, related_name='+')
    data_access_ids = models.JSONField(default=list)

    # When falsy, the data-access helpers below are no-ops.
    sharable = True

    class Meta:
        abstract = True

    def save(self, *args, **kwargs):
        """Save the instance, filling in derived fields first.

        ``related_user`` is resolved from the parents when it is unset, and
        ``data_access_ids`` is computed from related instances when the row
        is being created.
        """
        if self.related_user is None:
            self.related_user = self.find_related_user()
        if self._state.adding:
            self.update_data_access_list()
        super().save(*args, **kwargs)

    def get_store_id(self):
        """Return ``store_id`` for ``SideStoreModel`` instances, else ``None``."""
        if isinstance(self, SideStoreModel):
            return self.store_id
        return None

    def data_identifier_dict(self):
        """Return the identifying ``model_id``/``store_id`` pair of this row."""
        return {
            'model_id': self.id,
            'store_id': None
        }

    def update_data_access_list(self):
        """Rebuild ``data_access_ids`` from the instances sharing with this one.

        The result is the union of the ``data_access_ids`` of every
        ``BaseModel`` returned by ``sharing_related_instances()``.
        Does nothing when ``sharable`` is falsy.
        """
        if not self.sharable:
            return
        collected = set()
        for instance in self.sharing_related_instances():
            if isinstance(instance, BaseModel) and instance.data_access_ids:
                collected.update(instance.data_access_ids)
        self.data_access_ids = list(collected)

    def add_data_access_relation(self, data_access):
        """Append ``str(data_access.id)`` to ``data_access_ids`` if missing.

        Only mutates the in-memory list; the caller is responsible for saving.
        """
        if not self.sharable:
            return
        str_id = str(data_access.id)
        if str_id not in self.data_access_ids:
            self.data_access_ids.append(str_id)

    def remove_data_access_relation(self, data_access):
        """Remove ``str(data_access.id)`` from ``data_access_ids`` if present.

        Only mutates the in-memory list; the caller is responsible for saving.
        """
        if not self.sharable:
            return
        try:
            self.data_access_ids.remove(str(data_access.id))
        except ValueError:
            # The id was not in the list: nothing to remove.
            pass

    def get_children_by_model(self):
        """
        Returns a dictionary where:
        - keys are the model names
        - values are QuerySets of related objects

        Only reverse one-to-many and one-to-one relations declared directly on
        this model are considered. When several relations point to the same
        model, the last one wins because the dictionary is keyed by model name.
        """
        related_objects = {}

        for relation in self._meta.get_fields(include_parents=False, include_hidden=False):
            # Children are reached through reverse relations only. The forward
            # parent link of multi-table inheritance is also ``auto_created``
            # and one-to-one, but it points to a parent, not to children.
            if not isinstance(relation, models.ForeignObjectRel):
                continue
            if not (relation.one_to_many or relation.one_to_one):
                continue

            model_name = relation.related_model.__name__
            # The attribute on the instance is the accessor name, which differs
            # from ``relation.name`` when no ``related_name`` is set.
            accessor = relation.get_accessor_name()

            if relation.one_to_one:
                # A reverse one-to-one accessor yields the instance itself (or
                # raises an AttributeError subclass when there is none), so it
                # has no ``.all()``; wrap the result in a QuerySet instead.
                manager = relation.related_model._base_manager
                related = getattr(self, accessor, None)
                if related is None:
                    related_objects[model_name] = manager.none()
                else:
                    related_objects[model_name] = manager.filter(pk=related.pk)
            else:
                related_objects[model_name] = getattr(self, accessor).all()

        return related_objects

    def get_parents_by_model(self):
        """
        Returns a dictionary where:
        - keys are the model names
        - values are the parent model instances

        Parent-link one-to-one relations and every foreign key (including
        ``related_user`` and ``last_updated_by``) are inspected. When several
        fields point to the same model, the last one wins because the
        dictionary is keyed by model name.
        """
        parents = {}

        # Get all fields including parent links
        for field in self._meta.get_fields(include_parents=True, include_hidden=True):
            if isinstance(field, models.OneToOneRel) and field.parent_link:
                # Parent link: fetch the instance through the accessor name
                parent_instance = getattr(self, field.get_accessor_name())
            elif isinstance(field, models.ForeignKey):
                # Direct foreign keys are treated as parent relationships too
                parent_instance = getattr(self, field.name)
            else:
                continue

            if parent_instance:
                parents[field.related_model.__name__] = parent_instance

        return parents

    def _visit_key(self):
        """Return the key identifying this row in a ``processed_objects`` set.

        The concrete model is part of the key because primary keys are only
        unique per model: two rows of different models may share a pk.
        """
        return (self._meta.concrete_model, self.pk)

    def get_recursive_children(self, processed_objects: Set = None) -> List:
        """
        Recursively get all children objects through the hierarchy

        ``processed_objects`` records the rows already visited so that cycles
        terminate; it is created when omitted and is mutated in place.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return []
        processed_objects.add(key)

        children = []
        for queryset in self.get_children_by_model().values():
            for child in queryset:
                if isinstance(child, BaseModel):
                    children.append(child)
                    children.extend(child.get_recursive_children(processed_objects))

        return children

    def get_recursive_parents(self, processed_objects: Set = None) -> List:
        """
        Recursively get all parent objects through the hierarchy

        Every immediate parent is returned, but only ``BaseModel`` parents are
        explored further. ``processed_objects`` records the rows already
        visited; it is created when omitted and is mutated in place.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return []
        processed_objects.add(key)

        parents = []
        for parent in self.get_parents_by_model().values():
            parents.append(parent)
            # Recursively get parents of parents
            if isinstance(parent, BaseModel):
                parents.extend(parent.get_recursive_parents(processed_objects))

        return parents

    def related_instances(self):
        """
        Get all related instances (both children and parents) recursively

        Children come first, then parents; each traversal uses its own
        visited set.
        """
        instances = []
        instances.extend(self.get_recursive_children(set()))
        instances.extend(self.get_recursive_parents(set()))
        return instances

    def find_related_user(self, processed_objects: Set = None):
        """Return the first ``related_user`` found among the ancestors.

        Parents are searched depth-first. A parent branch that yields no user
        no longer ends the search: the remaining parents are still tried.
        Returns ``None`` when no ancestor has a ``related_user``.
        """
        if processed_objects is None:
            processed_objects = set()

        # Skip if we've already processed this object to avoid infinite recursion
        key = self._visit_key()
        if key in processed_objects:
            return None
        processed_objects.add(key)

        for parent in self.get_parents_by_model().values():
            if not isinstance(parent, BaseModel):
                continue
            if parent.related_user:
                logger.debug('related_user found in %s', parent)
                return parent.related_user
            user = parent.find_related_user(processed_objects)
            if user:
                return user

        return None

    def sharing_related_instances(self):
        """
        Get the instances whose data-access ids apply to this instance.

        Children come from ``get_shared_children()`` (explicit relationship
        paths when the sync model manager provides some, otherwise every
        recursive child); parents come from ``get_recursive_parents()``.
        """
        instances = []
        instances.extend(self.get_shared_children(set()))
        instances.extend(self.get_recursive_parents(set()))
        return instances

    def get_shared_children(self, processed_objects):
        """Return the children taking part in sharing for this instance.

        ``sync_model_manager.get_relationship_paths()`` is queried with this
        class name. When it returns paths, each one is followed from this
        instance; otherwise all recursive children are returned.
        ``processed_objects`` is mutated in place.
        """
        relationship_paths = sync_model_manager.get_relationship_paths(self.__class__.__name__)
        instances = []

        if relationship_paths:
            for relationships in relationship_paths:
                instances.extend(
                    self.get_shared_children_from_relationships(relationships, processed_objects)
                )
        else:
            instances.extend(self.get_recursive_children(processed_objects))

        return instances

    def get_shared_children_from_relationships(self, relationships, processed_objects):
        """Follow a chain of attribute names starting from this instance.

        ``relationships`` is an ordered list of attribute names. Each name is
        resolved on every object reached at the previous step; attributes
        exposing a callable ``all()`` are expanded to their related objects.

        Every object met along the way (intermediate steps included) is added
        to ``processed_objects``. The returned set holds only the objects that
        this call added, so several paths can be combined by the caller
        without re-adding the results of earlier paths.
        """
        found = set()
        current = [self]

        for relationship in relationships:
            next_level = []
            for item in current:
                value = getattr(item, relationship)
                if value is None:
                    # Unset nullable relation: nothing to follow on this branch.
                    continue

                if hasattr(value, 'all') and callable(value.all):
                    # Related manager: evaluate the queryset a single time
                    related = list(value.all())
                else:
                    related = [value]

                for related_obj in related:
                    if related_obj not in processed_objects:
                        processed_objects.add(related_obj)
                        found.add(related_obj)
                next_level.extend(related)
            current = next_level

        return found
|
|
|
|
class SideStoreModel(BaseModel):
    """Abstract ``BaseModel`` variant that also carries a ``store_id``."""

    # A value matching LeStorage directory sub-stores. Matches the name of the directory.
    store_id = models.CharField(max_length=100, default="")

    class Meta:
        abstract = True

    def data_identifier_dict(self):
        """Return the identifying ``model_id``/``store_id`` pair of this row."""
        identifier = {'model_id': self.id}
        identifier['store_id'] = self.store_id
        return identifier
|
|
|