From 3a0074fe91f645dd9a12c6f09b302450b13022f1 Mon Sep 17 00:00:00 2001
From: Laurent
Date: Tue, 3 Jun 2025 10:58:19 +0200
Subject: [PATCH] Fix issue with relationship changes + websocket sends refactoring

---
 sync/models/model_log.py |   2 +
 sync/signals.py          |  41 ++++----
 sync/utils.py            |   6 ++
 sync/views.py            |  12 ++-
 sync/ws_sender.py        | 197 +++++++++++++++++++++++++++++++++------
 5 files changed, 207 insertions(+), 51 deletions(-)

diff --git a/sync/models/model_log.py b/sync/models/model_log.py
index daab4d0..bb1fac9 100644
--- a/sync/models/model_log.py
+++ b/sync/models/model_log.py
@@ -9,6 +9,8 @@ class ModelOperation(models.TextChoices):
     REVOKED_ACCESS = 'REVOKED_ACCESS', 'REVOKED_ACCESS'
     SHARED_RELATIONSHIP_SET = 'SHARED_RELATIONSHIP_SET', 'SHARED_RELATIONSHIP_SET'
     SHARED_RELATIONSHIP_REMOVED = 'SHARED_RELATIONSHIP_REMOVED', 'SHARED_RELATIONSHIP_REMOVED'
+    RELATIONSHIP_SET = 'RELATIONSHIP_SET', 'RELATIONSHIP_SET'
+    RELATIONSHIP_REMOVED = 'RELATIONSHIP_REMOVED', 'RELATIONSHIP_REMOVED'
 
 class ModelLog(models.Model):
     # id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
diff --git a/sync/signals.py b/sync/signals.py
index db9a839..84c7b63 100644
--- a/sync/signals.py
+++ b/sync/signals.py
@@ -77,9 +77,10 @@ def notify_impacted_users(instance):
 
     if users:
         user_ids = [user.id for user in users]
+        websocket_sender.send_message(user_ids, device_id)
         # print(f'notify device: {device_id}, users = {user_ids}')
-        for user_id in user_ids:
-            websocket_sender.send_user_message(user_id, device_id)
+        # for user_id in user_ids:
+        #     websocket_sender.send_user_message(user_id, device_id)
 
     device_registry.unregister(instance.id)
 
@@ -175,6 +176,7 @@ def detect_foreign_key_changes_for_shared_instances(sender, instance):
         return
 
     data_access_list = related_data_access(instance)
+    # print(f'FK change > DA count = {len(data_access_list)}')
    if data_access_list:
         try:
             old_instance = sender.objects.get(pk=instance.pk)
@@ -207,35 +209,38 @@ def process_foreign_key_changes(sender, instance, **kwargs):
 
         shared = data_access.shared_with.all()
         owner = {data_access.related_user}
 
+        # print(f'FK changes, 1 owner = {owner}')
+        ## exclude last_updated_by from extra notifications
         if instance.last_updated_by:
             shared = shared.exclude(id=instance.last_updated_by.id)
-            owner = owner.discard(instance.last_updated_by)
+            # owner = owner.discard(instance.last_updated_by)
+        # print(f'FK changes, 2 owner = {owner}')
 
         if change['old_value']:
             model_name = change['old_value'].__class__.__name__
             if shared:
-                print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}")
+                # print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}")
                 save_model_log(shared, 'SHARED_RELATIONSHIP_REMOVED',
                                model_name, change['old_value'].id,
                                change['old_value'].get_store_id())
-            # if owner:
-            #     print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}")
-            #     save_model_log(owner, 'RELATIONSHIP_REMOVED',
-            #                    model_name, change['old_value'].id,
-            #                    change['old_value'].get_store_id())
+            if owner:
+                # print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}")
+                save_model_log(owner, 'RELATIONSHIP_REMOVED',
+                               model_name, change['old_value'].id,
+                               change['old_value'].get_store_id())
 
         if change['new_value']:
             model_name = change['new_value'].__class__.__name__
             if shared:
-                print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}")
+                # print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}")
                 save_model_log(shared, 'SHARED_RELATIONSHIP_SET', model_name,
                                change['new_value'].id, change['new_value'].get_store_id())
-            # if owner:
-            #     print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}")
-            #     save_model_log(owner, 'RELATIONSHIP_SET',
-            #                    model_name, change['old_value'].id,
-            #                    change['old_value'].get_store_id())
+            if owner:
+                # print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}")
+                save_model_log(owner, 'RELATIONSHIP_SET',
+                               model_name, change['new_value'].id,
+                               change['new_value'].get_store_id())
 
 
 ### Data Access
@@ -259,8 +264,10 @@ def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
         instance.create_access_log(users, 'REVOKED_ACCESS')
 
         device_id = device_registry.get_device_id(instance.id)
-        for user_id in pk_set:
-            websocket_sender.send_user_message(user_id, device_id)
+        websocket_sender.send_message(pk_set, device_id)
+
+        # for user_id in pk_set:
+        #     websocket_sender.send_user_message(user_id, device_id)
         device_registry.unregister(instance.id)
 
         for user in users:
diff --git a/sync/utils.py b/sync/utils.py
index cf3ed11..ee7ff80 100644
--- a/sync/utils.py
+++ b/sync/utils.py
@@ -3,6 +3,9 @@ from django.apps import apps
 from .registry import model_registry
 from collections import defaultdict
+import random
+import string
+
 
 
 def build_serializer_class(model_name):
     # Remove the 's' character at the end if present
@@ -40,6 +43,9 @@ def get_serialized_data_by_id(model_name, model_id):
     serializer = get_serializer(instance, model_name)
     return serializer.data
 
+def generate_random_id(length=8):
+    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
+
 class HierarchyOrganizer:
     def __init__(self):
         self.levels = [] # List of dictionaries, each representing a level
diff --git a/sync/views.py b/sync/views.py
index fc23675..3f27041 100644
--- a/sync/views.py
+++ b/sync/views.py
@@ -17,11 +17,12 @@ from collections import defaultdict
 from urllib.parse import unquote
 
 from .serializers import DataAccessSerializer
-from .utils import get_serializer, build_serializer_class, get_data, get_serialized_data_by_id, HierarchyOrganizer
+from .utils import generate_random_id, get_serializer, build_serializer_class, get_data, get_serialized_data_by_id, HierarchyOrganizer
 from .models import ModelLog, BaseModel, SideStoreModel, DataAccess
 
 from .registry import model_registry, device_registry
+from .ws_sender import websocket_sender
 
 
 
 # class HierarchyApiView(APIView):
@@ -124,6 +125,9 @@ class SynchronizationApi(APIView):
 
         models = set()
 
+        transaction_id = generate_random_id()
+        websocket_sender.add_user_transaction(self.request.user.id, transaction_id, device_id)
+
         for op in operations:
             result = None
             message = None
@@ -204,6 +208,8 @@ class SynchronizationApi(APIView):
 
         # print(f"sync POST completed for models: {models}")
 
+        websocket_sender.remove_user_transaction(self.request.user.id, transaction_id, device_id)
+
         return Response({
             'results': results
         }, status=207) # Multi-Status
@@ -260,10 +266,10 @@ class LogProcessingResult:
         for log in logs:
             self.last_log_date = log.date
             try:
-                if log.operation in ['POST', 'PUT']:
+                if log.operation in ['POST', 'PUT', 'RELATIONSHIP_SET']:
                     data = get_serialized_data_by_id(log.model_name, log.model_id)
                     self.updates[log.model_name][log.model_id] = data
-                elif log.operation == 'DELETE':
+                elif log.operation in ['DELETE', 'RELATIONSHIP_REMOVED']:
                     self.deletions[log.model_name].append(log.data_identifier_dict())
                 elif log.operation == 'SHARED_ACCESS':
                     # Remove any existing revocations for this model_id
diff --git a/sync/ws_sender.py b/sync/ws_sender.py
index ff2af3a..e1e4f89 100644
--- a/sync/ws_sender.py
+++ b/sync/ws_sender.py
@@ -1,50 +1,185 @@
 from channels.layers import get_channel_layer
 from asgiref.sync import async_to_sync
-from threading import Timer
+from threading import Timer, Lock
 
 
 class WebSocketSender:
     """
-    Manages WebSocket notifications for users with individual buffering timers.
+    Manages WebSocket notifications for users with debouncing and
+    transaction-aware sending logic.
     """
     def __init__(self):
-        self._user_timers = {} # Dictionary to store user-specific timers
-        self._buffer_timeout = 0.1 # Debounce timeout in seconds
+        self._buffer_timeout = 0.1 # Debounce timeout in seconds (100ms)
+        self._transaction_hold_timeout = 2.0 # Transaction hold timeout in seconds
 
-    def send_user_message(self, user_id, device_id):
-        """
-        Schedules a notification for a specific user with debouncing.
-        """
-        # print(f'>>> send message: {device_id}')
-        # Cancel existing timer for this user if any
-        if user_id in self._user_timers and self._user_timers[user_id]:
-            self._user_timers[user_id].cancel()
+        # For 100ms debouncing: {(frozenset_of_user_ids): (Timer_object, device_id_to_send)}
+        self._debounce_registry = {}
+        self._debounce_lock = Lock() # Protects _debounce_registry
 
-        # Create new timer for this user
-        self._user_timers[user_id] = Timer(
-            self._buffer_timeout,
-            self._send_message,
-            args=[user_id, device_id]
-        )
-        self._user_timers[user_id].start()
+        # For transaction management: {user_id: {transaction_id_1, ...}}
+        self._user_transactions = {}
+        self._transaction_lock = Lock() # Protects _user_transactions
 
-    def _send_message(self, user_id, device_id):
+        # For sends delayed by transactions:
+        # {(frozenset_of_user_ids): (device_id, Timer_for_2s_override, set_of_initially_transactional_users_in_group)}
+        self._pending_transactional_sends = {}
+        self._pending_sends_lock = Lock() # Protects _pending_transactional_sends
+
+    def _perform_actual_send(self, user_id, device_id):
         """
-        Sends the WebSocket message for a specific user.
+        Actually sends the WebSocket message to a single user.
         """
         channel_layer = get_channel_layer()
         group_name = f"sync_{user_id}"
-        # print(f">>> send to group {group_name}, device_id={device_id}")
-        # print(f'channel_layer = {channel_layer}')
+        message_content = device_id if device_id else "std_msg_lol" # Ensure a non-empty message
+
+        if channel_layer:
+            async_to_sync(channel_layer.group_send)(
+                group_name,
+                {"type": "sync.update", "message": message_content}
+            )
+        # else:
+        #     Consider logging if channel_layer is not available
+        #     print(f"Channel layer not available. Cannot send message to {group_name}.")
+
+    def send_message(self, user_ids, device_id):
+        """
+        Schedules a WebSocket message to one or more users with debouncing and transaction handling.
+        - user_ids: A single user_id or a list/set of user_ids.
+        - device_id: The message content/identifier to send.
+ """ + if not isinstance(user_ids, (list, set, tuple)): + user_ids = [user_ids] + + user_ids_key = frozenset(user_ids) + if not user_ids_key: # No users to send to + return + + with self._debounce_lock: + if user_ids_key in self._debounce_registry: + old_timer, _ = self._debounce_registry[user_ids_key] + old_timer.cancel() + + new_timer = Timer( + self._buffer_timeout, + self._handle_debounced_action, + args=[user_ids_key, device_id] + ) + self._debounce_registry[user_ids_key] = (new_timer, device_id) # Store new timer and latest device_id + new_timer.start() + + def _handle_debounced_action(self, user_ids_key, device_id): + """ + Called after the 100ms debounce. Checks transactions and proceeds. + """ + with self._debounce_lock: + # Remove from registry; this timer has fired. + # Relies on Timer.cancel() preventing execution of cancelled timers. + if user_ids_key in self._debounce_registry: + del self._debounce_registry[user_ids_key] + + transactional_users_in_group = set() + with self._transaction_lock: + for user_id in user_ids_key: + if self._user_transactions.get(user_id): # Check if user has any active transactions + transactional_users_in_group.add(user_id) + + if not transactional_users_in_group: + # No transactions for any user in this group, send immediately. + for user_id in user_ids_key: + self._perform_actual_send(user_id, device_id) + else: + # Transactions active for one or more users. Place in transactional hold. + with self._pending_sends_lock: + if user_ids_key in self._pending_transactional_sends: + # If a new send_message (debounced) for this group also hits transactions, + # it supersedes the previous one, resetting the 2s timeout. + _, old_transaction_timer, _ = self._pending_transactional_sends[user_ids_key] + old_transaction_timer.cancel() + + new_transaction_timer = Timer( + self._transaction_hold_timeout, + self._force_send_by_transaction_timeout, + args=[user_ids_key] # device_id is retrieved from the registry + ) + self._pending_transactional_sends[user_ids_key] = ( + device_id, + new_transaction_timer, + transactional_users_in_group # Store for potential debugging/logging + ) + new_transaction_timer.start() + + def _force_send_by_transaction_timeout(self, user_ids_key): + """ + Called by the 2s timer if transactions didn't clear in time. + Sends the message regardless of current transaction status. + """ + with self._pending_sends_lock: + if user_ids_key in self._pending_transactional_sends: + device_id, _, _ = self._pending_transactional_sends.pop(user_ids_key) + # Timer has fired, no need to cancel. + for user_id in user_ids_key: + self._perform_actual_send(user_id, device_id) + + def add_user_transaction(self, user_id, transaction_id, device_id=None): + """ + Declares that a user has started a transaction. + 'device_id' is for transaction context, not message content. + """ + with self._transaction_lock: + if user_id not in self._user_transactions: + self._user_transactions[user_id] = set() + self._user_transactions[user_id].add(transaction_id) + + def remove_user_transaction(self, user_id, transaction_id, device_id=None): + """ + Declares that a user has ended a transaction. + Checks if pending messages can now be sent. 
+ """ + user_became_transaction_free = False + with self._transaction_lock: + if user_id in self._user_transactions and transaction_id in self._user_transactions[user_id]: + self._user_transactions[user_id].remove(transaction_id) + if not self._user_transactions[user_id]: # Set is empty + del self._user_transactions[user_id] + user_became_transaction_free = True + + if user_became_transaction_free: + self._process_sends_for_potentially_cleared_groups(cleared_user_id=user_id) + + def _process_sends_for_potentially_cleared_groups(self, cleared_user_id): + """ + If a user becomes transaction-free, check if any groups they were part of + (and were on hold) can now be sent. + """ + messages_to_send_immediately = [] + + with self._transaction_lock: # Acquire transaction lock first + with self._pending_sends_lock: # Then pending sends lock + pending_groups_keys = list(self._pending_transactional_sends.keys()) + + for user_ids_key in pending_groups_keys: + if cleared_user_id not in user_ids_key: + continue # This group didn't involve the user who just cleared + + # Check if ALL users in this specific pending group are now transaction-free + all_users_in_this_group_are_clear = True + for uid_in_group_key in user_ids_key: + if uid_in_group_key in self._user_transactions: # Still has transactions + all_users_in_this_group_are_clear = False + break - device_id = device_id if device_id else "std_msg_lol" # a not empty message is required! + if all_users_in_this_group_are_clear: + device_id, transaction_timer, _ = self._pending_transactional_sends.pop(user_ids_key) + transaction_timer.cancel() # Cancel the 2s override timer + messages_to_send_immediately.append((user_ids_key, device_id)) - async_to_sync(channel_layer.group_send)( - group_name, - {"type": "sync.update", "message": device_id} - ) - # Cleanup timer reference - self._user_timers[user_id] = None + # Perform actual sends outside of the locks + for user_ids_key, device_id in messages_to_send_immediately: + self.send_message(user_ids_key, device_id) + # for user_id in user_ids_key: + # self.send_message(user_ids_key, device_id) + # self._perform_actual_send(user_id, device_id) -# Create a singleton instance +# Create a singleton instance (as in the original file) websocket_sender = WebSocketSender()