from django.shortcuts import render from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from .models import ASSNotification from django.conf import settings import json, jwt # import base64 # import os import app_store_notifications_v2_validator as asn2 from app_store_notifications_v2_validator import InvalidTokenError def index(request): return HttpResponse("Hello, world. You're at the subs index.") def test(request): KEY_FILE = settings.ASS_KEY_FILE with open(KEY_FILE,'r') as key_file: key = ''.join(key_file.readlines()) return HttpResponse(key) @csrf_exempt def app_store_webhook(request): decoded = request.body.decode('utf-8') # Parse the JSON payload #fulljson = json.loads(data) # signedPayload = fulljson['signedPayload'] decoded_payload = json.loads(base64.b64decode(data)) # Now you can access the decoded data type = decoded_payload.get('notification_type') notification = ASSNotification( content=type, ) notification.save() # try: # data = asn2.parse(request.body) # type = data['notificationType'] # # notification = ASSNotification( # content=type, # ) # notification.save() # # except InvalidTokenError: # pass # KEY_FILE = settings.ASS_KEY_FILE # # with open(KEY_FILE,'r') as key_file: # key = ''.join(key_file.readlines()) # decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) # decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256']) #print('hell yeah!' + str(key)) #logger.debug('test getLogger' + str(key)) return JsonResponse({'status': 'success'}) # def _decode_jws(token, root_cert_path, algorithms): # try: # header = jwt.get_unverified_header(token) # # # the first cert contains the public key used to sign the jwt # first_cert_data = header["x5c"][0] # first_cert_data = add_labels(first_cert_data) # first_cert = load_certificate(FILETYPE_PEM, first_cert_data) # # # the other certs are an x5c (X.509 certificate chain) # chain_datas = header["x5c"][1:] # chain_datas = [add_labels(cd) for cd in chain_datas] # chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] # # public_key = first_cert.get_pubkey().to_cryptography_key() # # store = X509Store() # store.add_cert(_get_root_cert(root_cert_path)) # ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) # ctx.verify_certificate() # # return jwt.decode(token, public_key, algorithms=algorithms) # except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: # raise InvalidTokenError from err # # def _get_root_cert(root_cert_path): # # fn = os.environ.get("APPLE_ROOT_CA") # if fn is None: # fn = root_cert_path or "AppleRootCA-G3.cer" # # fn = os.path.expanduser(fn) # with open(fn, "rb") as f: # data = f.read() # root_cert = load_certificate(FILETYPE_ASN1, data) # # return root_cert # # class InvalidTokenError(Exception): # pass # # def add_labels(key: str) -> bytes: # return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()