"""Views for the subscriptions app.

Contains the App Store Server Notifications webhook (payload verification and
persistence as ``ASSNotification`` rows) and two reporting views that summarise
stored notifications per month and per offer identifier.
"""

import base64
import datetime
import json
import logging
import os

import jwt
import requests
from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier
from django.conf import settings
from django.db.models import Count, Q, Sum, Case, When, IntegerField
from django.db.models.functions import TruncMonth
from django.http import Http404, HttpResponse, JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import ListView, DetailView

from pokeranalytics_backend.settings import APNS_ENVIRONMENT_SANDBOX

from .models import ASSNotification

ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"

# Seconds to wait for each certificate download. Without a timeout a stalled
# connection would block the import of this module (and worker start-up) forever.
_CERT_DOWNLOAD_TIMEOUT = 30

# The certificates are downloaded once, when this module is first imported.
root_cert_bytes: bytes = requests.get(ROOT_CER_URL, timeout=_CERT_DOWNLOAD_TIMEOUT).content
g6_cert_bytes: bytes = requests.get(G6_CER_URL, timeout=_CERT_DOWNLOAD_TIMEOUT).content


def index(request):
    """Return a plain-text placeholder response."""
    return HttpResponse("Hello, world. You're at the subs index.")


@csrf_exempt
def test(request):
    """Decode and store the payload found in ``payload.txt`` at ``BASE_DIR``.

    Returns a JSON response: status 200 on success, 404 when the file does not
    exist, 500 (with the exception text) for any other failure.
    """
    # payload.txt is expected at the root of the Django project.
    payload_file_path = os.path.join(settings.BASE_DIR, 'payload.txt')
    try:
        with open(payload_file_path, 'r') as file:
            payload_content = file.read()

        # Verify the payload and persist the resulting notification.
        decodePayload(payload_content)

        return JsonResponse({'status': 'success', 'message': 'Payload processed successfully'})
    except FileNotFoundError:
        return JsonResponse({'status': 'error', 'message': 'payload.txt file not found'}, status=404)
    except Exception as e:
        # logging.exception records the traceback as well as the message.
        logging.exception(f"Error in test endpoint: {str(e)}")
        return JsonResponse({'status': 'error', 'message': str(e)}, status=500)


# @csrf_exempt
# def app_store_webhook_prod(request):
#     decoded = request.body.decode('utf-8')
#     fulljson = json.loads(decoded)
#     signedPayload = fulljson['signedPayload']
#     decodePayload(signedPayload)
#     return JsonResponse({'status': 'success'})


@csrf_exempt
def app_store_webhook(request):
    """Receive a notification: archive the signed payload, then decode and store it.

    The request body must be a JSON object with a string ``signedPayload`` member.
    A body that does not match that shape is answered with HTTP 400 instead of
    surfacing as an unhandled exception.
    """
    try:
        signedPayload = json.loads(request.body.decode('utf-8'))['signedPayload']
    except (ValueError, KeyError, TypeError):
        # ValueError covers both invalid UTF-8 and invalid JSON; KeyError/TypeError
        # cover a missing key or a JSON document that is not an object.
        logging.warning("Rejected webhook request: body is not JSON with a 'signedPayload' key")
        return JsonResponse({'status': 'error', 'message': 'Invalid notification body'}, status=400)

    if not isinstance(signedPayload, str):
        logging.warning("Rejected webhook request: 'signedPayload' is not a string")
        return JsonResponse({'status': 'error', 'message': 'Invalid notification body'}, status=400)

    save_payload_file(signedPayload)
    decodePayload(signedPayload)

    return JsonResponse({'status': 'success'})


def save_payload_file(signedPayload):
    """Write ``signedPayload`` to a new timestamped text file under ``BASE_DIR/tmp``."""
    tmp_dir = os.path.join(settings.BASE_DIR, 'tmp')

    # exist_ok avoids the race between checking for the directory and creating it.
    os.makedirs(tmp_dir, exist_ok=True)

    # Microseconds are part of the name so that two payloads received within the
    # same second do not overwrite each other.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    filename = os.path.join(tmp_dir, f'signedPayload_{timestamp}.txt')

    with open(filename, 'w') as f:
        f.write(signedPayload)


def _ms_to_datetime(ms):
    """Convert a Unix timestamp in milliseconds to a naive local ``datetime``.

    Returns ``None`` when ``ms`` is missing (``None`` or 0) instead of raising.
    """
    if not ms:
        return None
    return datetime.datetime.fromtimestamp(ms / 1000)


def _build_transaction_notification(payload, transaction_info):
    """Build (without saving) an ASSNotification from a decoded transaction."""
    offer_type = None
    if transaction_info.offerType is not None:
        offer_type = int(transaction_info.offerType)

    offer_discount_type = None
    if transaction_info.offerDiscountType is not None:
        offer_discount_type = transaction_info.offerDiscountType.value

    return ASSNotification(
        notificationType=payload.notificationType,
        subtype=payload.subtype,
        notificationUUID=payload.notificationUUID,
        signedDate=_ms_to_datetime(payload.signedDate),

        appAccountToken=transaction_info.appAccountToken,
        productId=transaction_info.productId,
        currency=transaction_info.currency,
        # NOTE(review): expiresDate can be absent on the decoded transaction; it is
        # then stored as None - confirm the model column accepts NULL.
        expiresDate=_ms_to_datetime(transaction_info.expiresDate),
        isUpgraded=transaction_info.isUpgraded,
        originalPurchaseDate=_ms_to_datetime(transaction_info.originalPurchaseDate),
        originalTransactionId=transaction_info.originalTransactionId,
        price=transaction_info.price,
        quantity=transaction_info.quantity,
        revocationDate=_ms_to_datetime(transaction_info.revocationDate),
        storefront=transaction_info.storefront,
        transactionId=transaction_info.transactionId,
        transactionReason=transaction_info.rawTransactionReason,

        offerDiscountType=offer_discount_type,
        offerIdentifier=transaction_info.offerIdentifier,
        offerType=offer_type,
    )


def _build_summary_notification(payload):
    """Build (without saving) an ASSNotification from a notification summary."""
    summary = payload.summary
    return ASSNotification(
        notificationType=payload.notificationType,
        subtype=payload.subtype,
        notificationUUID=payload.notificationUUID,
        signedDate=_ms_to_datetime(payload.signedDate),

        productId=summary.productId,
        requestIdentifier=summary.requestIdentifier,
        succeededCount=summary.succeededCount,
        failedCount=summary.failedCount,
    )


def decodePayload(signedPayload):
    """Verify a signed notification payload and store it as an ASSNotification.

    A payload carrying ``data`` is stored together with the fields of its signed
    transaction; a payload carrying ``summary`` is stored with the summary
    counters. A payload with neither is verified but nothing is stored.

    Returns:
        ``HttpResponse('ok')``.

    Raises:
        VerificationException: when the payload or its signed transaction fails
            verification. The failure is logged before being re-raised, so the
            calling view still reports an error to the sender.
    """
    enable_online_checks = True

    # `== True` is kept on purpose: only a setting equal to True selects the
    # sandbox; any other value keeps the production environment.
    environment = Environment.PRODUCTION
    if APNS_ENVIRONMENT_SANDBOX == True:  # noqa: E712
        environment = Environment.SANDBOX

    logging.debug(f'environment = {environment}')

    bundle_id = "stax.SlashPoker.nosebleed"
    app_apple_id = 1073540690
    verifier = SignedDataVerifier(
        [root_cert_bytes, g6_cert_bytes],
        enable_online_checks,
        environment,
        bundle_id,
        app_apple_id,
    )

    try:
        payload = verifier.verify_and_decode_notification(signedPayload)

        if payload.data:
            transaction_info = verifier.verify_and_decode_signed_transaction(
                payload.data.signedTransactionInfo
            )
            # renewal_info = verifier.verify_and_decode_renewal_info(data.signedRenewalInfo)
            _build_transaction_notification(payload, transaction_info).save()
        elif payload.summary:
            _build_summary_notification(payload).save()

    except VerificationException:
        # Log with context, then propagate so the caller does not report success
        # for a payload that could not be verified.
        logging.getLogger('subscriptions').exception(
            "App Store notification failed verification"
        )
        raise
    except APIException:
        logging.getLogger('subscriptions').exception(
            "App Store Server API error while processing a notification"
        )

    return HttpResponse('ok')


# def load_root_certificate():
#     in_file = open("AppleRootCA-G3.cer", "rb")  # opening for [r]eading as [b]inary
#     data = in_file.read()  # if you only wanted to read 512 bytes, do .read(512)
#     in_file.close()
#     return load_certificate(FILETYPE_ASN1, data)


def _get_root_cert(root_cert_path):
    """Load a root certificate from disk.

    The path comes from the ``APPLE_ROOT_CA`` environment variable when it is
    set; otherwise ``root_cert_path`` is used, falling back to
    ``"AppleRootCA-G3.cer"``. A leading ``~`` is expanded.

    NOTE(review): ``load_certificate`` and ``FILETYPE_ASN1`` are not imported
    anywhere in this module, so calling this function raises ``NameError`` as
    written. They look like pyOpenSSL's ``OpenSSL.crypto`` names - confirm and
    import them, or remove this helper if it is no longer used.
    """
    fn = os.environ.get("APPLE_ROOT_CA")
    if fn is None:
        fn = root_cert_path or "AppleRootCA-G3.cer"

    fn = os.path.expanduser(fn)
    with open(fn, "rb") as f:
        data = f.read()
        root_cert = load_certificate(FILETYPE_ASN1, data)

    return root_cert


class MonthlyOfferSummaryView(ListView):
    """List, per month, how many SUBSCRIBED/REFUND notifications each offer has."""

    template_name = 'subscriptions/monthly_summary.html'
    context_object_name = 'monthly_offers'

    def get_queryset(self):
        """Return ``[{'month': ..., 'offers': [{'offerIdentifier', 'count'}, ...]}, ...]``.

        Notifications without an offer identifier are excluded. The list is
        ordered from the most recent month to the oldest.
        """
        # One row per (month, offerIdentifier) pair with its notification count.
        monthly_offers = (
            ASSNotification.objects.exclude(offerIdentifier__isnull=True)
            .filter(notificationType__in=['SUBSCRIBED', 'REFUND'])
            .annotate(month=TruncMonth('signedDate'))
            .values('month', 'offerIdentifier')
            .annotate(count=Count('id'))
            .order_by('-month', 'offerIdentifier')
        )

        # Group the rows by month, keeping the per-month offer ordering.
        offers_by_month = {}
        for offer in monthly_offers:
            offers_by_month.setdefault(offer['month'], []).append({
                'offerIdentifier': offer['offerIdentifier'],
                'count': offer['count'],
            })

        # Convert to a list of dicts for the template.
        monthly_data = [
            {'month': month, 'offers': offers}
            for month, offers in offers_by_month.items()
        ]

        return sorted(monthly_data, key=lambda x: x['month'], reverse=True)


class OfferDetailView(ListView):
    """List the SUBSCRIBED/REFUND notifications of one offer for one month.

    Reads ``year``, ``month`` and ``offer_id`` from the URL keyword arguments.
    """

    template_name = 'subscriptions/offer_detail.html'
    context_object_name = 'notifications'

    # $15 per net subscription, used for the revenue figure in the context.
    revenue_per_net_subscription = 15

    def _month_bounds(self):
        """Return ``(first_day, last_day)`` of the month named in the URL.

        Raises:
            Http404: when ``year``/``month`` are missing, not integers, or do not
                form a valid calendar month.
        """
        try:
            year = int(self.kwargs.get('year'))
            month = int(self.kwargs.get('month'))
            start_date = datetime.date(year, month, 1)
            # The last day of the month is the day before the first day of the next one.
            if month == 12:
                next_month_start = datetime.date(year + 1, 1, 1)
            else:
                next_month_start = datetime.date(year, month + 1, 1)
        except (TypeError, ValueError, OverflowError) as exc:
            raise Http404("Invalid year or month") from exc

        return start_date, next_month_start - datetime.timedelta(days=1)

    def _filtered_notifications(self, start_date, end_date):
        """Return the offer's SUBSCRIBED/REFUND notifications signed within the range."""
        return ASSNotification.objects.filter(
            signedDate__date__gte=start_date,
            signedDate__date__lte=end_date,
            offerIdentifier=self.kwargs.get('offer_id'),
            notificationType__in=['SUBSCRIBED', 'REFUND']
        )

    def get_queryset(self):
        """Return the matching notifications ordered by signed date."""
        start_date, end_date = self._month_bounds()
        return self._filtered_notifications(start_date, end_date).order_by('signedDate')

    def get_context_data(self, **kwargs):
        """Add the month label, offer id, counts, net count and revenue to the context."""
        context = super().get_context_data(**kwargs)

        start_date, end_date = self._month_bounds()

        # Count statistics: Count() ignores the NULL produced by rows that do not
        # match the When clause, so each aggregate counts one notification type.
        stats = self._filtered_notifications(start_date, end_date).aggregate(
            subscribed_count=Count(Case(
                When(notificationType='SUBSCRIBED', then=1),
                output_field=IntegerField()
            )),
            refund_count=Count(Case(
                When(notificationType='REFUND', then=1),
                output_field=IntegerField()
            ))
        )

        subscribed_count = stats['subscribed_count'] or 0
        refund_count = stats['refund_count'] or 0
        net_count = subscribed_count - refund_count
        revenue = net_count * self.revenue_per_net_subscription

        context.update({
            'month_name': start_date.strftime('%B %Y'),
            'offer_id': self.kwargs.get('offer_id'),
            'subscribed_count': subscribed_count,
            'refund_count': refund_count,
            'net_count': net_count,
            'revenue': revenue
        })

        return context