from django.shortcuts import render from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from .models import ASSNotification from django.conf import settings from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException from appstoreserverlibrary.models.Environment import Environment import requests import json, jwt import base64 import os from OpenSSL.crypto import ( X509Store, X509StoreContext, X509StoreContextError, load_certificate, FILETYPE_ASN1, FILETYPE_PEM ) ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer" G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer" root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content g6_cert_bytes: bytes = requests.get(G6_CER_URL).content # import app_store_notifications_v2_validator as asn2 # from app_store_notifications_v2_validator import InvalidTokenError def index(request): return HttpResponse("Hello, world. You're at the subs index.") def test(request): in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary data = in_file.read() # if you only wanted to read 512 bytes, do .read(512) in_file.close() string = data.decode('utf-8') return HttpResponse(string) @csrf_exempt def app_store_webhook_prod(request): decoded = request.body.decode('utf-8') notification = ASSNotification( content=decoded, ) notification.save() return JsonResponse({'status': 'success'}) @csrf_exempt def app_store_webhook(request): decoded = request.body.decode('utf-8') fulljson = json.loads(decoded) signedPayload = fulljson['signedPayload'] # client = AppStoreServerAPIClient(private_key_bytes, key_id, issuer_id, bundle_id, environment) environment = Environment.SANDBOX bundle_id = 'stax.SlashPoker.nosebleed' verifier = SignedDataVerifier([root_cert_bytes, g6_cert_bytes], False, environment, bundle_id) try: # response = client.get_transaction_info() decoded_txn = verifier.verify_and_decode_signed_transaction(signedPayload) print(decoded_txn) type = data['notificationType'] notification = ASSNotification( content=type, ) notification.save() except APIException as e: print(e) # Parse the JSON payload # fulljson = json.loads(decoded) # signedPayload = fulljson['signedPayload'] # # root_certificates = load_root_certificate() # enable_online_checks = True # bundle_id = "stax.SlashPoker.nosebleed" # environment = Environment.SANDBOX # signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id) # # try: # payload = signed_data_verifier.verify_and_decode_notification(signedPayload) # print(payload) # # notification = ASSNotification( # content=payload, # ) # notification.save() # # # except VerificationException as e: # print(e) # try: # data = asn2.parse(decoded) # type = data['notificationType'] # # notification = ASSNotification( # content=type, # ) # notification.save() # # except InvalidTokenError: # pass # KEY_FILE = settings.ASS_KEY_FILE # # with open(KEY_FILE,'r') as key_file: # key = ''.join(key_file.readlines()) # decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) # decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256']) #print('hell yeah!' + str(key)) #logger.debug('test getLogger' + str(key)) return JsonResponse({'status': 'success'}) # def _decode_jws(token, root_cert_path, algorithms): # try: # header = jwt.get_unverified_header(token) # # # the first cert contains the public key used to sign the jwt # first_cert_data = header["x5c"][0] # first_cert_data = add_labels(first_cert_data) # first_cert = load_certificate(FILETYPE_PEM, first_cert_data) # # # the other certs are an x5c (X.509 certificate chain) # chain_datas = header["x5c"][1:] # chain_datas = [add_labels(cd) for cd in chain_datas] # chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] # # public_key = first_cert.get_pubkey().to_cryptography_key() # # store = X509Store() # store.add_cert(_get_root_cert(root_cert_path)) # ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) # ctx.verify_certificate() # # return jwt.decode(token, public_key, algorithms=algorithms) # except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: # raise InvalidTokenError from err # def load_root_certificate(): in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary data = in_file.read() # if you only wanted to read 512 bytes, do .read(512) in_file.close() return load_certificate(FILETYPE_ASN1, data) def _get_root_cert(root_cert_path): fn = os.environ.get("APPLE_ROOT_CA") if fn is None: fn = root_cert_path or "AppleRootCA-G3.cer" fn = os.path.expanduser(fn) with open(fn, "rb") as f: data = f.read() root_cert = load_certificate(FILETYPE_ASN1, data) return root_cert # # class InvalidTokenError(Exception): # pass # # def add_labels(key: str) -> bytes: # return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()