from functools import partial
from threading import Lock, Timer

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.contrib.auth import get_user_model
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed
from django.dispatch import receiver
from django.utils import timezone

from .models import DataAccess, ModelLog, ModelOperation, BaseModel, SideStoreModel

# Synchronization

User = get_user_model()

# Guards the buffer/timer state stored on ``send_user_message``; that state is
# touched both by the threads calling ``send_user_message`` and by Timer threads.
_send_user_message_lock = Lock()


@receiver([pre_save, pre_delete])
def synchronization_prepare(sender, instance, created=False, **kwargs):
    """Record a ModelLog before a BaseModel instance is saved or deleted.

    Instances that are not ``BaseModel`` are ignored.
    """
    # some classes are excluded in settings_app.py: SYNC_APPS
    if not isinstance(instance, BaseModel):
        return

    signal = kwargs.get('signal')
    # pre_save does not send ``created`` (only post_save does), so without this
    # every save would be logged as an update. ``_state.adding`` is True while
    # the instance has not been written to the database yet.
    if signal is pre_save:
        created = created or instance._state.adding

    save_model_log_if_possible(instance, signal, created)


@receiver([post_save, post_delete])
def synchronization_notifications(sender, instance, created=False, **kwargs):
    """
    Signal handler that sends notifications through WebSocket channels
    when model instances are saved or deleted.

    Instances that are not ``BaseModel`` are ignored. For the others, every
    impacted user gets a sync update message on their own channel group
    (see ``notify_impacted_users`` and ``send_user_message``).
    """
    # some classes are excluded in settings_app.py: SYNC_APPS
    if not isinstance(instance, BaseModel):
        return

    # print(f'*** signals {sender}')
    notify_impacted_users(instance, kwargs.get('signal'))


def notify_impacted_users(instance, signal):
    """Send a sync message to every user impacted by a change on ``instance``.

    The impacted users are the instance itself when it is a ``User``, otherwise
    ``instance.last_updated_by``, plus the ids stored in
    ``instance._users_to_notify`` by ``save_model_log_if_possible``.
    ``signal`` is accepted for the callers' convenience and is not used.
    """
    user_ids = set()

    # add impacted users
    if isinstance(instance, User):
        user_ids.add(instance.id)
    elif isinstance(instance, BaseModel):
        owner = instance.last_updated_by
        if owner:
            user_ids.add(owner.id)

    if isinstance(instance, BaseModel):
        # The attribute is only set by the pre_save/pre_delete handler when
        # last_updated_by is known, so it may be missing on this instance.
        users_to_notify = getattr(instance, '_users_to_notify', None)
        if users_to_notify is not None:
            user_ids.update(users_to_notify)
        else:
            print('no users to notify')

    print(f'notify: {user_ids}')
    for user_id in user_ids:
        send_user_message(user_id)


def save_model_log_if_possible(instance, signal, created):
    """Create or refresh the ModelLog describing the pending change on ``instance``.

    Nothing is logged when ``instance.last_updated_by`` is empty. Otherwise the
    operation is POST (save with ``created``), PUT (any other save) or DELETE
    (any other signal), and the log is attached to the updating user plus the
    owner and ``shared_with`` users of every related DataAccess. The ids of
    those users are stored on ``instance._users_to_notify``.
    """
    user = instance.last_updated_by
    if user:
        if signal == post_save or signal == pre_save:
            if created:
                operation = ModelOperation.POST
            else:
                operation = ModelOperation.PUT
        else:
            operation = ModelOperation.DELETE

        model_name = instance.__class__.__name__
        store_id = None
        if isinstance(instance, SideStoreModel):
            store_id = instance.store_id

        if operation == ModelOperation.DELETE:  # delete now unnecessary logs
            ModelLog.objects.filter(model_id=instance.id).delete()

        users = {user}
        data_access_list = related_data_access(instance)
        for data_access in data_access_list:
            users.add(data_access.owner)
            users.update(data_access.shared_with.all())
        if isinstance(instance, DataAccess):
            users.add(instance.owner)
            users.update(instance.shared_with.all())

        user_ids = [user.id for user in users]
        print(f'users to notify: {user_ids}')
        instance._users_to_notify = user_ids  # save this for the post_save signal
        save_model_log(users, operation, model_name, instance.id, store_id)
    else:
        print('>>> Model Log could not be created because instance.last_updated_by is None')


def save_model_log(users, model_operation, model_name, model_id, store_id):
    """Update the matching ModelLog, or create one, and set its users to ``users``.

    A log matches when it has the same ``model_id`` and operation and is
    already attached to at least one of ``users``. A matching log only gets
    its date refreshed; ``model_name`` and ``store_id`` are written on
    creation only.
    """
    now = timezone.now()
    existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first()
    if existing_log:
        # print(f'update existing log {existing_log.users} ')
        existing_log.date = now
        # the model field is ``operation`` (see the filter above and the
        # creation branch below)
        existing_log.operation = model_operation
        existing_log.save()
        existing_log.users.set(users)
    else:
        model_log = ModelLog()
        model_log.operation = model_operation
        model_log.date = now
        model_log.model_name = model_name
        model_log.model_id = model_id
        model_log.store_id = store_id
        model_log.save()
        model_log.users.set(users)


def related_data_access(instance):
    """Return the DataAccess rows targeting ``instance`` or one of its related instances."""
    # related_instances() is called once; its cost is unknown from here
    related_ids = [ri.id for ri in instance.related_instances()]
    related_ids.append(instance.id)
    return DataAccess.objects.filter(model_id__in=related_ids)


def delete_data_access_if_necessary(model_id):
    """Delete every DataAccess row whose ``model_id`` equals ``model_id``."""
    DataAccess.objects.filter(model_id=model_id).delete()


@receiver(m2m_changed, sender=DataAccess.shared_with.through)
def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
    """Log and notify when users are added to or removed from ``shared_with``.

    ``post_add`` creates a 'GRANT_ACCESS' access log and ``post_remove`` a
    'REVOKE_ACCESS' one; every user id in ``pk_set`` is sent a sync message
    for any action that provides a ``pk_set``.
    """
    # Django sends pk_set=None for the pre_clear/post_clear actions; there is
    # no id to log or notify in that case.
    if pk_set is None:
        return

    users = User.objects.filter(id__in=pk_set)

    if action == "post_add":
        instance.create_access_log(users, 'GRANT_ACCESS')
    elif action == "post_remove":
        instance.create_access_log(users, 'REVOKE_ACCESS')

    for user_id in pk_set:
        send_user_message(user_id)


def send_user_message(user_id):
    """Queue a sync message for ``user_id`` and (re)start the 0.1 s send timer.

    Ids are collected in a set stored on the function itself. Each call
    cancels the pending timer and starts a new one, so a burst of calls
    results in one message per distinct user, sent to the ``sync_<user_id>``
    channel group once the calls stop for 0.1 s.
    """

    def send_buffered_messages():
        # Take ownership of the pending ids under the lock so that ids queued
        # while messages are being sent are neither iterated concurrently nor
        # dropped; they stay in the fresh buffer for the next timer.
        with _send_user_message_lock:
            pending = send_user_message._buffer
            send_user_message._buffer = set()
            send_user_message._timer = None

        if not pending:
            return

        channel_layer = get_channel_layer()
        for buffered_id in pending:
            group_name = f"sync_{buffered_id}"
            print(f">>> send to group {group_name}")
            async_to_sync(channel_layer.group_send)(
                group_name, {"type": "sync.update", "message": "hello"}
            )

    with _send_user_message_lock:
        if not hasattr(send_user_message, '_buffer'):
            send_user_message._buffer = set()
            send_user_message._timer = None

        send_user_message._buffer.add(user_id)

        if send_user_message._timer:
            send_user_message._timer.cancel()

        send_user_message._timer = Timer(0.1, send_buffered_messages)
        send_user_message._timer.start()


@receiver(pre_delete, sender=DataAccess)
def revoke_access_after_delete(sender, instance, **kwargs):
    """Call ``instance.create_revoke_access_log()`` before a DataAccess is deleted.

    Despite its name, this handler is connected to ``pre_delete``.
    """
    instance.create_revoke_access_log()