You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
padelclub_backend/sync/views.py

496 lines
21 KiB

# from django.shortcuts import render
import re
import json
from rest_framework import viewsets
from rest_framework.views import APIView
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import status
from django.conf import settings
from django.utils import timezone
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q
from collections import defaultdict
from urllib.parse import unquote
from .serializers import DataAccessSerializer
from .utils import generate_random_id, get_serializer, build_serializer_class, get_data, get_serialized_data_by_id, HierarchyOrganizer
from .models import ModelLog, BaseModel, SideStoreModel, DataAccess
from .registry import model_registry, device_registry
from .ws_sender import websocket_sender
import logging
logger = logging.getLogger(__name__)
# class HierarchyApiView(APIView):
def instances_to_dict(instances, models_dict):
for instance in instances:
child_model_name = instance.__class__.__name__
serializer = get_serializer(instance, child_model_name)
if child_model_name not in models_dict:
models_dict[child_model_name] = {}
models_dict[child_model_name][instance.id] = serializer.data
return models_dict
def instances_to_data_identifier_dict(instances, models_dict):
for instance in instances:
if isinstance(instance, BaseModel):
child_model_name = instance.__class__.__name__
if child_model_name not in models_dict:
models_dict[child_model_name] = {}
models_dict[child_model_name][instance.id] = instance.data_identifier_dict()
return models_dict
def add_children_hierarchy(instance, models_dict):
sync_models = getattr(settings, 'SYNC_MODEL_CHILDREN_SHARING', {})
if instance.__class__.__name__ in sync_models:
relationships = sync_models[instance.__class__.__name__]
# 'Match': {'team_scores', 'team_registration', 'player_registrations'}
# print(f'relationships = {relationships}')
current = [instance]
for relationship in relationships:
print(f'> relationship = {relationship}')
values = []
for item in current:
value = getattr(item, relationship)
if hasattr(value, 'all') and callable(value.all):
# This is a queryset from a reverse relationship
for related_obj in value.all():
child_model_name = related_obj.__class__.__name__
serializer = get_serializer(related_obj, child_model_name)
models_dict[child_model_name][related_obj.id] = serializer.data
# print(f'>>> 1 Added child for {relationship}: {child_model_name}')
values.extend(value.all())
else:
# This is a single object
child_model_name = value.__class__.__name__
serializer = get_serializer(value, child_model_name)
models_dict[child_model_name][value.id] = serializer.data
# print(f'>>> 2 Added child for {relationship}: {child_model_name}')
values.append(value)
current = values
else:
add_children_recursively(instance, models_dict)
def add_children_recursively(instance, models_dict):
"""
Recursively add all children of an instance to the updates dictionary.
"""
child_models = instance.get_children_by_model()
for child_model_name, children in child_models.items():
for child in children:
if isinstance(child, BaseModel):
# print(f'add_children_recursively: {child_model_name}')
serializer = get_serializer(child, child_model_name)
models_dict[child_model_name][child.id] = serializer.data
add_children_recursively(child, models_dict)
# def add_parents_with_hierarchy_organizer(instance, hierarchy_organizer, current_level=0):
# """
# Recursively add all parents of an instance to the hierarchy organizer.
# Parents are added at a higher level than their children.
# """
# parent_models = instance.get_parents_by_model()
# for parent_model_name, parent in parent_models.items():
# if isinstance(parent, BaseModel):
# store_id = None
# if isinstance(parent, SideStoreModel):
# store_id = parent.store_id
# parent_data = {
# 'model_id': parent.id,
# 'store_id': store_id
# }
# # Add parent at the next level
# print(f'*** add parent: {parent_model_name}: {parent.id}')
# hierarchy_organizer.add_item(parent_model_name, parent_data, current_level + 1)
# # Recursively process parent's parents
# add_parents_with_hierarchy_organizer(parent, hierarchy_organizer, current_level + 1)
def add_parents_recursively(instance, dictionary):
"""
Recursively add all parents of an instance to the updates dictionary.
"""
parent_models = instance.get_parents_by_model()
for parent_model_name, parent in parent_models.items():
if isinstance(parent, BaseModel):
serializer = get_serializer(parent, parent_model_name)
dictionary[parent_model_name][parent.id] = serializer.data
add_parents_recursively(parent, dictionary)
class SynchronizationApi(APIView):
permission_classes = [IsAuthenticated]
def post(self, request, *args, **kwargs):
device_id = request.data.get('device_id')
operations = request.data['operations']
results = []
# print(f"DataApi post, {len(operations)} operations / device: {device_id}")
models = set()
transaction_id = generate_random_id()
websocket_sender.add_user_transaction(self.request.user.id, transaction_id, device_id)
for op in operations:
result = None
message = None
model_operation = op.get('operation')
model_name = op.get('model_name')
data = op.get('data')
data_id = data.get('id')
device_registry.register(data_id, device_id)
# print(f'*** YEAH: {model_operation} : {model_name}')
try:
# print(f'{model_operation} : {model_name}, id = {data['id']}')
models.add(model_name)
serializer_class = build_serializer_class(model_name)
data['last_updated_by'] = request.user.id # always refresh the user performing the operation
# model = apps.get_model(app_label='tournaments', model_name=model_name)
model = model_registry.get_model(model_name)
if model_operation == 'POST':
if data['related_user'] is None: # affect related_user is necessary
data['related_user'] = request.user.id
serializer = serializer_class(data=data, context={'request': request})
if serializer.is_valid():
instance = serializer.save()
result = serializer.data
response_status = status.HTTP_201_CREATED
else:
print(f'{model_name} POST: Data invalid ! {serializer.errors}')
message = json.dumps(serializer.errors)
response_status = status.HTTP_400_BAD_REQUEST
elif model_operation == 'PUT':
instance = get_data(model_name, data_id)
serializer = serializer_class(instance, data=data, context={'request': request})
if serializer.is_valid():
if instance.last_update <= serializer.validated_data.get('last_update'):
serializer.save()
result = serializer.data
response_status = status.HTTP_200_OK
else:
result = serializer.data
response_status = status.HTTP_203_NON_AUTHORITATIVE_INFORMATION
else:
print(f'{model_name} PUT: Data invalid ! {serializer.errors}')
response_status = status.HTTP_400_BAD_REQUEST
elif model_operation == 'DELETE':
try:
instance = get_data(model_name, data_id)
try:
instance.delete()
response_status = status.HTTP_204_NO_CONTENT
except model.DoesNotExist: # a relationship was not found, not good
response_status = status.HTTP_400_BAD_REQUEST
except model.DoesNotExist: # the data was not found, it's ok
response_status = status.HTTP_208_ALREADY_REPORTED
print(f'{model_name} DoesNotExist, id: {data_id}')
except Exception as e:
response_status = status.HTTP_400_BAD_REQUEST
import traceback
tb = traceback.extract_tb(e.__traceback__)
file_name = tb[-1].filename
line_number = tb[-1].lineno
print(f'OTHER exception on {model_operation} {model_name} : {str(e)}, type: {type(e)}, id: {data.get('id')}, file: {file_name}, line: {line_number}')
message = str(e)
results.append({
'api_call_id': op.get('api_call_id'),
'status': response_status,
'data': result,
'message': message
})
# print(f"sync POST completed for models: {models}")
websocket_sender.remove_user_transaction(self.request.user.id, transaction_id, device_id)
return Response({
'results': results
}, status=207) # Multi-Status
## GET
def get(self, request, *args, **kwargs):
last_update_str = request.query_params.get('last_update')
if not last_update_str:
return Response({"error": "last_update parameter is required"}, status=status.HTTP_400_BAD_REQUEST)
try:
decoded_last_update = unquote(last_update_str) # Decodes %2B into +
last_update = timezone.datetime.fromisoformat(decoded_last_update)
except ValueError:
return Response({"error": f"Invalid date format for last_update: {decoded_last_update}"},
status=status.HTTP_400_BAD_REQUEST)
print(f'>>> {request.user.username} : GET last modifications since: {last_update_str} / converted = {last_update}')
device_id = request.query_params.get('device_id')
logs = self.query_model_logs(last_update, request.user, device_id)
print(f'>>> user = {request.user.username} > log count = {logs.count()}, device_id = {device_id}')
# Process all logs and get response data
result = LogProcessingResult()
result.process_logs(logs)
response_data = result.get_response_data()
return Response(response_data, status=status.HTTP_200_OK)
def query_model_logs(self, last_update, user, device_id):
log_query = Q(date__gt=last_update, user=user)
if device_id:
log_query &= ~Q(device_id=device_id) # exclude query
return ModelLog.objects.filter(log_query).order_by('date')
class LogProcessingResult:
"""Class to handle all log processing and organize the results."""
def __init__(self):
# Initialize all the collections
self.updates = defaultdict(dict)
self.deletions = defaultdict(list)
self.shared_instances = defaultdict(dict) # {model_name: {model_id: instance}}
self.grant_instances = defaultdict(dict) # {model_name: {model_id: instance}}
self.revoke_info = defaultdict(list) # {model_name: [{model_id, store_id}]}
self.shared_relationship_sets = defaultdict(dict)
self.shared_relationship_removals = defaultdict(list)
self.last_log_date = None
def process_logs(self, logs):
"""Process logs to collect basic operations and handle grant/revoke efficiently."""
for log in logs:
self.last_log_date = log.date
try:
if log.operation in ['POST', 'PUT', 'RELATIONSHIP_SET']:
data = get_serialized_data_by_id(log.model_name, log.model_id)
self.updates[log.model_name][log.model_id] = data
elif log.operation in ['DELETE', 'RELATIONSHIP_REMOVED']:
self.deletions[log.model_name].append(log.data_identifier_dict())
elif log.operation == 'SHARED_ACCESS':
# Remove any existing revocations for this model_id
self._remove_revocation(log.model_name, log.model_id)
# Add to grant instances if not already there
if log.model_id not in self.shared_instances[log.model_name]:
model = model_registry.get_model(log.model_name)
try:
instance = model.objects.get(id=log.model_id)
self.shared_instances[log.model_name][log.model_id] = instance
except model.DoesNotExist:
pass
elif log.operation == 'REVOKED_ACCESS':
print(f'revoke access {log.model_id} - {log.store_id}')
# Remove any existing grants for this model_id
self._remove_grant(log.model_name, log.model_id)
# Add to revocations
self.revoke_info[log.model_name].append(log.data_identifier_dict())
# elif log.operation == 'RELATIONSHIP_SET':
# data = get_serialized_data(log.model_name, log.model_id)
# self.relationship_sets[log.model_name][log.model_id] = data
# elif log.operation == 'RELATIONSHIP_REMOVED':
# self.relationship_removals[log.model_name].append(log.data_identifier_dict())
elif log.operation == 'SHARED_RELATIONSHIP_SET':
data = get_serialized_data_by_id(log.model_name, log.model_id)
self.shared_relationship_sets[log.model_name][log.model_id] = data
elif log.operation == 'SHARED_RELATIONSHIP_REMOVED':
self.shared_relationship_removals[log.model_name].append(log.data_identifier_dict())
except ObjectDoesNotExist:
pass
# Convert updates dict to list for each model
for model_name in self.updates:
self.updates[model_name] = list(self.updates[model_name].values())
for model_name in self.shared_relationship_sets:
self.shared_relationship_sets[model_name] = list(self.shared_relationship_sets[model_name].values())
# return self
def _remove_revocation(self, model_name, model_id):
"""Remove any revocation entries for the specified model and ID."""
if model_name in self.revoke_info:
self.revoke_info[model_name] = [
r for r in self.revoke_info[model_name]
if r['model_id'] != model_id
]
# Clean up empty lists
if not self.revoke_info[model_name]:
del self.revoke_info[model_name]
def _remove_grant(self, model_name, model_id):
"""Remove any grant entries for the specified model and ID."""
if model_name in self.shared_instances and model_id in self.shared_instances[model_name]:
del self.shared_instances[model_name][model_id]
# Clean up empty dictionaries
if not self.shared_instances[model_name]:
del self.shared_instances[model_name]
def process_shared(self):
"""Process grants and their hierarchies."""
shared = defaultdict(dict)
grants = defaultdict(dict)
try:
# Process each grant instance
for model_name, instances in self.shared_instances.items():
for model_id, instance in instances.items():
serializer = get_serializer(instance, model_name)
shared[model_name][model_id] = serializer.data
add_children_hierarchy(instance, grants)
add_parents_recursively(instance, grants)
except Exception as e:
print(f'ERR = {e}')
# Convert to lists
for model_name in shared:
shared[model_name] = list(shared[model_name].values())
for model_name in grants:
grants[model_name] = list(grants[model_name].values())
return shared, grants
def process_revocations(self):
"""Process revocations and their hierarchies."""
revocations = defaultdict(list)
revocated_relations_organizer = HierarchyOrganizer()
# First, collect all revocations
for model_name, items in self.revoke_info.items():
revocations[model_name].extend(items)
# logger.info(f'$$$ process_revocations for {model_name}, items = {len(items)}')
# Process parent hierarchies for each revoked item
model = model_registry.get_model(model_name)
for item in items:
# logger.info(f'$$$ item revoked = {item}')
try:
instance = model.objects.get(id=item['model_id'])
# logger.info(f'$$$ process revoked item parents of {model_name} : {item['model_id']}')
revocated_relations_organizer.add_relations(instance)
except model.DoesNotExist:
pass
children = revocated_relations_organizer.grouped_children()
merged_revocations = merge_dicts(revocations, children)
return merged_revocations, revocated_relations_organizer
def get_response_data(self):
"""Construct the complete response data structure."""
shared, grants = self.process_shared()
revocations, revocated_relations_organizer = self.process_revocations()
# print(f'self.revocations = {dict(revocations)}')
# print(f'self.revocated_relations_organizer = {revocated_relations_organizer.get_organized_data()}')
# print(f'self.deletions = {dict(self.deletions)}')
# print(f'self.shared_relationship_sets = {self.shared_relationship_sets}')
# print(f'self.shared_relationship_removals = {self.shared_relationship_removals}')
# logger.info('--------------------- SYNC')
return {
"updates": dict(self.updates),
"deletions": dict(self.deletions),
"shared": dict(shared),
"grants": dict(grants),
"revocations": dict(revocations),
"revocated_relations": revocated_relations_organizer.get_organized_data(),
"shared_relationship_sets": self.shared_relationship_sets,
"shared_relationship_removals": self.shared_relationship_removals,
"date": self.last_log_date
}
class UserDataAccessApi(APIView):
permission_classes = [IsAuthenticated]
def get(self, request, *args, **kwargs):
try:
# Get all DataAccess objects where the requesting user is in shared_with
data_access_objects = DataAccess.objects.filter(
shared_with=request.user
).prefetch_related('shared_with') # Use prefetch_related for better performance
data_by_model = defaultdict(dict)
print(f'>>> grants = {len(data_access_objects)}')
for data_access in data_access_objects:
try:
model = model_registry.get_model(data_access.model_name)
instance = model.objects.get(id=data_access.model_id)
# Get the base data
serializer = get_serializer(instance, data_access.model_name)
data_by_model[data_access.model_name][data_access.model_id] = serializer.data
# Add parents & children recursively
add_children_recursively(instance, data_by_model)
add_parents_recursively(instance, data_by_model)
except ObjectDoesNotExist:
continue
response_data = {
model_name: list(model_data.values())
for model_name, model_data in data_by_model.items()
}
print(f'response_data = {response_data}')
return Response(response_data, status=status.HTTP_200_OK)
except Exception as e:
return Response({
'status': 'error',
'message': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class DataAccessViewSet(viewsets.ModelViewSet):
queryset = DataAccess.objects.all()
serializer_class = DataAccessSerializer
def get_queryset(self):
if self.request.user:
return self.queryset.filter(Q(related_user=self.request.user) | Q(shared_with__in=[self.request.user]))
return []
def merge_dicts(dict1, dict2):
result = defaultdict(list)
for d in [dict1, dict2]:
for key, value in d.items():
result[key].extend(value)
return dict(result)