websocket notification fix + refactoring

sync
Laurent 11 months ago
parent 06e4fba108
commit 5008f5588c
  1. 16
      sync/models/base.py
  2. 61
      sync/signals.py
  3. 2
      sync/views.py
  4. 46
      sync/ws_sender.py
  5. 3
      tournaments/models/custom_user.py
  6. 4
      tournaments/models/date_interval.py
  7. 3
      tournaments/models/group_stage.py
  8. 4
      tournaments/models/match.py
  9. 4
      tournaments/models/player_registration.py
  10. 4
      tournaments/models/round.py
  11. 4
      tournaments/models/team_registration.py
  12. 4
      tournaments/models/team_score.py
  13. 4
      tournaments/models/tournament.py

@ -11,10 +11,6 @@ class BaseModel(models.Model):
class Meta:
abstract = True
def get_parent_reference(self):
"""Return a tuple: model_name, model_id"""
return None, None
def get_children_by_model(self):
"""
Returns a dictionary where:
@ -124,18 +120,6 @@ class BaseModel(models.Model):
return instances
# def related_instances(self):
# instances = []
# children_by_model = self.get_children_by_model()
# all_children = [item for sublist in children_by_model.values() for item in sublist]
# instances.extend(all_children)
# parents_by_model = self.get_parents_by_model()
# instances.extend(parents_by_model.values())
# return instances
class SideStoreModel(BaseModel):
store_id = models.CharField(max_length=100)

@ -5,13 +5,7 @@ from .models import DataAccess, ModelLog, ModelOperation, BaseModel, SideStoreMo
from django.contrib.auth import get_user_model
from django.utils import timezone
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from threading import Timer
# from functools import partial
# Synchronization
from .ws_sender import websocket_sender
User = get_user_model()
@ -19,7 +13,7 @@ User = get_user_model()
def synchronization_prepare(sender, instance, created=False, **kwargs):
# some classes are excluded in settings_app.py: SYNC_APPS
if not isinstance(instance, BaseModel):
if not isinstance(instance, BaseModel) and not isinstance(instance, User):
return
save_model_log_if_possible(instance, kwargs.get('signal'), created)
@ -57,10 +51,15 @@ def notify_impacted_users(instance, signal):
print(f'notify: {user_ids}')
for user_id in user_ids:
send_user_message(user_id)
websocket_sender.send_user_message(user_id)
# send_user_message(user_id)
def save_model_log_if_possible(instance, signal, created):
if isinstance(instance, User):
user = instance
else:
user = instance.last_updated_by
if user:
if signal == post_save or signal == pre_save:
if created:
@ -79,6 +78,7 @@ def save_model_log_if_possible(instance, signal, created):
ModelLog.objects.filter(model_id=instance.id).delete()
users = {user}
if isinstance(instance, BaseModel):
data_access_list = related_data_access(instance)
for data_access in data_access_list:
users.add(data_access.owner)
@ -102,7 +102,7 @@ def save_model_log(users, model_operation, model_name, model_id, store_id):
if existing_log:
# print(f'update existing log {existing_log.users} ')
existing_log.date = now
existing_log.model_operation = model_operation
# existing_log.operation = model_operation
existing_log.save()
existing_log.users.set(users)
else:
@ -135,32 +135,33 @@ def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
instance.create_access_log(users, 'REVOKE_ACCESS')
for user_id in pk_set:
send_user_message(user_id)
websocket_sender.send_user_message(user_id)
# send_user_message(user_id)
def send_user_message(user_id):
# def send_user_message(user_id):
if not hasattr(send_user_message, '_buffer'):
send_user_message._buffer = set()
send_user_message._timer = None
# if not hasattr(send_user_message, '_buffer'):
# send_user_message._buffer = set()
# send_user_message._timer = None
send_user_message._buffer.add(user_id)
# send_user_message._buffer.add(user_id)
if send_user_message._timer:
send_user_message._timer.cancel()
# if send_user_message._timer:
# send_user_message._timer.cancel()
def send_buffered_messages():
channel_layer = get_channel_layer()
for buffered_id in send_user_message._buffer:
group_name = f"sync_{buffered_id}"
print(f">>> send to group {group_name}")
async_to_sync(channel_layer.group_send)(
group_name, {"type": "sync.update", "message": "hello"}
)
send_user_message._buffer.clear()
send_user_message._timer = None
# def send_buffered_messages():
# channel_layer = get_channel_layer()
# for buffered_id in send_user_message._buffer:
# group_name = f"sync_{buffered_id}"
# print(f">>> send to group {group_name}")
# async_to_sync(channel_layer.group_send)(
# group_name, {"type": "sync.update", "message": "hello"}
# )
# send_user_message._buffer.clear()
# send_user_message._timer = None
send_user_message._timer = Timer(0.1, send_buffered_messages)
send_user_message._timer.start()
# send_user_message._timer = Timer(0.1, send_buffered_messages)
# send_user_message._timer.start()
@receiver(pre_delete, sender=DataAccess)
def revoke_access_after_delete(sender, instance, **kwargs):

@ -108,7 +108,7 @@ class SynchronizationApi(HierarchyApiView):
serializer = serializer_class(instance, data=data, context={'request': request})
if serializer.is_valid():
if instance.last_update <= serializer.validated_data.get('last_update'):
print('>>> update')
# print('>>> update')
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
else:

@ -0,0 +1,46 @@
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from threading import Timer
class WebSocketSender:
"""
Manages WebSocket notifications for users with individual buffering timers.
"""
def __init__(self):
self._user_timers = {} # Dictionary to store user-specific timers
self._buffer_timeout = 0.1 # Debounce timeout in seconds
def send_user_message(self, user_id):
"""
Schedules a notification for a specific user with debouncing.
"""
# Cancel existing timer for this user if any
if user_id in self._user_timers and self._user_timers[user_id]:
self._user_timers[user_id].cancel()
# Create new timer for this user
self._user_timers[user_id] = Timer(
self._buffer_timeout,
self._send_message,
args=[user_id]
)
self._user_timers[user_id].start()
def _send_message(self, user_id):
"""
Sends the WebSocket message for a specific user.
"""
channel_layer = get_channel_layer()
group_name = f"sync_{user_id}"
print(f">>> send to group {group_name}")
async_to_sync(channel_layer.group_send)(
group_name,
{"type": "sync.update", "message": "hello"}
)
# Cleanup timer reference
self._user_timers[user_id] = None
# Create a singleton instance
websocket_sender = WebSocketSender()

@ -48,9 +48,6 @@ class CustomUser(AbstractUser):
'summons_use_full_custom_message', 'match_formats_default_duration', 'bracket_match_format_preference',
'group_stage_match_format_preference', 'loser_bracket_match_format_preference', 'device_id']
def get_parent_reference(self):
"""Override in child models to provide parent reference"""
return None, None
def __str__(self):
return self.username

@ -8,7 +8,3 @@ class DateInterval(BaseModel):
court_index = models.IntegerField()
start_date = models.DateTimeField()
end_date = models.DateTimeField()
# Data Access
def get_parent_reference(self):
return 'Event', self.event.id

@ -15,9 +15,6 @@ class GroupStage(SideStoreModel):
start_date = models.DateTimeField(null=True, blank=True)
name = models.CharField(max_length=200, null=True, blank=True)
# Data Access
def get_parent_reference(self):
return 'Event', self.tournament.event.id
def __str__(self):
return self.display_name()

@ -26,10 +26,6 @@ class Match(SideStoreModel):
court_index = models.IntegerField(null=True, blank=True)
confirmed = models.BooleanField(default=False)
# Data Access
def get_parent_reference(self):
return 'Event', self.tournament().event.id
def __str__(self):
names = " / ".join(self.player_names())
return f"{self.stage_name()} #{self.index}: {names}"

@ -34,10 +34,6 @@ class PlayerRegistration(SideStoreModel):
source = models.IntegerField(choices=PlayerDataSource.choices, null=True, blank=True)
has_arrived = models.BooleanField(default=False)
# Data Access
def get_parent_reference(self):
return 'Event', self.team_registration.tournament.event.id
def __str__(self):
return self.name()

@ -10,10 +10,6 @@ class Round(SideStoreModel):
format = models.IntegerField(default=FederalMatchCategory.NINE_GAMES, choices=FederalMatchCategory.choices, null=True, blank=True)
start_date = models.DateTimeField(null=True, blank=True)
# Data Access
def get_parent_reference(self):
return 'Event', self.tournament.event.id
def __str__(self):
if self.parent:
return f"LB: {self.name()}"

@ -30,10 +30,6 @@ class TeamRegistration(SideStoreModel):
final_ranking = models.IntegerField(null=True, blank=True)
points_earned = models.IntegerField(null=True, blank=True)
# Data Access
def get_parent_reference(self):
return 'Event', self.tournament.event.id
def __str__(self):
if self.name:
return self.name

@ -13,10 +13,6 @@ class TeamScore(SideStoreModel):
def __str__(self):
return f"{self.match.stage_name()} #{self.match.index}: {self.player_names()}"
# Data Access
def get_parent_reference(self):
return 'Event', self.tournament().event.id
def tournament(self):
if self.team_registration:
return self.team_registration.tournament

@ -60,10 +60,6 @@ class Tournament(BaseModel):
hide_points_earned = models.BooleanField(default=False)
publish_rankings = models.BooleanField(default=False)
# Data Access
def get_parent_reference(self):
return 'Event', self.event.id
def get_child_models(self):
return {
'Round': 'round_set',

Loading…
Cancel
Save