from django.shortcuts import render from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from .models import ASSNotification from django.conf import settings from appstoreserverlibrary.models.Environment import Environment from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier import json, jwt import base64 import os from OpenSSL.crypto import ( X509Store, X509StoreContext, X509StoreContextError, load_certificate, FILETYPE_ASN1, FILETYPE_PEM ) # import app_store_notifications_v2_validator as asn2 # from app_store_notifications_v2_validator import InvalidTokenError def index(request): return HttpResponse("Hello, world. You're at the subs index.") def test(request): KEY_FILE = settings.ASS_KEY_FILE with open(KEY_FILE,'r') as key_file: key = ''.join(key_file.readlines()) return HttpResponse(key) @csrf_exempt def app_store_webhook(request): decoded = request.body.decode('utf-8') # Parse the JSON payload fulljson = json.loads(decoded) signedPayload = fulljson['signedPayload'] root_certificates = _get_root_cert(None) enable_online_checks = True bundle_id = "stax.SlashPoker.nosebleed" environment = Environment.SANDBOX signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id) try: signed_notification = "ey.." payload = signed_data_verifier.verify_and_decode_notification(signedPayload) print(payload) notification = ASSNotification( content=payload, ) notification.save() except VerificationException as e: print(e) # try: # data = asn2.parse(decoded) # type = data['notificationType'] # # notification = ASSNotification( # content=type, # ) # notification.save() # # except InvalidTokenError: # pass # KEY_FILE = settings.ASS_KEY_FILE # # with open(KEY_FILE,'r') as key_file: # key = ''.join(key_file.readlines()) # decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) # decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256']) #print('hell yeah!' + str(key)) #logger.debug('test getLogger' + str(key)) return JsonResponse({'status': 'success'}) # def _decode_jws(token, root_cert_path, algorithms): # try: # header = jwt.get_unverified_header(token) # # # the first cert contains the public key used to sign the jwt # first_cert_data = header["x5c"][0] # first_cert_data = add_labels(first_cert_data) # first_cert = load_certificate(FILETYPE_PEM, first_cert_data) # # # the other certs are an x5c (X.509 certificate chain) # chain_datas = header["x5c"][1:] # chain_datas = [add_labels(cd) for cd in chain_datas] # chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] # # public_key = first_cert.get_pubkey().to_cryptography_key() # # store = X509Store() # store.add_cert(_get_root_cert(root_cert_path)) # ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) # ctx.verify_certificate() # # return jwt.decode(token, public_key, algorithms=algorithms) # except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: # raise InvalidTokenError from err # def _get_root_cert(root_cert_path): fn = os.environ.get("APPLE_ROOT_CA") if fn is None: fn = root_cert_path or "AppleRootCA-G3.cer" fn = os.path.expanduser(fn) with open(fn, "rb") as f: data = f.read() root_cert = load_certificate(FILETYPE_ASN1, data) return root_cert # # class InvalidTokenError(Exception): # pass # # def add_labels(key: str) -> bytes: # return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()