You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
403 lines
16 KiB
403 lines
16 KiB
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed
|
|
from django.db import models, transaction
|
|
from django.dispatch import receiver
|
|
|
|
from .models import DataAccess, ModelLog, ModelOperation, BaseModel, SideStoreModel
|
|
from authentication.models import Device
|
|
|
|
from django.contrib.auth import get_user_model
|
|
|
|
from .ws_sender import websocket_sender
|
|
from .registry import device_registry, related_users_registry, model_registry
|
|
|
|
import logging
|
|
import traceback
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
User = get_user_model()
|
|
|
|
### Sync
|
|
|
|
@receiver([pre_save, pre_delete])
|
|
def presave_handler(sender, instance, **kwargs):
|
|
|
|
try:
|
|
# some other classes are excluded in settings_app.py: SYNC_APPS
|
|
if not isinstance(instance, (BaseModel, User)):
|
|
return
|
|
|
|
signal = kwargs.get('signal')
|
|
# avoid crash in manage.py createsuperuser + delete user in the admin
|
|
if isinstance(instance, User) and (instance._state.db is None or signal == pre_delete):
|
|
return
|
|
|
|
users = related_users(instance)
|
|
related_users_registry.register(instance.id, users)
|
|
|
|
if signal == pre_save:
|
|
detect_foreign_key_changes_for_shared_instances(sender, instance)
|
|
|
|
except Exception as e:
|
|
logger.info(f'*** presave_handler ERROR: {e}')
|
|
raise
|
|
|
|
@receiver([post_save, post_delete])
|
|
def synchronization_notifications(sender, instance, created=False, **kwargs):
|
|
"""
|
|
Signal handler that sends notifications through WebSocket channels when model instances are saved or deleted.
|
|
The function creates a WebSocket group name for each affected user and sends a sync update message
|
|
to all clients connected to that group.
|
|
"""
|
|
|
|
# some classes are excluded in settings_app.py: SYNC_APPS
|
|
if not isinstance(instance, BaseModel) and not isinstance(instance, User):
|
|
return
|
|
|
|
model_name = instance.__class__.__name__
|
|
if model_registry.get_model(model_name) is None:
|
|
return
|
|
|
|
try:
|
|
process_foreign_key_changes(sender, instance, **kwargs)
|
|
signal = kwargs.get('signal')
|
|
save_model_log_if_possible(instance, signal, created)
|
|
notify_impacted_users(instance)
|
|
|
|
# print(f'!!!!! related_users_registry.unregister for {instance.__class__.__name__} / {signal}')
|
|
related_users_registry.unregister(instance.id)
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR2: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
|
|
def notify_impacted_users(instance):
|
|
device_id = device_registry.get_device_id(instance.id)
|
|
users = related_users_registry.get_users(instance.id)
|
|
logger.info(f'>>> notify_impacted_users: {users} for {instance.id}')
|
|
|
|
if users:
|
|
user_ids = [user.id for user in users]
|
|
websocket_sender.send_message(user_ids, device_id)
|
|
|
|
device_registry.unregister(instance.id)
|
|
|
|
def save_model_log_if_possible(instance, signal, created):
|
|
|
|
users = related_users_registry.get_users(instance.id)
|
|
# logger.info(f'*** save_model_log_if_possible >>> users from registry = {users}, instance = {instance}')
|
|
|
|
if not users:
|
|
logger.warning(f'!!! Registry returned empty users for instance {instance.id} ({instance.__class__.__name__})')
|
|
# Try to recalculate users as fallback
|
|
users = related_users(instance)
|
|
logger.info(f'!!! Recalculated users for fallback: {users}')
|
|
|
|
if users:
|
|
if signal == post_save or signal == pre_save:
|
|
if created:
|
|
operation = ModelOperation.POST
|
|
else:
|
|
operation = ModelOperation.PUT
|
|
else:
|
|
operation = ModelOperation.DELETE
|
|
|
|
store_id = None
|
|
if isinstance(instance, SideStoreModel):
|
|
store_id = instance.store_id
|
|
|
|
# if operation == ModelOperation.DELETE: # delete now unnecessary logs
|
|
# ModelLog.objects.filter(model_id=instance.id).delete()
|
|
|
|
# user_ids = [user.id for user in users]
|
|
# # print(f'users to notify: {user_ids}')
|
|
# instance._users_to_notify = user_ids # save this for the post_save signal
|
|
model_name = instance.__class__.__name__
|
|
save_model_log(users, operation, model_name, instance.id, store_id)
|
|
|
|
else:
|
|
logger.info(f'!!! Model Log could not be created because no linked user could be found: {instance.__class__.__name__} {instance}, {signal}')
|
|
|
|
def save_model_log(users, model_operation, model_name, model_id, store_id):
|
|
device_id = device_registry.get_device_id(model_id)
|
|
# logger.info(f'*** creating ModelLogs for: {model_operation} {model_name} : {users}')
|
|
|
|
try:
|
|
with transaction.atomic():
|
|
created_logs = []
|
|
for user in users:
|
|
if user.can_synchronize:
|
|
# logger.info(f'Creating ModelLog for user {user.id} - user exists: {User.objects.filter(id=user.id).exists()}')
|
|
model_log = ModelLog(
|
|
user=user,
|
|
operation=model_operation,
|
|
model_name=model_name,
|
|
model_id=model_id,
|
|
store_id=store_id,
|
|
device_id=device_id
|
|
)
|
|
model_log.save()
|
|
# logger.info(f'ModelLog saved with ID: {model_log.id}')
|
|
created_logs.append(model_log.id)
|
|
|
|
# Immediate verification within transaction
|
|
# immediate_count = ModelLog.objects.filter(id__in=created_logs).count()
|
|
# logger.info(f'*** Within transaction: Created {len(created_logs)}, found {immediate_count}')
|
|
|
|
# Verification after transaction commits
|
|
# persisted_count = ModelLog.objects.filter(id__in=created_logs).count()
|
|
# logger.info(f'*** After transaction: Created {len(created_logs)}, persisted {persisted_count}')
|
|
|
|
except Exception as e:
|
|
logger.error(f'*** Exception during ModelLog creation: {e}', exc_info=True)
|
|
raise
|
|
|
|
# with transaction.atomic():
|
|
# for user in users:
|
|
# # print(f' * {user.username}')
|
|
|
|
# # if user.should_synchronize:
|
|
# model_log = ModelLog()
|
|
# model_log.user = user
|
|
# model_log.operation = model_operation
|
|
# model_log.model_name = model_name
|
|
# model_log.model_id = model_id
|
|
# model_log.store_id = store_id
|
|
# model_log.device_id = device_id
|
|
# model_log.save()
|
|
|
|
# print(f'ML users = {len(users)}')
|
|
# existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first()
|
|
# if existing_log:
|
|
# # print(f'update existing log {existing_log.users} ')
|
|
# existing_log.date = timezone.now()
|
|
# existing_log.device_id = device_id
|
|
# # existing_log.operation = model_operation
|
|
# existing_log.save()
|
|
# existing_log.users.set(users)
|
|
# else:
|
|
# model_log = ModelLog()
|
|
# model_log.operation = model_operation
|
|
# model_log.date = timezone.now()
|
|
# model_log.model_name = model_name
|
|
# model_log.model_id = model_id
|
|
# model_log.store_id = store_id
|
|
# model_log.device_id = device_id
|
|
# model_log.save()
|
|
# model_log.users.set(users)
|
|
|
|
def detect_foreign_key_changes_for_shared_instances(sender, instance):
|
|
if not hasattr(instance, 'pk') or not instance.pk:
|
|
return
|
|
if not isinstance(instance, BaseModel):
|
|
return
|
|
|
|
data_access_list = related_data_access(instance)
|
|
if data_access_list:
|
|
try:
|
|
old_instance = sender.objects.get(pk=instance.pk)
|
|
except sender.DoesNotExist:
|
|
return
|
|
|
|
# Check foreign key fields
|
|
for field in sender._meta.get_fields():
|
|
if isinstance(field, models.ForeignKey) and not field.related_model == User:
|
|
# print(f'field.related_model = {field.related_model}')
|
|
old_value = getattr(old_instance, field.name, None)
|
|
new_value = getattr(instance, field.name, None)
|
|
if old_value != new_value:
|
|
if not hasattr(instance, '_fk_changes'):
|
|
instance._fk_changes = []
|
|
|
|
instance._fk_changes.append({
|
|
'data_access_list': data_access_list,
|
|
'old_value': old_value,
|
|
'new_value': new_value
|
|
})
|
|
|
|
def process_foreign_key_changes(sender, instance, **kwargs):
|
|
|
|
### TODO : we want to avoid creating ModelLog for the user making the change, but how?
|
|
|
|
if hasattr(instance, '_fk_changes'):
|
|
for change in instance._fk_changes:
|
|
for data_access in change['data_access_list']:
|
|
|
|
shared = data_access.shared_with.all()
|
|
owner = {data_access.related_user}
|
|
# print(f'FK changes, 1 owner = {owner}')
|
|
|
|
## exclude last_updated_by from extra notifications
|
|
if instance.last_updated_by:
|
|
shared = shared.exclude(id=instance.last_updated_by.id)
|
|
# owner = owner.discard(instance.last_updated_by)
|
|
|
|
# print(f'FK changes, 2 owner = {owner}')
|
|
if change['old_value']:
|
|
model_name = change['old_value'].__class__.__name__
|
|
if shared:
|
|
# print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}")
|
|
save_model_log(shared, 'SHARED_RELATIONSHIP_REMOVED',
|
|
model_name, change['old_value'].id,
|
|
change['old_value'].get_store_id())
|
|
if owner:
|
|
# print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}")
|
|
save_model_log(owner, 'RELATIONSHIP_REMOVED',
|
|
model_name, change['old_value'].id,
|
|
change['old_value'].get_store_id())
|
|
if change['new_value']:
|
|
model_name = change['new_value'].__class__.__name__
|
|
if shared:
|
|
# print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}")
|
|
save_model_log(shared, 'SHARED_RELATIONSHIP_SET',
|
|
model_name, change['new_value'].id,
|
|
change['new_value'].get_store_id())
|
|
if owner:
|
|
# print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}")
|
|
save_model_log(owner, 'RELATIONSHIP_SET',
|
|
model_name, change['new_value'].id,
|
|
change['new_value'].get_store_id())
|
|
|
|
|
|
### Data Access
|
|
|
|
@receiver(post_delete)
|
|
def delete_data_access_if_necessary(sender, instance, **kwargs):
|
|
if not isinstance(instance, BaseModel):
|
|
return
|
|
if hasattr(instance, 'id'):
|
|
for data_access in DataAccess.objects.filter(model_id=instance.id):
|
|
data_access.create_revoke_access_log()
|
|
|
|
@receiver(m2m_changed, sender=DataAccess.shared_with.through)
|
|
def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
|
|
|
|
# print(f'm2m changed = {pk_set}')
|
|
users = User.objects.filter(id__in=pk_set)
|
|
|
|
save_model_log(users, ModelOperation.PUT, DataAccess.__name__, instance.id, None)
|
|
|
|
with transaction.atomic():
|
|
if action == "post_add":
|
|
instance.create_access_log(users, 'SHARED_ACCESS')
|
|
elif action == "post_remove":
|
|
instance.create_access_log(users, 'REVOKED_ACCESS')
|
|
|
|
device_id = device_registry.get_device_id(instance.id)
|
|
# logger.info(f'*** DataAccess m2m_changed > send message to : {pk_set}')
|
|
websocket_sender.send_message(pk_set, device_id)
|
|
|
|
# for user_id in pk_set:
|
|
# websocket_sender.send_user_message(user_id, device_id)
|
|
device_registry.unregister(instance.id)
|
|
|
|
for user in users:
|
|
evaluate_if_user_should_sync(user)
|
|
|
|
@receiver(post_save, sender=DataAccess)
|
|
def data_access_post_save(sender, instance, **kwargs):
|
|
try:
|
|
instance.add_references() # create DataAccess references on hierarchy
|
|
|
|
if instance.related_user:
|
|
evaluate_if_user_should_sync(instance.related_user)
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR3: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
|
|
@receiver(pre_delete, sender=DataAccess)
|
|
def revoke_access_after_delete(sender, instance, **kwargs):
|
|
# logger.info(f'.........PRE_DELETE DATAACCESS = {instance.id}..........')
|
|
try:
|
|
instance.cleanup_references()
|
|
instance.create_revoke_access_log()
|
|
# logger.info(f'*** users to notify data access delete: {instance.shared_with.all()}')
|
|
|
|
related_users_registry.register(instance.id, instance.shared_with.all())
|
|
|
|
instance._user = instance.related_user
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR4: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
|
|
@receiver(post_delete, sender=DataAccess)
|
|
def data_access_post_delete(sender, instance, **kwargs):
|
|
try:
|
|
notify_impacted_users(instance)
|
|
|
|
if not hasattr(instance, '_user') or not instance._user:
|
|
return
|
|
evaluate_if_user_should_sync(instance._user)
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR5: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
# logger.info(f'.........POST_DELETE END DATAACCESS = {instance.id}..........')
|
|
|
|
|
|
def related_users(instance):
|
|
users = set()
|
|
if isinstance(instance, User):
|
|
users.add(instance)
|
|
elif isinstance(instance, BaseModel):
|
|
users.add(instance.related_user)
|
|
data_access_list = DataAccess.objects.filter(id__in=instance.data_access_ids)
|
|
# print(f'instance = {instance.__class__.__name__}, data access count = {len(data_access_list)}')
|
|
for data_access in data_access_list:
|
|
users.add(data_access.related_user)
|
|
users.update(data_access.shared_with.all())
|
|
|
|
# print(f'find users for {instance.__class__.__name__}, count = {len(users)}')
|
|
|
|
return {user for user in users if user is not None}
|
|
|
|
def related_data_access(instance):
|
|
related_instances = instance.related_instances()
|
|
related_ids = [ri.id for ri in related_instances]
|
|
related_ids.append(instance.id)
|
|
return DataAccess.objects.filter(model_id__in=related_ids)
|
|
|
|
def evaluate_if_user_should_sync(user):
|
|
should_synchronize = False
|
|
if user.devices.count() > 1:
|
|
should_synchronize = True
|
|
elif DataAccess.objects.filter(
|
|
models.Q(shared_with=user) |
|
|
models.Q(related_user=user)
|
|
).count() > 0:
|
|
should_synchronize = True
|
|
|
|
with transaction.atomic():
|
|
user.should_synchronize = should_synchronize
|
|
# if we go from True to False we might want to delete ModelLog once the last device has synchronized
|
|
user.save()
|
|
|
|
### Device
|
|
|
|
@receiver(post_save, sender=Device)
|
|
def device_created(sender, instance, **kwargs):
|
|
try:
|
|
if not instance.user:
|
|
return
|
|
evaluate_if_user_should_sync(instance.user)
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR6: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
|
|
@receiver(pre_delete, sender=Device)
|
|
def device_pre_delete(sender, instance, **kwargs):
|
|
instance._user = instance.user if instance.user else None
|
|
|
|
@receiver(post_delete, sender=Device)
|
|
def device_post_delete(sender, instance, **kwargs):
|
|
try:
|
|
if not hasattr(instance, '_user') or not instance._user:
|
|
return
|
|
evaluate_if_user_should_sync(instance._user)
|
|
except Exception as e:
|
|
logger.info(f'*** ERROR7: {e}')
|
|
logger.info(traceback.format_exc())
|
|
raise
|
|
|