You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

146 lines
5.0 KiB

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .models import ASSNotification
from django.conf import settings
from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier
from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException
from appstoreserverlibrary.models.Environment import Environment
import requests
import json, jwt
import base64
import os
import datetime
# from OpenSSL.crypto import (
# X509Store,
# X509StoreContext,
# X509StoreContextError,
# load_certificate,
# FILETYPE_ASN1,
# FILETYPE_PEM
# )
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"
# Seconds to wait for Apple's certificate server before giving up; without a
# timeout a stalled connection would block module import indefinitely.
CERT_FETCH_TIMEOUT = 10


def _fetch_certificate(url: str) -> bytes:
    """Download a certificate file and return its raw bytes.

    Raises requests.HTTPError on a non-2xx response so that an HTML error page
    is never mistaken for certificate data.
    """
    response = requests.get(url, timeout=CERT_FETCH_TIMEOUT)
    response.raise_for_status()
    return response.content


# Both certificates are downloaded once, when this module is first imported.
root_cert_bytes: bytes = _fetch_certificate(ROOT_CER_URL)
g6_cert_bytes: bytes = _fetch_certificate(G6_CER_URL)
def index(request):
    """Serve a fixed plain-text greeting for the subs index page."""
    greeting = "Hello, world. You're at the subs index."
    return HttpResponse(greeting)
@csrf_exempt
def app_store_webhook_prod(request):
    """Receive an App Store Server Notification and hand it to decodePayload.

    The request body must be a UTF-8 JSON object with a ``signedPayload`` key.
    Returns ``{'status': 'success'}`` once decodePayload has run, or an HTTP 400
    JSON error when the body cannot be parsed or lacks ``signedPayload``.
    CSRF checks are disabled because the caller is an external server.
    """
    try:
        body = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
        return JsonResponse(
            {'status': 'error', 'message': 'invalid JSON body'}, status=400
        )
    if not isinstance(body, dict) or 'signedPayload' not in body:
        return JsonResponse(
            {'status': 'error', 'message': 'missing signedPayload'}, status=400
        )
    # decodePayload's own return value is intentionally ignored here.
    decodePayload(body['signedPayload'])
    return JsonResponse({'status': 'success'})
@csrf_exempt
def app_store_webhook(request):
    """Receive an App Store Server Notification and hand it to decodePayload.

    The request body must be a UTF-8 JSON object with a ``signedPayload`` key.
    Returns ``{'status': 'success'}`` once decodePayload has run, or an HTTP 400
    JSON error when the body cannot be parsed or lacks ``signedPayload``.
    CSRF checks are disabled because the caller is an external server.
    """
    # NOTE(review): decodePayload verifies against Environment.PRODUCTION; if
    # this endpoint is meant for sandbox notifications, that needs confirming.
    try:
        body = json.loads(request.body.decode('utf-8'))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
        return JsonResponse(
            {'status': 'error', 'message': 'invalid JSON body'}, status=400
        )
    if not isinstance(body, dict) or 'signedPayload' not in body:
        return JsonResponse(
            {'status': 'error', 'message': 'missing signedPayload'}, status=400
        )
    # decodePayload's own return value is intentionally ignored here.
    decodePayload(body['signedPayload'])
    return JsonResponse({'status': 'success'})
def _ms_to_datetime(ms):
    """Convert an epoch timestamp in milliseconds to a datetime, or None if absent."""
    if ms is None:
        return None
    # Naive local-time datetime, matching how timestamps were stored before.
    return datetime.datetime.fromtimestamp(ms / 1000)


def decodePayload(signedPayload, environment=None):
    """Verify a signed App Store notification and save it as an ASSNotification.

    Args:
        signedPayload: The ``signedPayload`` string from the notification body.
        environment: Environment to verify against. Defaults to
            ``Environment.PRODUCTION`` when omitted.

    Returns:
        An HttpResponse whose body is the decoded renewal info when the
        notification carried one, an empty HttpResponse when it did not, or an
        empty HTTP 400 response when verification failed.
    """
    if environment is None:
        environment = Environment.PRODUCTION
    enable_online_checks = True
    bundle_id = "stax.SlashPoker.nosebleed"
    app_apple_id = 1073540690
    verifier = SignedDataVerifier(
        [root_cert_bytes, g6_cert_bytes],
        enable_online_checks,
        environment,
        bundle_id,
        app_apple_id,
    )

    # Bound up front so the final return is valid on every path.
    renewal_info = None
    try:
        payload = verifier.verify_and_decode_notification(signedPayload)
        # Fields shared by every kind of notification.
        fields = {
            'notificationType': payload.notificationType,
            'subtype': payload.subtype,
            'notificationUUID': payload.notificationUUID,
            'signedDate': _ms_to_datetime(payload.signedDate),
        }
        if payload.data:
            data = payload.data
            # Either signed blob may be absent, so each is decoded only when present.
            if data.signedTransactionInfo:
                transaction_info = verifier.verify_and_decode_signed_transaction(
                    data.signedTransactionInfo
                )
                fields.update(
                    appAccountToken=transaction_info.appAccountToken,
                    productId=transaction_info.productId,
                    currency=transaction_info.currency,
                    expiresDate=_ms_to_datetime(transaction_info.expiresDate),
                    isUpgraded=transaction_info.isUpgraded,
                    originalPurchaseDate=_ms_to_datetime(
                        transaction_info.originalPurchaseDate
                    ),
                    originalTransactionId=transaction_info.originalTransactionId,
                    price=transaction_info.price,
                    quantity=transaction_info.quantity,
                    revocationDate=_ms_to_datetime(transaction_info.revocationDate),
                    storefront=transaction_info.storefront,
                    transactionId=transaction_info.transactionId,
                    transactionReason=transaction_info.rawTransactionReason,
                )
            if data.signedRenewalInfo:
                renewal_info = verifier.verify_and_decode_renewal_info(
                    data.signedRenewalInfo
                )
            ASSNotification(**fields).save()
        elif payload.summary:
            summary = payload.summary
            fields.update(
                productId=summary.productId,
                requestIdentifier=summary.requestIdentifier,
                succeededCount=summary.succeededCount,
                failedCount=summary.failedCount,
            )
            ASSNotification(**fields).save()
        # A notification with neither data nor summary is not stored.
    except (VerificationException, APIException) as e:
        # The verifier signals failures with VerificationException;
        # APIException is still caught for backward compatibility.
        print(e)
        return HttpResponse(status=400)
    return HttpResponse(renewal_info if renewal_info is not None else "")
# def load_root_certificate():
# in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary
# data = in_file.read() # if you only wanted to read 512 bytes, do .read(512)
# in_file.close()
# return load_certificate(FILETYPE_ASN1, data)
def _get_root_cert(root_cert_path):
    """Load the root certificate from disk and parse it with load_certificate.

    The file path is taken from the APPLE_ROOT_CA environment variable when it
    is set, otherwise from ``root_cert_path``, otherwise "AppleRootCA-G3.cer".
    A leading ``~`` in the chosen path is expanded before the file is opened.
    """
    # NOTE(review): the import of load_certificate / FILETYPE_ASN1 appears to
    # be commented out at the top of this file; confirm they are in scope
    # before calling this function.
    env_path = os.environ.get("APPLE_ROOT_CA")
    if env_path is not None:
        chosen_path = env_path
    elif root_cert_path:
        chosen_path = root_cert_path
    else:
        chosen_path = "AppleRootCA-G3.cer"
    with open(os.path.expanduser(chosen_path), "rb") as cert_file:
        cert_bytes = cert_file.read()
    return load_certificate(FILETYPE_ASN1, cert_bytes)