Fix issue with relationship changes + websocket sends refactoring

sync_v2
Laurent 5 months ago
parent 625a882d88
commit 3a0074fe91
  1. 2
      sync/models/model_log.py
  2. 37
      sync/signals.py
  3. 6
      sync/utils.py
  4. 12
      sync/views.py
  5. 191
      sync/ws_sender.py

@ -9,6 +9,8 @@ class ModelOperation(models.TextChoices):
REVOKED_ACCESS = 'REVOKED_ACCESS', 'REVOKED_ACCESS' REVOKED_ACCESS = 'REVOKED_ACCESS', 'REVOKED_ACCESS'
SHARED_RELATIONSHIP_SET = 'SHARED_RELATIONSHIP_SET', 'SHARED_RELATIONSHIP_SET' SHARED_RELATIONSHIP_SET = 'SHARED_RELATIONSHIP_SET', 'SHARED_RELATIONSHIP_SET'
SHARED_RELATIONSHIP_REMOVED = 'SHARED_RELATIONSHIP_REMOVED', 'SHARED_RELATIONSHIP_REMOVED' SHARED_RELATIONSHIP_REMOVED = 'SHARED_RELATIONSHIP_REMOVED', 'SHARED_RELATIONSHIP_REMOVED'
RELATIONSHIP_SET = 'RELATIONSHIP_SET', 'RELATIONSHIP_SET'
RELATIONSHIP_REMOVED = 'RELATIONSHIP_REMOVED', 'RELATIONSHIP_REMOVED'
class ModelLog(models.Model): class ModelLog(models.Model):
# id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True) # id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)

@ -77,9 +77,10 @@ def notify_impacted_users(instance):
if users: if users:
user_ids = [user.id for user in users] user_ids = [user.id for user in users]
websocket_sender.send_message(user_ids, device_id)
# print(f'notify device: {device_id}, users = {user_ids}') # print(f'notify device: {device_id}, users = {user_ids}')
for user_id in user_ids: # for user_id in user_ids:
websocket_sender.send_user_message(user_id, device_id) # websocket_sender.send_user_message(user_id, device_id)
device_registry.unregister(instance.id) device_registry.unregister(instance.id)
@ -175,6 +176,7 @@ def detect_foreign_key_changes_for_shared_instances(sender, instance):
return return
data_access_list = related_data_access(instance) data_access_list = related_data_access(instance)
# print(f'FK change > DA count = {len(data_access_list)}')
if data_access_list: if data_access_list:
try: try:
old_instance = sender.objects.get(pk=instance.pk) old_instance = sender.objects.get(pk=instance.pk)
@ -207,35 +209,38 @@ def process_foreign_key_changes(sender, instance, **kwargs):
shared = data_access.shared_with.all() shared = data_access.shared_with.all()
owner = {data_access.related_user} owner = {data_access.related_user}
# print(f'FK changes, 1 owner = {owner}')
## exclude last_updated_by from extra notifications ## exclude last_updated_by from extra notifications
if instance.last_updated_by: if instance.last_updated_by:
shared = shared.exclude(id=instance.last_updated_by.id) shared = shared.exclude(id=instance.last_updated_by.id)
owner = owner.discard(instance.last_updated_by) # owner = owner.discard(instance.last_updated_by)
# print(f'FK changes, 2 owner = {owner}')
if change['old_value']: if change['old_value']:
model_name = change['old_value'].__class__.__name__ model_name = change['old_value'].__class__.__name__
if shared: if shared:
print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}") # print(f"SHARED_RELATIONSHIP_REMOVED: shared={shared}, model_name={model_name}")
save_model_log(shared, 'SHARED_RELATIONSHIP_REMOVED', save_model_log(shared, 'SHARED_RELATIONSHIP_REMOVED',
model_name, change['old_value'].id, model_name, change['old_value'].id,
change['old_value'].get_store_id()) change['old_value'].get_store_id())
# if owner: if owner:
# print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}") # print(f"RELATIONSHIP_REMOVED: owner={owner}, model_name={model_name}")
# save_model_log(owner, 'RELATIONSHIP_REMOVED', save_model_log(owner, 'RELATIONSHIP_REMOVED',
# model_name, change['old_value'].id, model_name, change['old_value'].id,
# change['old_value'].get_store_id()) change['old_value'].get_store_id())
if change['new_value']: if change['new_value']:
model_name = change['new_value'].__class__.__name__ model_name = change['new_value'].__class__.__name__
if shared: if shared:
print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}") # print(f"SHARED_RELATIONSHIP_SET: shared={shared}, model_name={model_name}")
save_model_log(shared, 'SHARED_RELATIONSHIP_SET', save_model_log(shared, 'SHARED_RELATIONSHIP_SET',
model_name, change['new_value'].id, model_name, change['new_value'].id,
change['new_value'].get_store_id()) change['new_value'].get_store_id())
# if owner: if owner:
# print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}") # print(f"RELATIONSHIP_SET: owner={owner}, model_name={model_name}")
# save_model_log(owner, 'RELATIONSHIP_SET', save_model_log(owner, 'RELATIONSHIP_SET',
# model_name, change['old_value'].id, model_name, change['new_value'].id,
# change['old_value'].get_store_id()) change['new_value'].get_store_id())
### Data Access ### Data Access
@ -259,8 +264,10 @@ def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
instance.create_access_log(users, 'REVOKED_ACCESS') instance.create_access_log(users, 'REVOKED_ACCESS')
device_id = device_registry.get_device_id(instance.id) device_id = device_registry.get_device_id(instance.id)
for user_id in pk_set: websocket_sender.send_message(pk_set, device_id)
websocket_sender.send_user_message(user_id, device_id)
# for user_id in pk_set:
# websocket_sender.send_user_message(user_id, device_id)
device_registry.unregister(instance.id) device_registry.unregister(instance.id)
for user in users: for user in users:

@ -3,6 +3,9 @@ from django.apps import apps
from .registry import model_registry from .registry import model_registry
from collections import defaultdict from collections import defaultdict
import random
import string
def build_serializer_class(model_name): def build_serializer_class(model_name):
# Remove the 's' character at the end if present # Remove the 's' character at the end if present
@ -40,6 +43,9 @@ def get_serialized_data_by_id(model_name, model_id):
serializer = get_serializer(instance, model_name) serializer = get_serializer(instance, model_name)
return serializer.data return serializer.data
def generate_random_id(length=8):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
class HierarchyOrganizer: class HierarchyOrganizer:
def __init__(self): def __init__(self):
self.levels = [] # List of dictionaries, each representing a level self.levels = [] # List of dictionaries, each representing a level

@ -17,11 +17,12 @@ from collections import defaultdict
from urllib.parse import unquote from urllib.parse import unquote
from .serializers import DataAccessSerializer from .serializers import DataAccessSerializer
from .utils import get_serializer, build_serializer_class, get_data, get_serialized_data_by_id, HierarchyOrganizer from .utils import generate_random_id, get_serializer, build_serializer_class, get_data, get_serialized_data_by_id, HierarchyOrganizer
from .models import ModelLog, BaseModel, SideStoreModel, DataAccess from .models import ModelLog, BaseModel, SideStoreModel, DataAccess
from .registry import model_registry, device_registry from .registry import model_registry, device_registry
from .ws_sender import websocket_sender
# class HierarchyApiView(APIView): # class HierarchyApiView(APIView):
@ -124,6 +125,9 @@ class SynchronizationApi(APIView):
models = set() models = set()
transaction_id = generate_random_id()
websocket_sender.add_user_transaction(self.request.user.id, transaction_id, device_id)
for op in operations: for op in operations:
result = None result = None
message = None message = None
@ -204,6 +208,8 @@ class SynchronizationApi(APIView):
# print(f"sync POST completed for models: {models}") # print(f"sync POST completed for models: {models}")
websocket_sender.remove_user_transaction(self.request.user.id, transaction_id, device_id)
return Response({ return Response({
'results': results 'results': results
}, status=207) # Multi-Status }, status=207) # Multi-Status
@ -260,10 +266,10 @@ class LogProcessingResult:
for log in logs: for log in logs:
self.last_log_date = log.date self.last_log_date = log.date
try: try:
if log.operation in ['POST', 'PUT']: if log.operation in ['POST', 'PUT', 'RELATIONSHIP_SET']:
data = get_serialized_data_by_id(log.model_name, log.model_id) data = get_serialized_data_by_id(log.model_name, log.model_id)
self.updates[log.model_name][log.model_id] = data self.updates[log.model_name][log.model_id] = data
elif log.operation == 'DELETE': elif log.operation in ['DELETE', 'RELATIONSHIP_REMOVED']:
self.deletions[log.model_name].append(log.data_identifier_dict()) self.deletions[log.model_name].append(log.data_identifier_dict())
elif log.operation == 'SHARED_ACCESS': elif log.operation == 'SHARED_ACCESS':
# Remove any existing revocations for this model_id # Remove any existing revocations for this model_id

@ -1,50 +1,185 @@
from channels.layers import get_channel_layer from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync from asgiref.sync import async_to_sync
from threading import Timer from threading import Timer, Lock
class WebSocketSender: class WebSocketSender:
""" """
Manages WebSocket notifications for users with individual buffering timers. Manages WebSocket notifications for users with debouncing and
transaction-aware sending logic.
""" """
def __init__(self): def __init__(self):
self._user_timers = {} # Dictionary to store user-specific timers self._buffer_timeout = 0.1 # Debounce timeout in seconds (100ms)
self._buffer_timeout = 0.1 # Debounce timeout in seconds self._transaction_hold_timeout = 2.0 # Transaction hold timeout in seconds
def send_user_message(self, user_id, device_id): # For 100ms debouncing: {(frozenset_of_user_ids): (Timer_object, device_id_to_send)}
self._debounce_registry = {}
self._debounce_lock = Lock() # Protects _debounce_registry
# For transaction management: {user_id: {transaction_id_1, ...}}
self._user_transactions = {}
self._transaction_lock = Lock() # Protects _user_transactions
# For sends delayed by transactions:
# {(frozenset_of_user_ids): (device_id, Timer_for_2s_override, set_of_initially_transactional_users_in_group)}
self._pending_transactional_sends = {}
self._pending_sends_lock = Lock() # Protects _pending_transactional_sends
def _perform_actual_send(self, user_id, device_id):
""" """
Schedules a notification for a specific user with debouncing. Actually sends the WebSocket message to a single user.
""" """
# print(f'>>> send message: {device_id}') channel_layer = get_channel_layer()
# Cancel existing timer for this user if any group_name = f"sync_{user_id}"
if user_id in self._user_timers and self._user_timers[user_id]:
self._user_timers[user_id].cancel() message_content = device_id if device_id else "std_msg_lol" # Ensure a non-empty message
if channel_layer:
async_to_sync(channel_layer.group_send)(
group_name,
{"type": "sync.update", "message": message_content}
)
# else:
# Consider logging if channel_layer is not available
# print(f"Channel layer not available. Cannot send message to {group_name}.")
# Create new timer for this user def send_message(self, user_ids, device_id):
self._user_timers[user_id] = Timer( """
Schedules a WebSocket message to one or more users with debouncing and transaction handling.
- user_ids: A single user_id or a list/set of user_ids.
- device_id: The message content/identifier to send.
"""
if not isinstance(user_ids, (list, set, tuple)):
user_ids = [user_ids]
user_ids_key = frozenset(user_ids)
if not user_ids_key: # No users to send to
return
with self._debounce_lock:
if user_ids_key in self._debounce_registry:
old_timer, _ = self._debounce_registry[user_ids_key]
old_timer.cancel()
new_timer = Timer(
self._buffer_timeout, self._buffer_timeout,
self._send_message, self._handle_debounced_action,
args=[user_id, device_id] args=[user_ids_key, device_id]
) )
self._user_timers[user_id].start() self._debounce_registry[user_ids_key] = (new_timer, device_id) # Store new timer and latest device_id
new_timer.start()
def _send_message(self, user_id, device_id): def _handle_debounced_action(self, user_ids_key, device_id):
""" """
Sends the WebSocket message for a specific user. Called after the 100ms debounce. Checks transactions and proceeds.
""" """
channel_layer = get_channel_layer() with self._debounce_lock:
group_name = f"sync_{user_id}" # Remove from registry; this timer has fired.
# Relies on Timer.cancel() preventing execution of cancelled timers.
if user_ids_key in self._debounce_registry:
del self._debounce_registry[user_ids_key]
# print(f">>> send to group {group_name}, device_id={device_id}") transactional_users_in_group = set()
# print(f'channel_layer = {channel_layer}') with self._transaction_lock:
for user_id in user_ids_key:
if self._user_transactions.get(user_id): # Check if user has any active transactions
transactional_users_in_group.add(user_id)
device_id = device_id if device_id else "std_msg_lol" # a not empty message is required! if not transactional_users_in_group:
# No transactions for any user in this group, send immediately.
for user_id in user_ids_key:
self._perform_actual_send(user_id, device_id)
else:
# Transactions active for one or more users. Place in transactional hold.
with self._pending_sends_lock:
if user_ids_key in self._pending_transactional_sends:
# If a new send_message (debounced) for this group also hits transactions,
# it supersedes the previous one, resetting the 2s timeout.
_, old_transaction_timer, _ = self._pending_transactional_sends[user_ids_key]
old_transaction_timer.cancel()
async_to_sync(channel_layer.group_send)( new_transaction_timer = Timer(
group_name, self._transaction_hold_timeout,
{"type": "sync.update", "message": device_id} self._force_send_by_transaction_timeout,
args=[user_ids_key] # device_id is retrieved from the registry
) )
# Cleanup timer reference self._pending_transactional_sends[user_ids_key] = (
self._user_timers[user_id] = None device_id,
new_transaction_timer,
transactional_users_in_group # Store for potential debugging/logging
)
new_transaction_timer.start()
def _force_send_by_transaction_timeout(self, user_ids_key):
"""
Called by the 2s timer if transactions didn't clear in time.
Sends the message regardless of current transaction status.
"""
with self._pending_sends_lock:
if user_ids_key in self._pending_transactional_sends:
device_id, _, _ = self._pending_transactional_sends.pop(user_ids_key)
# Timer has fired, no need to cancel.
for user_id in user_ids_key:
self._perform_actual_send(user_id, device_id)
def add_user_transaction(self, user_id, transaction_id, device_id=None):
"""
Declares that a user has started a transaction.
'device_id' is for transaction context, not message content.
"""
with self._transaction_lock:
if user_id not in self._user_transactions:
self._user_transactions[user_id] = set()
self._user_transactions[user_id].add(transaction_id)
def remove_user_transaction(self, user_id, transaction_id, device_id=None):
"""
Declares that a user has ended a transaction.
Checks if pending messages can now be sent.
"""
user_became_transaction_free = False
with self._transaction_lock:
if user_id in self._user_transactions and transaction_id in self._user_transactions[user_id]:
self._user_transactions[user_id].remove(transaction_id)
if not self._user_transactions[user_id]: # Set is empty
del self._user_transactions[user_id]
user_became_transaction_free = True
if user_became_transaction_free:
self._process_sends_for_potentially_cleared_groups(cleared_user_id=user_id)
def _process_sends_for_potentially_cleared_groups(self, cleared_user_id):
"""
If a user becomes transaction-free, check if any groups they were part of
(and were on hold) can now be sent.
"""
messages_to_send_immediately = []
with self._transaction_lock: # Acquire transaction lock first
with self._pending_sends_lock: # Then pending sends lock
pending_groups_keys = list(self._pending_transactional_sends.keys())
for user_ids_key in pending_groups_keys:
if cleared_user_id not in user_ids_key:
continue # This group didn't involve the user who just cleared
# Check if ALL users in this specific pending group are now transaction-free
all_users_in_this_group_are_clear = True
for uid_in_group_key in user_ids_key:
if uid_in_group_key in self._user_transactions: # Still has transactions
all_users_in_this_group_are_clear = False
break
if all_users_in_this_group_are_clear:
device_id, transaction_timer, _ = self._pending_transactional_sends.pop(user_ids_key)
transaction_timer.cancel() # Cancel the 2s override timer
messages_to_send_immediately.append((user_ids_key, device_id))
# Perform actual sends outside of the locks
for user_ids_key, device_id in messages_to_send_immediately:
self.send_message(user_ids_key, device_id)
# for user_id in user_ids_key:
# self.send_message(user_ids_key, device_id)
# self._perform_actual_send(user_id, device_id)
# Create a singleton instance # Create a singleton instance (as in the original file)
websocket_sender = WebSocketSender() websocket_sender = WebSocketSender()

Loading…
Cancel
Save