from channels.layers import get_channel_layer from asgiref.sync import async_to_sync from threading import Timer, Lock import logging logger = logging.getLogger(__name__) class WebSocketSender: """ Manages WebSocket notifications for users with debouncing and transaction-aware sending logic. """ def __init__(self): self._buffer_timeout = 0.1 # Debounce timeout in seconds (100ms) self._transaction_hold_timeout = 2.0 # Transaction hold timeout in seconds # For 100ms debouncing: {(frozenset_of_user_ids): (Timer_object, device_id_to_send)} self._debounce_registry = {} self._debounce_lock = Lock() # Protects _debounce_registry # For transaction management: {user_id: {transaction_id_1, ...}} self._user_transactions = {} self._transaction_lock = Lock() # Protects _user_transactions # For sends delayed by transactions: # {(frozenset_of_user_ids): (device_id, Timer_for_2s_override, set_of_initially_transactional_users_in_group)} self._pending_transactional_sends = {} self._pending_sends_lock = Lock() # Protects _pending_transactional_sends def _perform_actual_send(self, user_id, device_id): """ Actually sends the WebSocket message to a single user. """ channel_layer = get_channel_layer() group_name = f"sync_{user_id}" message_content = device_id if device_id else "std_msg_lol" # Ensure a non-empty message if channel_layer: async_to_sync(channel_layer.group_send)( group_name, {"type": "sync.update", "message": message_content} ) # else: # Consider logging if channel_layer is not available # print(f"Channel layer not available. Cannot send message to {group_name}.") def send_message(self, user_ids, device_id): """ Schedules a WebSocket message to one or more users with debouncing and transaction handling. - user_ids: A single user_id or a list/set of user_ids. - device_id: The message content/identifier to send. """ user_ids = [str(user_id) for user_id in user_ids] if not isinstance(user_ids, (list, set, tuple)): user_ids = [user_ids] user_ids_key = frozenset(user_ids) if not user_ids_key: # No users to send to logger.info(f'WARNING: no user ids : {user_ids}') return with self._debounce_lock: timer_device_id = device_id if user_ids_key in self._debounce_registry: old_timer, old_device_id = self._debounce_registry[user_ids_key] old_timer.cancel() if old_device_id != device_id: # we want to notify all devices if there all multiple ones timer_device_id = None new_timer = Timer( self._buffer_timeout, self._handle_debounced_action, args=[user_ids_key, timer_device_id] ) self._debounce_registry[user_ids_key] = (new_timer, device_id) new_timer.start() def _handle_debounced_action(self, user_ids_key, device_id): """ Called after the 100ms debounce. Checks transactions and proceeds. """ with self._debounce_lock: # Remove from registry; this timer has fired. # Relies on Timer.cancel() preventing execution of cancelled timers. if user_ids_key in self._debounce_registry: del self._debounce_registry[user_ids_key] transactional_users_in_group = set() with self._transaction_lock: for user_id in user_ids_key: if self._user_transactions.get(user_id): # Check if user has any active transactions transactional_users_in_group.add(user_id) if not transactional_users_in_group: # No transactions for any user in this group, send immediately. for user_id in user_ids_key: self._perform_actual_send(user_id, device_id) else: # Transactions active for one or more users. Place in transactional hold. with self._pending_sends_lock: if user_ids_key in self._pending_transactional_sends: # If a new send_message (debounced) for this group also hits transactions, # it supersedes the previous one, resetting the 2s timeout. _, old_transaction_timer, _ = self._pending_transactional_sends[user_ids_key] old_transaction_timer.cancel() new_transaction_timer = Timer( self._transaction_hold_timeout, self._force_send_by_transaction_timeout, args=[user_ids_key] # device_id is retrieved from the registry ) self._pending_transactional_sends[user_ids_key] = ( device_id, new_transaction_timer, transactional_users_in_group # Store for potential debugging/logging ) new_transaction_timer.start() def _force_send_by_transaction_timeout(self, user_ids_key): """ Called by the 2s timer if transactions didn't clear in time. Sends the message regardless of current transaction status. """ with self._pending_sends_lock: if user_ids_key in self._pending_transactional_sends: device_id, _, _ = self._pending_transactional_sends.pop(user_ids_key) # Timer has fired, no need to cancel. for user_id in user_ids_key: self._perform_actual_send(user_id, device_id) def add_user_transaction(self, user_id, transaction_id, device_id=None): """ Declares that a user has started a transaction. 'device_id' is for transaction context, not message content. """ with self._transaction_lock: if user_id not in self._user_transactions: self._user_transactions[user_id] = set() self._user_transactions[user_id].add(transaction_id) def remove_user_transaction(self, user_id, transaction_id, device_id=None): """ Declares that a user has ended a transaction. Checks if pending messages can now be sent. """ user_became_transaction_free = False with self._transaction_lock: if user_id in self._user_transactions and transaction_id in self._user_transactions[user_id]: self._user_transactions[user_id].remove(transaction_id) if not self._user_transactions[user_id]: # Set is empty del self._user_transactions[user_id] user_became_transaction_free = True if user_became_transaction_free: self._process_sends_for_potentially_cleared_groups(cleared_user_id=user_id) def _process_sends_for_potentially_cleared_groups(self, cleared_user_id): """ If a user becomes transaction-free, check if any groups they were part of (and were on hold) can now be sent. """ messages_to_send_immediately = [] with self._transaction_lock: # Acquire transaction lock first with self._pending_sends_lock: # Then pending sends lock pending_groups_keys = list(self._pending_transactional_sends.keys()) for user_ids_key in pending_groups_keys: if cleared_user_id not in user_ids_key: continue # This group didn't involve the user who just cleared # Check if ALL users in this specific pending group are now transaction-free all_users_in_this_group_are_clear = True for uid_in_group_key in user_ids_key: if uid_in_group_key in self._user_transactions: # Still has transactions all_users_in_this_group_are_clear = False break if all_users_in_this_group_are_clear: device_id, transaction_timer, _ = self._pending_transactional_sends.pop(user_ids_key) transaction_timer.cancel() # Cancel the 2s override timer messages_to_send_immediately.append((user_ids_key, device_id)) # Perform actual sends outside of the locks for user_ids_key, device_id in messages_to_send_immediately: self.send_message(user_ids_key, device_id) # for user_id in user_ids_key: # self.send_message(user_ids_key, device_id) # self._perform_actual_send(user_id, device_id) # Create a singleton instance (as in the original file) websocket_sender = WebSocketSender()