You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
padelclub_backend/sync/ws_sender.py

192 lines
8.6 KiB

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from threading import Timer, Lock
import logging
logger = logging.getLogger(__name__)
class WebSocketSender:
    """
    Schedule WebSocket "sync" notifications for users.

    Notifications are debounced per group of recipients and are held back
    while any recipient has an open transaction. A held notification is
    released either when every recipient of the group is transaction-free
    again or when the hold timeout expires, whichever comes first.

    User ids are always handled as strings internally, so the ids given to
    send_message() and to add/remove_user_transaction() refer to the same
    user regardless of whether the caller passed an int, a UUID or a str.
    """

    def __init__(self):
        self._buffer_timeout = 0.1  # Debounce timeout in seconds (100ms)
        self._transaction_hold_timeout = 2.0  # Transaction hold timeout in seconds

        # Debouncing: {frozenset_of_user_ids: (Timer_object, device_id_to_send)}
        self._debounce_registry = {}
        self._debounce_lock = Lock()  # Protects _debounce_registry

        # Open transactions: {user_id (str): {transaction_id_1, ...}}
        self._user_transactions = {}
        self._transaction_lock = Lock()  # Protects _user_transactions

        # Sends delayed by transactions:
        # {frozenset_of_user_ids: (device_id, hold_Timer, users_that_were_transactional)}
        self._pending_transactional_sends = {}
        self._pending_sends_lock = Lock()  # Protects _pending_transactional_sends

    @staticmethod
    def _normalize_user_ids(user_ids):
        """
        Return user_ids as a frozenset of strings.

        Accepts None (treated as "no users"), a single id (str, bytes or any
        non-iterable value such as an int or UUID) or any iterable of ids.
        """
        if user_ids is None:
            return frozenset()
        # A str is iterable, but iterating it would yield one "user" per
        # character, so scalar-like values are wrapped before stringifying.
        if isinstance(user_ids, (str, bytes)) or not hasattr(user_ids, "__iter__"):
            user_ids = [user_ids]
        return frozenset(str(user_id) for user_id in user_ids)

    def _perform_actual_send(self, user_id, device_id):
        """
        Send the WebSocket message to the channel group of a single user.

        - user_id: id used to build the group name "sync_<user_id>".
        - device_id: message payload; a placeholder is sent when it is empty.
        """
        channel_layer = get_channel_layer()
        group_name = f"sync_{user_id}"
        message_content = device_id if device_id else "std_msg_lol"  # Ensure a non-empty message
        if not channel_layer:
            # Without a channel layer nothing can be delivered; make that visible.
            logger.warning("Channel layer not available. Cannot send message to %s.", group_name)
            return
        async_to_sync(channel_layer.group_send)(
            group_name,
            {"type": "sync.update", "message": message_content},
        )

    def _send_to_users(self, user_ids_key, device_id):
        """
        Send device_id to every user of the group.

        A failure for one user is logged and does not prevent the remaining
        users from being notified. This runs on timer threads, where nothing
        upstream would handle the exception anyway.
        """
        for user_id in user_ids_key:
            try:
                self._perform_actual_send(user_id, device_id)
            except Exception:
                logger.exception("Failed to send sync notification to user %s", user_id)

    def send_message(self, user_ids, device_id):
        """
        Schedules a WebSocket message to one or more users with debouncing and transaction handling.

        - user_ids: A single user_id or an iterable (list/set/tuple/...) of user_ids.
        - device_id: The message content/identifier to send.

        A call for the same set of users made before the debounce delay has
        elapsed replaces the pending one; only the latest device_id is sent.
        """
        user_ids_key = self._normalize_user_ids(user_ids)
        if not user_ids_key:  # No users to send to
            logger.info('WARNING: no user ids : %s', user_ids)
            return

        with self._debounce_lock:
            previous = self._debounce_registry.get(user_ids_key)
            if previous is not None:
                previous[0].cancel()
            new_timer = Timer(
                self._buffer_timeout,
                self._handle_debounced_action,
                args=[user_ids_key, device_id],
            )
            # Store the new timer together with the latest device_id.
            self._debounce_registry[user_ids_key] = (new_timer, device_id)
            new_timer.start()

    def _handle_debounced_action(self, user_ids_key, device_id):
        """
        Called after the debounce delay. Sends immediately when no user of
        the group has an open transaction, otherwise puts the send on hold.
        """
        with self._debounce_lock:
            # This timer has fired; drop its registry entry.
            # Relies on Timer.cancel() preventing execution of cancelled timers.
            self._debounce_registry.pop(user_ids_key, None)

        with self._transaction_lock:
            transactional_users_in_group = {
                user_id for user_id in user_ids_key
                if self._user_transactions.get(user_id)
            }

        if not transactional_users_in_group:
            # No transactions for any user in this group, send immediately.
            self._send_to_users(user_ids_key, device_id)
            return

        # Transactions active for one or more users. Place in transactional hold.
        with self._pending_sends_lock:
            previous = self._pending_transactional_sends.get(user_ids_key)
            if previous is not None:
                # A newer send for the same group supersedes the held one
                # and restarts the hold timeout.
                previous[1].cancel()
            new_transaction_timer = Timer(
                self._transaction_hold_timeout,
                self._force_send_by_transaction_timeout,
                args=[user_ids_key],  # device_id is read back from the pending entry
            )
            self._pending_transactional_sends[user_ids_key] = (
                device_id,
                new_transaction_timer,
                transactional_users_in_group,  # Kept for debugging/logging
            )
            new_transaction_timer.start()

    def _force_send_by_transaction_timeout(self, user_ids_key):
        """
        Called by the hold timer if transactions didn't clear in time.
        Sends the message regardless of current transaction status.
        """
        with self._pending_sends_lock:
            entry = self._pending_transactional_sends.pop(user_ids_key, None)
        if entry is None:
            # Already released (or superseded) by someone else.
            return
        # Deliver outside the lock so a slow send does not block other
        # threads that need the pending-sends table.
        self._send_to_users(user_ids_key, entry[0])

    def add_user_transaction(self, user_id, transaction_id, device_id=None):
        """
        Declares that a user has started a transaction.
        'device_id' is for transaction context, not message content.
        """
        # Same string form as the recipient keys built by send_message().
        user_id = str(user_id)
        with self._transaction_lock:
            self._user_transactions.setdefault(user_id, set()).add(transaction_id)

    def remove_user_transaction(self, user_id, transaction_id, device_id=None):
        """
        Declares that a user has ended a transaction.
        When it was the user's last open transaction, held messages for the
        groups the user belongs to are re-examined. Unknown users or
        transaction ids are ignored.
        """
        user_id = str(user_id)
        user_became_transaction_free = False
        with self._transaction_lock:
            open_transactions = self._user_transactions.get(user_id)
            if open_transactions is not None and transaction_id in open_transactions:
                open_transactions.remove(transaction_id)
                if not open_transactions:
                    del self._user_transactions[user_id]
                    user_became_transaction_free = True

        if user_became_transaction_free:
            self._process_sends_for_potentially_cleared_groups(cleared_user_id=user_id)

    def _process_sends_for_potentially_cleared_groups(self, cleared_user_id):
        """
        If a user becomes transaction-free, release every held group that
        contains this user and whose members are now all transaction-free.
        """
        messages_to_send_immediately = []
        # Lock order: transaction lock first, then pending-sends lock.
        with self._transaction_lock, self._pending_sends_lock:
            for user_ids_key in list(self._pending_transactional_sends):
                if cleared_user_id not in user_ids_key:
                    continue  # This group didn't involve the user who just cleared
                if any(uid in self._user_transactions for uid in user_ids_key):
                    continue  # Someone in the group still has open transactions
                device_id, transaction_timer, _ = self._pending_transactional_sends.pop(user_ids_key)
                transaction_timer.cancel()  # Cancel the hold-timeout override timer
                messages_to_send_immediately.append((user_ids_key, device_id))

        # Re-schedule outside of the locks. Going through send_message() means
        # the released message is debounced again and transaction state is
        # re-checked when that debounce fires.
        for user_ids_key, device_id in messages_to_send_immediately:
            self.send_message(user_ids_key, device_id)
# Module-level WebSocketSender instance, created once when this module is imported. Presumably callers share this object rather than creating their own — verify against call sites.
websocket_sender = WebSocketSender()