from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed from django.db import models, transaction from django.dispatch import receiver from .models import DataAccess, ModelLog, ModelOperation, BaseModel, SideStoreModel from authentication.models import Device from django.contrib.auth import get_user_model from .ws_sender import websocket_sender from .registry import device_registry, related_users_registry, model_registry import logging import traceback logger = logging.getLogger(__name__) User = get_user_model() ### Sync @receiver([pre_save, pre_delete]) def presave_handler(sender, instance, **kwargs): try: # some other classes are excluded in settings_app.py: SYNC_APPS if not isinstance(instance, (BaseModel, User)): return signal = kwargs.get('signal') # avoid crash in manage.py createsuperuser + delete user in the admin if isinstance(instance, User) and (instance._state.db is None or signal == pre_delete): return users = related_users(instance) related_users_registry.register(instance.id, users) if signal == pre_save: detect_foreign_key_changes_for_shared_instances(sender, instance) except Exception as e: logger.info(f'*** presave_handler ERROR: {e}') raise @receiver([post_save, post_delete]) def synchronization_notifications(sender, instance, created=False, **kwargs): """ Signal handler that sends notifications through WebSocket channels when model instances are saved or deleted. The function creates a WebSocket group name for each affected user and sends a sync update message to all clients connected to that group. """ # some classes are excluded in settings_app.py: SYNC_APPS if not isinstance(instance, BaseModel) and not isinstance(instance, User): return model_name = instance.__class__.__name__ if model_registry.get_model(model_name) is None: return try: process_foreign_key_changes(sender, instance, **kwargs) signal = kwargs.get('signal') save_model_log_if_possible(instance, signal, created) notify_impacted_users(instance) # print(f'!!!!! related_users_registry.unregister for {instance.__class__.__name__} / {signal}') related_users_registry.unregister(instance.id) except Exception as e: logger.info(f'*** ERROR2: {e}') logger.info(traceback.format_exc()) raise def notify_impacted_users(instance): device_id = device_registry.get_device_id(instance.id) users = related_users_registry.get_users(instance.id) logger.info(f'>>> notify_impacted_users: {users} for {instance.id}') if users: user_ids = [user.id for user in users] websocket_sender.send_message(user_ids, device_id) device_registry.unregister(instance.id) def save_model_log_if_possible(instance, signal, created): users = related_users_registry.get_users(instance.id) # logger.info(f'*** save_model_log_if_possible >>> users from registry = {users}, instance = {instance}') if not users: logger.warning(f'!!! Registry returned empty users for instance {instance.id} ({instance.__class__.__name__})') # Try to recalculate users as fallback users = related_users(instance) logger.info(f'!!! Recalculated users for fallback: {users}') if users: if signal == post_save or signal == pre_save: if created: operation = ModelOperation.POST else: operation = ModelOperation.PUT else: operation = ModelOperation.DELETE store_id = None if isinstance(instance, SideStoreModel): store_id = instance.store_id # if operation == ModelOperation.DELETE: # delete now unnecessary logs # ModelLog.objects.filter(model_id=instance.id).delete() # user_ids = [user.id for user in users] # # print(f'users to notify: {user_ids}') # instance._users_to_notify = user_ids # save this for the post_save signal model_name = instance.__class__.__name__ save_model_log(users, operation, model_name, instance.id, store_id) else: logger.info(f'!!! Model Log could not be created because no linked user could be found: {instance.__class__.__name__} {instance}, {signal}') def save_model_log(users, model_operation, model_name, model_id, store_id): device_id = device_registry.get_device_id(model_id) # logger.info(f'*** creating ModelLogs for: {model_operation} {model_name} : {users}') try: with transaction.atomic(): created_logs = [] for user in users: # logger.info(f'Creating ModelLog for user {user.id} - user exists: {User.objects.filter(id=user.id).exists()}') model_log = ModelLog( user=user, operation=model_operation, model_name=model_name, model_id=model_id, store_id=store_id, device_id=device_id ) model_log.save() # logger.info(f'ModelLog saved with ID: {model_log.id}') created_logs.append(model_log.id) # Immediate verification within transaction # immediate_count = ModelLog.objects.filter(id__in=created_logs).count() # logger.info(f'*** Within transaction: Created {len(created_logs)}, found {immediate_count}') # Verification after transaction commits # persisted_count = ModelLog.objects.filter(id__in=created_logs).count() # logger.info(f'*** After transaction: Created {len(created_logs)}, persisted {persisted_count}') except Exception as e: logger.error(f'*** Exception during ModelLog creation: {e}', exc_info=True) raise # with transaction.atomic(): # for user in users: # # print(f' * {user.username}') # # if user.should_synchronize: # model_log = ModelLog() # model_log.user = user # model_log.operation = model_operation # model_log.model_name = model_name # model_log.model_id = model_id # model_log.store_id = store_id # model_log.device_id = device_id # model_log.save() # print(f'ML users = {len(users)}') # existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first() # if existing_log: # # print(f'update existing log {existing_log.users} ') # existing_log.date = timezone.now() # existing_log.device_id = device_id # # existing_log.operation = model_operation # existing_log.save() # existing_log.users.set(users) # else: # model_log = ModelLog() # model_log.operation = model_operation # model_log.date = timezone.now() # model_log.model_name = model_name # model_log.model_id = model_id # model_log.store_id = store_id # model_log.device_id = device_id # model_log.save() # model_log.users.set(users) def detect_foreign_key_changes_for_shared_instances(sender, instance): if not hasattr(instance, 'pk') or not instance.pk: return if not isinstance(instance, BaseModel): return data_access_list = related_data_access(instance) if data_access_list: try: old_instance = sender.objects.get(pk=instance.pk) except sender.DoesNotExist: return # Check foreign key fields for field in sender._meta.get_fields(): if isinstance(field, models.ForeignKey) and not field.related_model == User: # print(f'field.related_model = {field.related_model}') old_value = getattr(old_instance, field.name, None) new_value = getattr(instance, field.name, None) if old_value != new_value: if not hasattr(instance, '_fk_changes'): instance._fk_changes = [] instance._fk_changes.append({ 'data_access_list': data_access_list, 'old_value': old_value, 'new_value': new_value }) def process_foreign_key_changes(sender, instance, **kwargs): ### TODO : we want to avoid creating ModelLog for the user making the change, but how? if hasattr(instance, '_fk_changes'): for change in instance._fk_changes: for data_access in change['data_access_list']: shared = data_access.shared_with.all() owner = {data_access.related_user} # print(f'FK changes, 1 owner = {owner}') ## exclude last_updated_by from extra notifications if instance.last_updated_by: shared = shared.exclude(id=instance.last_updated_by.id) # owner = owner.discard(instance.last_updated_by) # print(f'FK changes, 2 owner = {owner}') if change['old_value']: model_name = change['old_value'].__class__.__name__ if shared: # print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}") save_model_log(shared, 'SHARED_RELATIONSHIP_REMOVED', model_name, change['old_value'].id, change['old_value'].get_store_id()) if owner: # print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}") save_model_log(owner, 'RELATIONSHIP_REMOVED', model_name, change['old_value'].id, change['old_value'].get_store_id()) if change['new_value']: model_name = change['new_value'].__class__.__name__ if shared: # print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}") save_model_log(shared, 'SHARED_RELATIONSHIP_SET', model_name, change['new_value'].id, change['new_value'].get_store_id()) if owner: # print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}") save_model_log(owner, 'RELATIONSHIP_SET', model_name, change['new_value'].id, change['new_value'].get_store_id()) ### Data Access @receiver(post_delete) def delete_data_access_if_necessary(sender, instance, **kwargs): if not isinstance(instance, BaseModel): return if hasattr(instance, 'id'): for data_access in DataAccess.objects.filter(model_id=instance.id): data_access.create_revoke_access_log() @receiver(m2m_changed, sender=DataAccess.shared_with.through) def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs): # print(f'm2m changed = {pk_set}') users = User.objects.filter(id__in=pk_set) save_model_log(users, ModelOperation.PUT, DataAccess.__name__, instance.id, None) with transaction.atomic(): if action == "post_add": instance.create_access_log(users, 'SHARED_ACCESS') elif action == "post_remove": instance.create_access_log(users, 'REVOKED_ACCESS') device_id = device_registry.get_device_id(instance.id) # logger.info(f'*** DataAccess m2m_changed > send message to : {pk_set}') websocket_sender.send_message(pk_set, device_id) # for user_id in pk_set: # websocket_sender.send_user_message(user_id, device_id) device_registry.unregister(instance.id) for user in users: evaluate_if_user_should_sync(user) @receiver(post_save, sender=DataAccess) def data_access_post_save(sender, instance, **kwargs): try: instance.add_references() # create DataAccess references on hierarchy if instance.related_user: evaluate_if_user_should_sync(instance.related_user) except Exception as e: logger.info(f'*** ERROR3: {e}') logger.info(traceback.format_exc()) raise @receiver(pre_delete, sender=DataAccess) def revoke_access_after_delete(sender, instance, **kwargs): # logger.info(f'.........PRE_DELETE DATAACCESS = {instance.id}..........') try: instance.cleanup_references() instance.create_revoke_access_log() # logger.info(f'*** users to notify data access delete: {instance.shared_with.all()}') related_users_registry.register(instance.id, instance.shared_with.all()) instance._user = instance.related_user except Exception as e: logger.info(f'*** ERROR4: {e}') logger.info(traceback.format_exc()) raise @receiver(post_delete, sender=DataAccess) def data_access_post_delete(sender, instance, **kwargs): try: notify_impacted_users(instance) if not hasattr(instance, '_user') or not instance._user: return evaluate_if_user_should_sync(instance._user) except Exception as e: logger.info(f'*** ERROR5: {e}') logger.info(traceback.format_exc()) raise # logger.info(f'.........POST_DELETE END DATAACCESS = {instance.id}..........') def related_users(instance): users = set() if isinstance(instance, User): users.add(instance) elif isinstance(instance, BaseModel): users.add(instance.related_user) data_access_list = DataAccess.objects.filter(id__in=instance.data_access_ids) # print(f'instance = {instance.__class__.__name__}, data access count = {len(data_access_list)}') for data_access in data_access_list: users.add(data_access.related_user) users.update(data_access.shared_with.all()) # print(f'find users for {instance.__class__.__name__}, count = {len(users)}') return {user for user in users if user is not None} def related_data_access(instance): related_instances = instance.related_instances() related_ids = [ri.id for ri in related_instances] related_ids.append(instance.id) return DataAccess.objects.filter(model_id__in=related_ids) def evaluate_if_user_should_sync(user): should_synchronize = False if user.devices.count() > 1: should_synchronize = True elif DataAccess.objects.filter( models.Q(shared_with=user) | models.Q(related_user=user) ).count() > 0: should_synchronize = True with transaction.atomic(): user.should_synchronize = should_synchronize # if we go from True to False we might want to delete ModelLog once the last device has synchronized user.save() ### Device @receiver(post_save, sender=Device) def device_created(sender, instance, **kwargs): try: if not instance.user: return evaluate_if_user_should_sync(instance.user) except Exception as e: logger.info(f'*** ERROR6: {e}') logger.info(traceback.format_exc()) raise @receiver(pre_delete, sender=Device) def device_pre_delete(sender, instance, **kwargs): instance._user = instance.user if instance.user else None @receiver(post_delete, sender=Device) def device_post_delete(sender, instance, **kwargs): try: if not hasattr(instance, '_user') or not instance._user: return evaluate_if_user_should_sync(instance._user) except Exception as e: logger.info(f'*** ERROR7: {e}') logger.info(traceback.format_exc()) raise