|
|
|
|
@ -1,6 +1,6 @@ |
|
|
|
|
import random |
|
|
|
|
import string |
|
|
|
|
from django.db.models.signals import post_save, pre_delete, post_delete, m2m_changed |
|
|
|
|
from django.db.models.signals import pre_save, post_save, pre_delete, post_delete, m2m_changed |
|
|
|
|
from django.db.transaction import DatabaseError |
|
|
|
|
from django.dispatch import receiver |
|
|
|
|
from django.conf import settings |
|
|
|
|
@ -8,30 +8,46 @@ from django.apps import apps |
|
|
|
|
from django.utils import timezone |
|
|
|
|
from django.db.models import Q |
|
|
|
|
|
|
|
|
|
from .models import Club, FailedApiCall, CustomUser, Log, DataAccess, ModelLog, BaseModel |
|
|
|
|
from .models import Club, FailedApiCall, CustomUser, Log, DataAccess, ModelLog, ModelOperation, BaseModel, SideStoreModel |
|
|
|
|
import requests |
|
|
|
|
|
|
|
|
|
from channels.layers import get_channel_layer |
|
|
|
|
from asgiref.sync import async_to_sync |
|
|
|
|
|
|
|
|
|
from threading import Timer |
|
|
|
|
from functools import partial |
|
|
|
|
|
|
|
|
|
# Synchronization |
|
|
|
|
|
|
|
|
|
@receiver([pre_save, pre_delete]) |
|
|
|
|
def synchronization_prepare(sender, instance, created=False, **kwargs): |
|
|
|
|
|
|
|
|
|
if not isinstance(instance, BaseModel): |
|
|
|
|
return |
|
|
|
|
if sender in [FailedApiCall, Log, ModelLog]: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
save_model_log_if_possible(instance, kwargs.get('signal'), created) |
|
|
|
|
# if not isinstance(instance, DataAccess): |
|
|
|
|
# update_data_access(instance) |
|
|
|
|
|
|
|
|
|
@receiver([post_save, post_delete]) |
|
|
|
|
def synchronization_notifications(sender, instance, **kwargs): |
|
|
|
|
def synchronization_notifications(sender, instance, created=False, **kwargs): |
|
|
|
|
""" |
|
|
|
|
Signal handler that sends notifications through WebSocket channels when model instances are saved or deleted. |
|
|
|
|
The function creates a WebSocket group name for each affected user and sends a sync update message |
|
|
|
|
to all clients connected to that group. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
if not isinstance(instance, BaseModel): |
|
|
|
|
return |
|
|
|
|
if sender in [FailedApiCall, Log, ModelLog]: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
if kwargs.get('signal') == post_delete: |
|
|
|
|
delete_data_access_if_necessary(instance.id) |
|
|
|
|
|
|
|
|
|
print(f'*** signals {sender}') |
|
|
|
|
# print(f'*** signals {sender}') |
|
|
|
|
notify_impacted_users(instance, kwargs.get('signal')) |
|
|
|
|
|
|
|
|
|
def notify_impacted_users(instance, signal): |
|
|
|
|
user_ids = set() |
|
|
|
|
# add impacted users |
|
|
|
|
if isinstance(instance, CustomUser): |
|
|
|
|
@ -42,64 +58,126 @@ def synchronization_notifications(sender, instance, **kwargs): |
|
|
|
|
user_ids.add(owner.id) |
|
|
|
|
|
|
|
|
|
if isinstance(instance, BaseModel): |
|
|
|
|
data_access_query = Q(model_id=instance.id) |
|
|
|
|
if kwargs.get('signal') != post_delete: |
|
|
|
|
# when deleting objects, accessing reference generates DoesNotExist exceptions |
|
|
|
|
if instance._users_to_notify is not None: |
|
|
|
|
user_ids.update(instance._users_to_notify) |
|
|
|
|
else: |
|
|
|
|
print('no users to notify') |
|
|
|
|
|
|
|
|
|
print(f'notify: {user_ids}') |
|
|
|
|
for user_id in user_ids: |
|
|
|
|
send_user_message(user_id) |
|
|
|
|
|
|
|
|
|
def save_model_log_if_possible(instance, signal, created): |
|
|
|
|
user = instance.last_updated_by |
|
|
|
|
if user is not None: |
|
|
|
|
if signal == post_save or signal == pre_save: |
|
|
|
|
if created: |
|
|
|
|
operation = ModelOperation.POST |
|
|
|
|
else: |
|
|
|
|
operation = ModelOperation.PUT |
|
|
|
|
else: |
|
|
|
|
operation = ModelOperation.DELETE |
|
|
|
|
|
|
|
|
|
model_name = instance.__class__.__name__ |
|
|
|
|
store_id = None |
|
|
|
|
if isinstance(instance, SideStoreModel): |
|
|
|
|
store_id = instance.store_id |
|
|
|
|
|
|
|
|
|
parent_model, data_access_reference_id = instance.get_parent_reference() |
|
|
|
|
if data_access_reference_id is not None: |
|
|
|
|
data_access_query |= Q(model_id=data_access_reference_id) |
|
|
|
|
if operation == ModelOperation.DELETE: # delete now unnecessary logs |
|
|
|
|
ModelLog.objects.filter(model_id=instance.id).delete() |
|
|
|
|
|
|
|
|
|
# look for users through data access objects |
|
|
|
|
data_access_list = DataAccess.objects.filter(data_access_query) |
|
|
|
|
users = {user} |
|
|
|
|
data_access_list = related_data_access(instance) |
|
|
|
|
for data_access in data_access_list: |
|
|
|
|
user_ids.add(data_access.owner.id) |
|
|
|
|
for shared_user in data_access.shared_with.all(): |
|
|
|
|
user_ids.add(shared_user.id) |
|
|
|
|
users.update(data_access.shared_with.all()) |
|
|
|
|
if isinstance(instance, DataAccess): |
|
|
|
|
users.add(instance.owner) |
|
|
|
|
users.update(instance.shared_with.all()) |
|
|
|
|
|
|
|
|
|
for user_id in user_ids: |
|
|
|
|
send_user_message(user_id) |
|
|
|
|
user_ids = [user.id for user in users] |
|
|
|
|
|
|
|
|
|
print(f'users to notify: {user_ids}') |
|
|
|
|
instance._users_to_notify = user_ids # save this for the post_save signal |
|
|
|
|
save_model_log(users, operation, model_name, instance.id, store_id) |
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
print('>>> Model Log could not be created because instance.last_updated_by is None') |
|
|
|
|
|
|
|
|
|
def save_model_log(users, model_operation, model_name, model_id, store_id): |
|
|
|
|
now = timezone.now() |
|
|
|
|
existing_log = ModelLog.objects.filter(users__in=users, model_id=model_id, operation=model_operation).first() |
|
|
|
|
if existing_log: |
|
|
|
|
existing_log.date = now |
|
|
|
|
existing_log.model_operation = model_operation |
|
|
|
|
existing_log.save() |
|
|
|
|
else: |
|
|
|
|
model_log = ModelLog() |
|
|
|
|
model_log.operation = model_operation |
|
|
|
|
model_log.date = now |
|
|
|
|
model_log.model_name = model_name |
|
|
|
|
model_log.model_id = model_id |
|
|
|
|
model_log.store_id = store_id |
|
|
|
|
model_log.save() |
|
|
|
|
model_log.users.set(users) |
|
|
|
|
|
|
|
|
|
def related_data_access(instance): |
|
|
|
|
related_instances = instance.related_instances() |
|
|
|
|
related_ids = [ri.id for ri in instance.related_instances()] |
|
|
|
|
related_ids.append(instance.id) |
|
|
|
|
return DataAccess.objects.filter(model_id__in=related_ids) |
|
|
|
|
|
|
|
|
|
# def update_data_access(instance): |
|
|
|
|
# data_access_list = related_data_access(instance) |
|
|
|
|
|
|
|
|
|
# for data_access in data_access_list: |
|
|
|
|
# date = timezone.now() if instance.last_update is None else instance.last_update |
|
|
|
|
# data_access.last_hierarchy_update = date |
|
|
|
|
# data_access.save() |
|
|
|
|
|
|
|
|
|
def delete_data_access_if_necessary(model_id): |
|
|
|
|
DataAccess.objects.filter(model_id=model_id).delete() |
|
|
|
|
|
|
|
|
|
@receiver(m2m_changed, sender=DataAccess.shared_with.through) |
|
|
|
|
def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs): |
|
|
|
|
|
|
|
|
|
users = CustomUser.objects.filter(id__in=pk_set) |
|
|
|
|
|
|
|
|
|
if action == "post_add": |
|
|
|
|
instance.create_access_log(users, 'GRANT_ACCESS') |
|
|
|
|
elif action == "post_remove": |
|
|
|
|
instance.create_access_log(users, 'REVOKE_ACCESS') |
|
|
|
|
|
|
|
|
|
for user_id in pk_set: |
|
|
|
|
send_user_message(user_id) |
|
|
|
|
|
|
|
|
|
def send_user_message(user_id): |
|
|
|
|
group_name = f"sync_{user_id}" |
|
|
|
|
print(f">>> send to group {group_name}") |
|
|
|
|
|
|
|
|
|
# Send to all clients in the sync group |
|
|
|
|
if not hasattr(send_user_message, '_buffer'): |
|
|
|
|
send_user_message._buffer = set() |
|
|
|
|
send_user_message._timer = None |
|
|
|
|
|
|
|
|
|
send_user_message._buffer.add(user_id) |
|
|
|
|
|
|
|
|
|
if send_user_message._timer: |
|
|
|
|
send_user_message._timer.cancel() |
|
|
|
|
|
|
|
|
|
def send_buffered_messages(): |
|
|
|
|
channel_layer = get_channel_layer() |
|
|
|
|
for buffered_id in send_user_message._buffer: |
|
|
|
|
group_name = f"sync_{buffered_id}" |
|
|
|
|
print(f">>> send to group {group_name}") |
|
|
|
|
async_to_sync(channel_layer.group_send)( |
|
|
|
|
group_name, {"type": "sync.update", "message": "hello"} |
|
|
|
|
) |
|
|
|
|
send_user_message._buffer.clear() |
|
|
|
|
send_user_message._timer = None |
|
|
|
|
|
|
|
|
|
@receiver(m2m_changed, sender=DataAccess.shared_with.through) |
|
|
|
|
def handle_shared_with_changes(sender, instance, action, pk_set, **kwargs): |
|
|
|
|
if action == "post_add": |
|
|
|
|
for user_id in pk_set: |
|
|
|
|
user = CustomUser.objects.get(id=user_id) |
|
|
|
|
instance.create_access_log(user, 'GRANT_ACCESS') |
|
|
|
|
send_user_message(user_id) |
|
|
|
|
elif action == "post_remove": |
|
|
|
|
for user_id in pk_set: |
|
|
|
|
user = CustomUser.objects.get(id=user_id) |
|
|
|
|
instance.create_access_log(user, 'REVOKE_ACCESS') |
|
|
|
|
send_user_message(user_id) |
|
|
|
|
send_user_message._timer = Timer(0.1, send_buffered_messages) |
|
|
|
|
send_user_message._timer.start() |
|
|
|
|
|
|
|
|
|
@receiver(pre_delete, sender=DataAccess) |
|
|
|
|
def revoke_access_after_delete(sender, instance, **kwargs): |
|
|
|
|
for user in instance.shared_with.all(): |
|
|
|
|
instance.create_access_log(user, 'REVOKE_ACCESS') |
|
|
|
|
|
|
|
|
|
# # Store the users in a temporary attribute that we can access after deletion |
|
|
|
|
# instance._users_to_revoke = list(instance.shared_with.all()) |
|
|
|
|
|
|
|
|
|
# @receiver(post_delete, sender=DataAccess) |
|
|
|
|
# def revoke_access_after_delete(sender, instance, **kwargs): |
|
|
|
|
# # Create revoke logs for all previously stored users |
|
|
|
|
# if hasattr(instance, '_users_to_revoke'): |
|
|
|
|
# for user in instance._users_to_revoke: |
|
|
|
|
# instance.create_access_log(user, 'REVOKE_ACCESS') |
|
|
|
|
instance.create_revoke_access_log() |
|
|
|
|
|
|
|
|
|
# Others |
|
|
|
|
|
|
|
|
|
|