from django.shortcuts import render from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from .models import ASSNotification from django.conf import settings from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException from appstoreserverlibrary.models.Environment import Environment import requests from pokeranalytics_backend.settings import APNS_ENVIRONMENT_SANDBOX import json, jwt import base64 import os import logging import datetime ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer" G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer" root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content g6_cert_bytes: bytes = requests.get(G6_CER_URL).content def index(request): return HttpResponse("Hello, world. You're at the subs index.") # @csrf_exempt # def app_store_webhook_prod(request): # decoded = request.body.decode('utf-8') # fulljson = json.loads(decoded) # signedPayload = fulljson['signedPayload'] # decodePayload(signedPayload) # return JsonResponse({'status': 'success'}) @csrf_exempt def app_store_webhook(request): decoded = request.body.decode('utf-8') fulljson = json.loads(decoded) signedPayload = fulljson['signedPayload'] save_payload_file(signedPayload) decodePayload(signedPayload) return JsonResponse({'status': 'success'}) def save_payload_file(signedPayload): tmp_dir = os.path.join(settings.BASE_DIR, 'tmp') # Create the tmp directory if it doesn't exist if not os.path.exists(tmp_dir): os.makedirs(tmp_dir) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(tmp_dir, f'signedPayload_{timestamp}.txt') with open(filename, 'w') as f: f.write(signedPayload) def decodePayload(signedPayload): enable_online_checks = True environment = Environment.PRODUCTION if APNS_ENVIRONMENT_SANDBOX == True: environment = Environment.SANDBOX bundle_id = "stax.SlashPoker.nosebleed" app_apple_id = 1073540690 verifier = SignedDataVerifier([root_cert_bytes, g6_cert_bytes], enable_online_checks, environment, bundle_id, app_apple_id) try: payload = verifier.verify_and_decode_notification(signedPayload) if payload.data: data = payload.data transaction_info = verifier.verify_and_decode_signed_transaction(data.signedTransactionInfo) # renewal_info = verifier.verify_and_decode_renewal_info(data.signedRenewalInfo) signedDateTime = datetime.datetime.fromtimestamp(payload.signedDate / 1000) expiresDateTime = datetime.datetime.fromtimestamp(transaction_info.expiresDate / 1000) originalPurchaseDateTime = datetime.datetime.fromtimestamp(transaction_info.originalPurchaseDate / 1000) revocationDateTime = None if transaction_info.revocationDate: revocationDateTime = datetime.datetime.fromtimestamp(transaction_info.revocationDate / 1000) offer_type = None if payload.offerType is not None: offer_type = int(payload.offerType) offer_discount_type = None if payload.offerDiscountType is not None: offer_discount_type = payload.offerDiscountType.value notification = ASSNotification( notificationType=payload.notificationType, subtype=payload.subtype, notificationUUID=payload.notificationUUID, signedDate=signedDateTime, appAccountToken=transaction_info.appAccountToken, productId=transaction_info.productId, currency=transaction_info.currency, expiresDate=expiresDateTime, isUpgraded=transaction_info.isUpgraded, originalPurchaseDate=originalPurchaseDateTime, originalTransactionId=transaction_info.originalTransactionId, price=transaction_info.price, quantity=transaction_info.quantity, revocationDate=revocationDateTime, storefront=transaction_info.storefront, transactionId=transaction_info.transactionId, transactionReason=transaction_info.rawTransactionReason, offerDiscountType=offer_discount_type, offerIdentifier=payload.offerIdentifier, offerType=offer_type, ) notification.save() elif payload.summary: summary = payload.summary signedDateTime = datetime.datetime.fromtimestamp(payload.signedDate / 1000) notification = ASSNotification( notificationType=payload.notificationType, subtype=payload.subtype, notificationUUID=payload.notificationUUID, signedDate=signedDateTime, productId=summary.productId, requestIdentifier=summary.requestIdentifier, succeededCount=summary.succeededCount, failedCount=summary.failedCount, ) notification.save() except APIException as e: print(e) logger = logging.getLogger('subscriptions') logger.exception("An error occurred during division") return HttpResponse('ok') # def load_root_certificate(): # in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary # data = in_file.read() # if you only wanted to read 512 bytes, do .read(512) # in_file.close() # return load_certificate(FILETYPE_ASN1, data) def _get_root_cert(root_cert_path): fn = os.environ.get("APPLE_ROOT_CA") if fn is None: fn = root_cert_path or "AppleRootCA-G3.cer" fn = os.path.expanduser(fn) with open(fn, "rb") as f: data = f.read() root_cert = load_certificate(FILETYPE_ASN1, data) return root_cert