diff --git a/sync/registry.py b/sync/registry.py index d719080..91a86c1 100644 --- a/sync/registry.py +++ b/sync/registry.py @@ -2,6 +2,7 @@ from django.conf import settings from django.apps import apps from .models import BaseModel from django.contrib.auth import get_user_model +import threading User = get_user_model() @@ -35,5 +36,36 @@ class SyncRegistry: self.load_sync_apps() return self._registry.get(model_name) -# Create singleton instance +# Global instance sync_registry = SyncRegistry() + +class DeviceRegistry: + """Thread-safe registry to track device IDs associated with model instances.""" + + def __init__(self): + self._registry = {} + self._lock = threading.RLock() + + def count(self): + """Return the number of items in the registry.""" + with self._lock: + return len(self._registry) + + def register(self, instance_id, device_id): + """Register a device_id for a model instance ID.""" + with self._lock: + self._registry[str(instance_id)] = device_id + + def get_device_id(self, instance_id): + """Get the device_id for a model instance ID.""" + with self._lock: + return self._registry.get(str(instance_id)) + + def unregister(self, instance_id): + """Remove an instance from the registry.""" + with self._lock: + if instance_id in self._registry: + del self._registry[instance_id] + +# Global instance +device_registry = DeviceRegistry() diff --git a/sync/signals.py b/sync/signals.py index ef05eb7..8cc2c4d 100644 --- a/sync/signals.py +++ b/sync/signals.py @@ -7,6 +7,7 @@ from django.contrib.auth import get_user_model from django.utils import timezone from .ws_sender import websocket_sender +from .registry import device_registry User = get_user_model() @@ -43,11 +44,7 @@ def synchronization_prepare(sender, instance, **kwargs): return if signal == pre_save: - device_id = None - if hasattr(instance, '_device_id'): - device_id = instance._device_id - - detect_foreign_key_changes(sender, instance, device_id) + detect_foreign_key_changes(sender, instance) @receiver([post_save, post_delete]) def synchronization_notifications(sender, instance, created=False, **kwargs): @@ -61,20 +58,12 @@ def synchronization_notifications(sender, instance, created=False, **kwargs): if not isinstance(instance, BaseModel) and not isinstance(instance, User): return - device_id = None - if hasattr(instance, '_device_id'): - device_id = instance._device_id - - process_foreign_key_changes(sender, instance, device_id, **kwargs) + process_foreign_key_changes(sender, instance, **kwargs) signal = kwargs.get('signal') - save_model_log_if_possible(instance, signal, created, device_id) - + save_model_log_if_possible(instance, signal, created) notify_impacted_users(instance) - # print(f'*** instance._state.db: {instance._state.db}') - # transaction.on_commit(lambda: notify_impacted_users(instance)) - def notify_impacted_users(instance): user_ids = set() # add impacted users @@ -91,16 +80,15 @@ def notify_impacted_users(instance): else: print('no users to notify') - device_id = None - if hasattr(instance, '_device_id'): - device_id = instance._device_id + device_id = device_registry.get_device_id(instance.id) - # print(f'notify: {user_ids}') + # print(f'notify: {device_id}') for user_id in user_ids: websocket_sender.send_user_message(user_id, device_id) - # send_user_message(user_id) -def save_model_log_if_possible(instance, signal, created, device_id): + device_registry.unregister(instance.id) + +def save_model_log_if_possible(instance, signal, created): users = related_users(instance) # print(f'users = {len(users)}, instance = {instance}') @@ -125,12 +113,14 @@ def save_model_log_if_possible(instance, signal, created, device_id): # print(f'users to notify: {user_ids}') instance._users_to_notify = user_ids # save this for the post_save signal - save_model_log(users, operation, model_name, instance.id, store_id, device_id) + save_model_log(users, operation, model_name, instance.id, store_id) else: print(f'>>> Model Log could not be created because no linked user could be found: {instance.__class__.__name__} {instance}, {signal}') -def save_model_log(users, model_operation, model_name, model_id, store_id, device_id): +def save_model_log(users, model_operation, model_name, model_id, store_id): + + device_id = device_registry.get_device_id(model_id) with transaction.atomic(): for user in users: @@ -164,7 +154,7 @@ def save_model_log(users, model_operation, model_name, model_id, store_id, devic # model_log.save() # model_log.users.set(users) -def detect_foreign_key_changes(sender, instance, device_id): +def detect_foreign_key_changes(sender, instance): if not hasattr(instance, 'pk') or not instance.pk: return if not isinstance(instance, BaseModel): @@ -190,7 +180,8 @@ def detect_foreign_key_changes(sender, instance, device_id): 'new_value': new_value }) -def process_foreign_key_changes(sender, instance, device_id, **kwargs): +def process_foreign_key_changes(sender, instance, **kwargs): + if hasattr(instance, '_fk_changes'): for change in instance._fk_changes: for data_access in change['data_access_list']: @@ -198,12 +189,12 @@ def process_foreign_key_changes(sender, instance, device_id, **kwargs): model_name = change['old_value'].__class__.__name__ save_model_log(data_access.concerned_users(), 'REVOKE_ACCESS', model_name, change['old_value'].id, - change['old_value'].get_store_id(), device_id) + change['old_value'].get_store_id()) if change['new_value']: model_name = change['new_value'].__class__.__name__ save_model_log(data_access.concerned_users(), 'GRANT_ACCESS', model_name, change['new_value'].id, - change['new_value'].get_store_id(), device_id) + change['new_value'].get_store_id()) @receiver(post_delete) def delete_data_access_if_necessary(sender, instance, **kwargs): @@ -222,12 +213,11 @@ def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs): elif action == "post_remove": instance.create_access_log(users, 'REVOKE_ACCESS') - device_id = None - if hasattr(instance, '_device_id'): - device_id = instance._device_id - + device_id = device_registry.get_device_id(instance.id) for user_id in pk_set: websocket_sender.send_user_message(user_id, device_id) + device_registry.unregister(instance.id) + for user in users: evaluate_if_user_should_sync(user) diff --git a/sync/views.py b/sync/views.py index 399cbf2..cde9abc 100644 --- a/sync/views.py +++ b/sync/views.py @@ -21,7 +21,7 @@ from .utils import get_serializer, build_serializer_class, get_data, get_seriali from .models import ModelLog, BaseModel, SideStoreModel, DataAccess -from .registry import sync_registry +from .registry import sync_registry, device_registry class HierarchyApiView(APIView): @@ -104,6 +104,10 @@ class SynchronizationApi(HierarchyApiView): model_operation = op.get('operation') model_name = op.get('model_name') data = op.get('data') + data_id = data.get('id') + device_registry.register(data_id, device_id) + print(f'*** 1count = {device_registry.count()}') + try: print(f'{model_operation} : {model_name}, id = {data['id']}') @@ -119,7 +123,6 @@ class SynchronizationApi(HierarchyApiView): serializer = serializer_class(data=data, context={'request': request}) if serializer.is_valid(): instance = serializer.save() - instance._device_id = device_id result = serializer.data response_status = status.HTTP_201_CREATED else: @@ -127,9 +130,7 @@ class SynchronizationApi(HierarchyApiView): message = json.dumps(serializer.errors) response_status = status.HTTP_400_BAD_REQUEST elif model_operation == 'PUT': - data_id = data.get('id') instance = get_data(model_name, data_id) - instance._device_id = device_id serializer = serializer_class(instance, data=data, context={'request': request}) if serializer.is_valid(): if instance.last_update <= serializer.validated_data.get('last_update'): @@ -143,11 +144,8 @@ class SynchronizationApi(HierarchyApiView): print(f'Data invalid ! {serializer.errors}') response_status = status.HTTP_400_BAD_REQUEST elif model_operation == 'DELETE': - data_id = data.get('id') try: instance = get_data(model_name, data_id) - instance._device_id = device_id - try: instance.delete() response_status = status.HTTP_204_NO_CONTENT