Improvements and fixes

sync
Laurent 11 months ago
parent c19b96e4a3
commit 5caaa7c68b
  1. 2
      padelclub_backend/settings_app.py
  2. 74
      sync/consumers.py
  3. 18
      sync/migrations/0003_modellog_device_id.py
  4. 5
      sync/models/model_log.py
  5. 2
      sync/routing.py
  6. 112
      sync/signals.py
  7. 2
      sync/utils.py
  8. 19
      sync/views.py
  9. 16
      sync/ws_sender.py
  10. 29
      tournaments/migrations/0107_remove_devicetoken_creation_date_and_more.py
  11. 2
      tournaments/models/device_token.py

@ -46,5 +46,5 @@ QR_CODE_CACHE_ALIAS = 'qr-code'
SYNC_APPS = { SYNC_APPS = {
'sync': {}, 'sync': {},
'tournaments': { 'exclude': ['Log', 'FailedApiCall'] } 'tournaments': { 'exclude': ['Log', 'FailedApiCall', 'DeviceToken'] }
} }

@ -5,12 +5,14 @@ from channels.generic.websocket import WebsocketConsumer
class UserConsumer(WebsocketConsumer): class UserConsumer(WebsocketConsumer):
def connect(self): def connect(self):
self.room_name = self.scope["url_route"]["kwargs"]["user_id"] self.user_id = self.scope["url_route"]["kwargs"]["user_id"]
self.room_group_name = f"sync_{self.room_name}" self.group_name = f"sync_{self.user_id}"
print(f'connect, group_name = {self.group_name}')
# Join room group # Join room group
async_to_sync(self.channel_layer.group_add)( async_to_sync(self.channel_layer.group_add)(
self.room_group_name, self.channel_name self.group_name, self.channel_name
) )
self.accept() self.accept()
@ -18,62 +20,62 @@ class UserConsumer(WebsocketConsumer):
def disconnect(self, close_code): def disconnect(self, close_code):
# Leave room group # Leave room group
async_to_sync(self.channel_layer.group_discard)( async_to_sync(self.channel_layer.group_discard)(
self.room_group_name, self.channel_name self.group_name, self.channel_name
) )
# Receive message from WebSocket # Receive message from WebSocket
def receive(self, data): def receive(self, data):
# text_data_json = json.loads(text_data) # text_data_json = json.loads(text_data)
# message = text_data_json["message"] # message = text_data_json["message"]
print(f'received {data}') print(f'received: {data}')
# Send message to room group # Send message to room group
# chat.message calls the chat_message method
async_to_sync(self.channel_layer.group_send)( async_to_sync(self.channel_layer.group_send)(
self.room_group_name, {"type": "sync.update", "message": "hello"} # sync.update calls the method below self.group_name, {"type": "sync.update", "message": data} # sync.update calls the method below
) )
# Receive message from room group # Receive message from room group
def sync_update(self, event): def sync_update(self, event):
message = event["message"] message = event["message"]
print(f'event = {event}')
# Send message to WebSocket # Send message to WebSocket
self.send(text_data=message) self.send(text_data=message)
class ChatConsumer(WebsocketConsumer): # class ChatConsumer(WebsocketConsumer):
def connect(self): # def connect(self):
self.room_name = 'main' # self.room_name = 'main'
self.room_group_name = f"chat_{self.room_name}" # self.room_group_name = f"chat_{self.room_name}"
# Join room group # # Join room group
async_to_sync(self.channel_layer.group_add)( # async_to_sync(self.channel_layer.group_add)(
self.room_group_name, self.channel_name # self.room_group_name, self.channel_name
) # )
self.accept() # self.accept()
def disconnect(self, close_code): # def disconnect(self, close_code):
# Leave room group # # Leave room group
async_to_sync(self.channel_layer.group_discard)( # async_to_sync(self.channel_layer.group_discard)(
self.room_group_name, self.channel_name # self.room_group_name, self.channel_name
) # )
# Receive message from WebSocket # # Receive message from WebSocket
def receive(self, text_data): # def receive(self, text_data):
# text_data_json = json.loads(text_data) # # text_data_json = json.loads(text_data)
# message = text_data_json["message"] # # message = text_data_json["message"]
print(f'received {text_data}') # print(f'received {text_data}')
# Send message to room group # # Send message to room group
# chat.message calls the chat_message method # # chat.message calls the chat_message method
async_to_sync(self.channel_layer.group_send)( # async_to_sync(self.channel_layer.group_send)(
self.room_group_name, {"type": "chat.message", "message": text_data} # self.room_group_name, {"type": "chat.message", "message": text_data}
) # )
# Receive message from room group # # Receive message from room group
def chat_message(self, event): # def chat_message(self, event):
message = event["message"] # message = event["message"]
# Send message to WebSocket # # Send message to WebSocket
self.send(text_data=message) # self.send(text_data=message)

@ -0,0 +1,18 @@
# Generated by Django 5.1 on 2024-12-11 13:46
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('sync', '0002_remove_dataaccess_owner_dataaccess_related_user_and_more'),
]
operations = [
migrations.AddField(
model_name='modellog',
name='device_id',
field=models.CharField(blank=True, max_length=200, null=True),
),
]

@ -19,11 +19,10 @@ class ModelLog(models.Model):
date = models.DateTimeField() date = models.DateTimeField()
model_name = models.CharField(max_length=50) model_name = models.CharField(max_length=50)
store_id = models.CharField(max_length=200, blank=True, null=True) store_id = models.CharField(max_length=200, blank=True, null=True)
# parent_model_id = models.UUIDField(blank=True, null=True) device_id = models.CharField(max_length=200, blank=True, null=True)
# parent_model_name = models.CharField(max_length=50, blank=True, null=True)
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
# Round microseconds to milliseconds (3 decimal places) # Round microseconds to milliseconds (3 decimals to match Swift precision)
if self.date: if self.date:
microseconds = round(self.date.microsecond, -3) # Round to nearest thousand microseconds = round(self.date.microsecond, -3) # Round to nearest thousand
self.date = self.date.replace(microsecond=microseconds) self.date = self.date.replace(microsecond=microseconds)

@ -5,6 +5,6 @@ from . import consumers
websocket_urlpatterns = [ websocket_urlpatterns = [
re_path(r"ws/user/(?P<user_id>[\w-]+)/$", consumers.UserConsumer.as_asgi()), re_path(r"ws/user/(?P<user_id>[\w-]+)/$", consumers.UserConsumer.as_asgi()),
re_path(r"ws/chat/$", consumers.ChatConsumer.as_asgi()), # re_path(r"ws/chat/$", consumers.ChatConsumer.as_asgi()),
# re_path(r"ws/chat/(?P<room_name>\w+)/$", consumers.ChatConsumer.as_asgi()), # re_path(r"ws/chat/(?P<room_name>\w+)/$", consumers.ChatConsumer.as_asgi()),
] ]

@ -13,46 +13,26 @@ User = get_user_model()
@receiver([pre_save, pre_delete]) @receiver([pre_save, pre_delete])
def synchronization_prepare(sender, instance, created=False, **kwargs): def synchronization_prepare(sender, instance, created=False, **kwargs):
signal = kwargs.get('signal')
# avoid crash in manage.py createsuperuser + delete user in the admin
if isinstance(instance, User) and (instance._state.db is None or signal == pre_delete):
return
# some other classes are excluded in settings_app.py: SYNC_APPS # some other classes are excluded in settings_app.py: SYNC_APPS
if not isinstance(instance, BaseModel) and not isinstance(instance, User): if not isinstance(instance, BaseModel) and not isinstance(instance, User):
return return
save_model_log_if_possible(instance, kwargs.get('signal'), created) device_id = None
if hasattr(instance, '_device_id'):
if kwargs.get('signal') == pre_save: device_id = instance._device_id
detect_foreign_key_changes(sender, instance)
def detect_foreign_key_changes(sender, instance, **kwargs): # print(f'kwargs = {kwargs}')
if not hasattr(instance, 'pk') or not instance.pk: save_model_log_if_possible(instance, signal, created, device_id)
return
if not isinstance(instance, BaseModel):
return
data_access_list = related_data_access(instance) if signal == pre_save:
if data_access_list: # print('yes')
try: detect_foreign_key_changes(sender, instance, device_id)
old_instance = sender.objects.get(pk=instance.pk)
except sender.DoesNotExist:
return
# Check foreign key fields
for field in sender._meta.get_fields():
if isinstance(field, models.ForeignKey) and not field.related_model == User:
print(f'field.related_model = {field.related_model}')
old_value = getattr(old_instance, field.name, None)
new_value = getattr(instance, field.name, None)
if old_value != new_value:
for data_access in data_access_list:
if old_value:
model_name = old_value.__class__.__name__
save_model_log(data_access.shared_with.all(), 'REVOKE_ACCESS', model_name, old_value.id, old_value.get_store_id())
if new_value:
model_name = new_value.__class__.__name__
save_model_log(data_access.shared_with.all(), 'GRANT_ACCESS', model_name, new_value.id, new_value.get_store_id())
# REVOKE access for old_value and GRANT new_value
print(f"Foreign key changed in {sender.__name__}: "
f"{field.name} from {old_value} to {new_value}")
@receiver([post_save, post_delete]) @receiver([post_save, post_delete])
def synchronization_notifications(sender, instance, created=False, **kwargs): def synchronization_notifications(sender, instance, created=False, **kwargs):
@ -66,10 +46,10 @@ def synchronization_notifications(sender, instance, created=False, **kwargs):
if not isinstance(instance, BaseModel) and not isinstance(instance, User): if not isinstance(instance, BaseModel) and not isinstance(instance, User):
return return
# print(f'*** signals {sender}') # print(f'*** instance._state.db: {instance._state.db}')
notify_impacted_users(instance, kwargs.get('signal')) notify_impacted_users(instance)
def notify_impacted_users(instance, signal): def notify_impacted_users(instance):
user_ids = set() user_ids = set()
# add impacted users # add impacted users
if isinstance(instance, User): if isinstance(instance, User):
@ -80,23 +60,28 @@ def notify_impacted_users(instance, signal):
user_ids.add(owner.id) user_ids.add(owner.id)
if isinstance(instance, BaseModel): if isinstance(instance, BaseModel):
if instance._users_to_notify is not None: if hasattr(instance, '_users_to_notify'):
user_ids.update(instance._users_to_notify) user_ids.update(instance._users_to_notify)
else: else:
print('no users to notify') print('no users to notify')
device_id = None
if hasattr(instance, '_device_id'):
device_id = instance._device_id
print(f'notify: {user_ids}') print(f'notify: {user_ids}')
for user_id in user_ids: for user_id in user_ids:
websocket_sender.send_user_message(user_id) websocket_sender.send_user_message(user_id, device_id)
# send_user_message(user_id) # send_user_message(user_id)
def save_model_log_if_possible(instance, signal, created, device_id):
def save_model_log_if_possible(instance, signal, created):
if isinstance(instance, User): if isinstance(instance, User):
users = {instance} users = {instance}
else: else:
users = related_users(instance) users = related_users(instance)
# print(f'users = {users}')
if users: if users:
if signal == post_save or signal == pre_save: if signal == post_save or signal == pre_save:
if created: if created:
@ -118,17 +103,18 @@ def save_model_log_if_possible(instance, signal, created):
print(f'users to notify: {user_ids}') print(f'users to notify: {user_ids}')
instance._users_to_notify = user_ids # save this for the post_save signal instance._users_to_notify = user_ids # save this for the post_save signal
save_model_log(users, operation, model_name, instance.id, store_id) save_model_log(users, operation, model_name, instance.id, store_id, device_id)
else: else:
print('>>> Model Log could not be created because instance.last_updated_by is None') print('>>> Model Log could not be created because instance.last_updated_by is None')
def save_model_log(users, model_operation, model_name, model_id, store_id): def save_model_log(users, model_operation, model_name, model_id, store_id, device_id):
now = timezone.now() now = timezone.now()
existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first() existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first()
if existing_log: if existing_log:
# print(f'update existing log {existing_log.users} ') # print(f'update existing log {existing_log.users} ')
existing_log.date = now existing_log.date = now
existing_log.device_id = device_id
# existing_log.operation = model_operation # existing_log.operation = model_operation
existing_log.save() existing_log.save()
existing_log.users.set(users) existing_log.users.set(users)
@ -139,9 +125,42 @@ def save_model_log(users, model_operation, model_name, model_id, store_id):
model_log.model_name = model_name model_log.model_name = model_name
model_log.model_id = model_id model_log.model_id = model_id
model_log.store_id = store_id model_log.store_id = store_id
model_log.device_id = device_id
model_log.save() model_log.save()
model_log.users.set(users) model_log.users.set(users)
def detect_foreign_key_changes(sender, instance, device_id):
if not hasattr(instance, 'pk') or not instance.pk:
return
if not isinstance(instance, BaseModel):
return
data_access_list = related_data_access(instance)
if data_access_list:
try:
old_instance = sender.objects.get(pk=instance.pk)
except sender.DoesNotExist:
return
# Check foreign key fields
for field in sender._meta.get_fields():
if isinstance(field, models.ForeignKey) and not field.related_model == User:
# print(f'field.related_model = {field.related_model}')
old_value = getattr(old_instance, field.name, None)
new_value = getattr(instance, field.name, None)
if old_value != new_value:
for data_access in data_access_list:
if old_value:
model_name = old_value.__class__.__name__
save_model_log(data_access.shared_with.all(), 'REVOKE_ACCESS', model_name, old_value.id, old_value.get_store_id(), device_id)
if new_value:
model_name = new_value.__class__.__name__
save_model_log(data_access.shared_with.all(), 'GRANT_ACCESS', model_name, new_value.id, new_value.get_store_id(), device_id)
# REVOKE access for old_value and GRANT new_value
print(f"Foreign key changed in {sender.__name__}: "
f"{field.name} from {old_value} to {new_value}")
def delete_data_access_if_necessary(model_id): def delete_data_access_if_necessary(model_id):
DataAccess.objects.filter(model_id=model_id).delete() DataAccess.objects.filter(model_id=model_id).delete()
@ -155,9 +174,12 @@ def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs):
elif action == "post_remove": elif action == "post_remove":
instance.create_access_log(users, 'REVOKE_ACCESS') instance.create_access_log(users, 'REVOKE_ACCESS')
device_id = None
if hasattr(instance, '_device_id'):
device_id = instance._device_id
for user_id in pk_set: for user_id in pk_set:
websocket_sender.send_user_message(user_id) websocket_sender.send_user_message(user_id, device_id)
# send_user_message(user_id)
@receiver(pre_delete, sender=DataAccess) @receiver(pre_delete, sender=DataAccess)
def revoke_access_after_delete(sender, instance, **kwargs): def revoke_access_after_delete(sender, instance, **kwargs):
@ -171,10 +193,12 @@ def related_users(instance):
users.add(instance.related_user) users.add(instance.related_user)
users.add(instance.last_updated_by) users.add(instance.last_updated_by)
# look in hierarchy
related_instances = instance.related_instances() related_instances = instance.related_instances()
related_users = [ri.related_user for ri in related_instances if isinstance(ri, BaseModel)] related_users = [ri.related_user for ri in related_instances if isinstance(ri, BaseModel)]
users.update(related_users) users.update(related_users)
# look in related DataAccess
data_access_list = instances_related_data_access(instance, related_instances) data_access_list = instances_related_data_access(instance, related_instances)
for data_access in data_access_list: for data_access in data_access_list:
users.add(data_access.related_user) users.add(data_access.related_user)

@ -34,7 +34,7 @@ def get_data(model_name, model_id):
return model.objects.get(id=model_id) return model.objects.get(id=model_id)
def get_serialized_data(model_name, model_id): def get_serialized_data(model_name, model_id):
print(f'model_name = {model_name}') # print(f'model_name = {model_name}')
model = sync_registry.get_model(model_name) model = sync_registry.get_model(model_name)
instance = model.objects.get(id=model_id) instance = model.objects.get(id=model_id)
serializer_class = build_serializer_class(model_name) serializer_class = build_serializer_class(model_name)

@ -85,9 +85,9 @@ class SynchronizationApi(HierarchyApiView):
model_operation = request.data.get('operation') model_operation = request.data.get('operation')
model_name = request.data.get('model_name') model_name = request.data.get('model_name')
data = request.data.get('data') data = request.data.get('data')
# store_id = request.data.get('store_id') device_id = request.data.get('device_id')
print(f"DataApi post > {model_operation} {model_name}") print(f"DataApi post > {model_operation} {model_name}, device: {device_id}")
serializer_class = build_serializer_class(model_name) serializer_class = build_serializer_class(model_name)
data['last_updated_by'] = request.user.id # always refresh the user performing the operation data['last_updated_by'] = request.user.id # always refresh the user performing the operation
@ -98,13 +98,15 @@ class SynchronizationApi(HierarchyApiView):
if model_operation == 'POST': if model_operation == 'POST':
serializer = serializer_class(data=data, context={'request': request}) serializer = serializer_class(data=data, context={'request': request})
if serializer.is_valid(): if serializer.is_valid():
serializer.save() instance = serializer.save()
instance._device_id = device_id
return Response(serializer.data, status=status.HTTP_201_CREATED) return Response(serializer.data, status=status.HTTP_201_CREATED)
else: else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
elif model_operation == 'PUT': elif model_operation == 'PUT':
data_id = data.get('id') data_id = data.get('id')
instance = get_data(model_name, data_id) instance = get_data(model_name, data_id)
instance._device_id = device_id
serializer = serializer_class(instance, data=data, context={'request': request}) serializer = serializer_class(instance, data=data, context={'request': request})
if serializer.is_valid(): if serializer.is_valid():
if instance.last_update <= serializer.validated_data.get('last_update'): if instance.last_update <= serializer.validated_data.get('last_update'):
@ -120,6 +122,7 @@ class SynchronizationApi(HierarchyApiView):
data_id = data.get('id') data_id = data.get('id')
try: try:
instance = get_data(model_name, data_id) instance = get_data(model_name, data_id)
instance._device_id = device_id
instance.delete() instance.delete()
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
except model.DoesNotExist: # POST except model.DoesNotExist: # POST
@ -131,6 +134,7 @@ class SynchronizationApi(HierarchyApiView):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
last_update_str = request.query_params.get('last_update') last_update_str = request.query_params.get('last_update')
device_id = request.query_params.get('device_id')
decoded_last_update = unquote(last_update_str) # Decodes %2B into + decoded_last_update = unquote(last_update_str) # Decodes %2B into +
# print(f'last_update_str = {last_update_str}') # print(f'last_update_str = {last_update_str}')
@ -144,7 +148,7 @@ class SynchronizationApi(HierarchyApiView):
print(f'/data GET: {last_update}') print(f'/data GET: {last_update}')
logs = self.query_model_logs(last_update, request.user) logs = self.query_model_logs(last_update, request.user, device_id)
print(f'>>> log count = {len(logs)}') print(f'>>> log count = {len(logs)}')
updates = defaultdict(dict) updates = defaultdict(dict)
@ -220,9 +224,10 @@ class SynchronizationApi(HierarchyApiView):
print(f'response_data = {response_data}') print(f'response_data = {response_data}')
return Response(response_data, status=status.HTTP_200_OK) return Response(response_data, status=status.HTTP_200_OK)
def query_model_logs(self, last_update, user): def query_model_logs(self, last_update, user, device_id):
# print(f'last_update = {last_update}') log_query = Q(date__gt=last_update, users=user)
log_query = Q(date__gt=last_update) & Q(users=user) if device_id:
log_query &= Q(device_id=device_id)
return ModelLog.objects.filter(log_query).order_by('date') return ModelLog.objects.filter(log_query).order_by('date')
class UserDataAccessApi(HierarchyApiView): class UserDataAccessApi(HierarchyApiView):

@ -10,10 +10,11 @@ class WebSocketSender:
self._user_timers = {} # Dictionary to store user-specific timers self._user_timers = {} # Dictionary to store user-specific timers
self._buffer_timeout = 0.1 # Debounce timeout in seconds self._buffer_timeout = 0.1 # Debounce timeout in seconds
def send_user_message(self, user_id): def send_user_message(self, user_id, device_id):
""" """
Schedules a notification for a specific user with debouncing. Schedules a notification for a specific user with debouncing.
""" """
print(f'>>> send message: {device_id}')
# Cancel existing timer for this user if any # Cancel existing timer for this user if any
if user_id in self._user_timers and self._user_timers[user_id]: if user_id in self._user_timers and self._user_timers[user_id]:
self._user_timers[user_id].cancel() self._user_timers[user_id].cancel()
@ -22,23 +23,26 @@ class WebSocketSender:
self._user_timers[user_id] = Timer( self._user_timers[user_id] = Timer(
self._buffer_timeout, self._buffer_timeout,
self._send_message, self._send_message,
args=[user_id] args=[user_id, device_id]
) )
self._user_timers[user_id].start() self._user_timers[user_id].start()
def _send_message(self, user_id): def _send_message(self, user_id, device_id):
""" """
Sends the WebSocket message for a specific user. Sends the WebSocket message for a specific user.
""" """
channel_layer = get_channel_layer() channel_layer = get_channel_layer()
group_name = f"sync_{user_id}" group_name = f"sync_{user_id}"
print(f">>> send to group {group_name}")
print(f">>> send to group {group_name}, device_id={device_id}")
# print(f'channel_layer = {channel_layer}')
device_id = device_id if device_id else "std_msg_lol" # a not empty message is required!
async_to_sync(channel_layer.group_send)( async_to_sync(channel_layer.group_send)(
group_name, group_name,
{"type": "sync.update", "message": "hello"} {"type": "sync.update", "message": device_id}
) )
# Cleanup timer reference # Cleanup timer reference
self._user_timers[user_id] = None self._user_timers[user_id] = None

@ -0,0 +1,29 @@
# Generated by Django 5.1 on 2024-12-11 16:39
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('tournaments', '0106_club_related_user_court_related_user_and_more'),
]
operations = [
migrations.RemoveField(
model_name='devicetoken',
name='creation_date',
),
migrations.RemoveField(
model_name='devicetoken',
name='last_update',
),
migrations.RemoveField(
model_name='devicetoken',
name='last_updated_by',
),
migrations.RemoveField(
model_name='devicetoken',
name='related_user',
),
]

@ -3,7 +3,7 @@ from . import CustomUser
import uuid import uuid
from . import BaseModel from . import BaseModel
class DeviceToken(BaseModel): class DeviceToken(models.Model):
user = models.ForeignKey(CustomUser, on_delete=models.CASCADE, related_name='device_tokens') user = models.ForeignKey(CustomUser, on_delete=models.CASCADE, related_name='device_tokens')
value = models.TextField() value = models.TextField()

Loading…
Cancel
Save