import base64
import json

import jwt
from django.conf import settings
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from OpenSSL.crypto import (
    FILETYPE_ASN1,
    FILETYPE_PEM,
    Error as OpenSSLError,
    X509Store,
    X509StoreContext,
    X509StoreContextError,
    load_certificate,
)

from .models import ASSNotification

# Signature algorithms accepted for the signed payload. "ES256" is the value
# the earlier direct ``jwt.decode`` call in this view used.
_JWS_ALGORITHMS = ["ES256"]


def index(request):
    """Return a fixed plain-text greeting."""
    return HttpResponse("Hello, world. You're at the subs index.")


def test(request):
    """Return the text of the file at ``settings.ASS_KEY_FILE`` as the response body.

    NOTE(review): this echoes the configured key file verbatim in an HTTP
    response. If this view is routed outside a development environment it
    discloses key material -- confirm it is debug-only or remove it.
    """
    with open(settings.ASS_KEY_FILE, "r") as key_file:
        key = key_file.read()

    return HttpResponse(key)


@csrf_exempt
def app_store_webhook(request):
    """Verify a signed notification and store its decoded payload.

    The request body must be UTF-8 JSON containing a ``signedPayload`` JWS
    string. The JWS is verified and decoded with ``_decode_jws`` and the
    result is saved as the ``content`` of a new ``ASSNotification``.

    Returns:
        ``{"status": "success"}`` once the notification is saved, or a
        ``{"status": "error", ...}`` response with HTTP 400 when the body is
        malformed or the token is rejected.
    """
    try:
        envelope = json.loads(request.body.decode("utf-8"))
        signed_payload = envelope["signedPayload"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # TypeError covers a JSON body that is not an object.
        return JsonResponse(
            {"status": "error", "message": "invalid request body"},
            status=400,
        )

    # NOTE(review): ``ASS_ROOT_CERT_FILE`` is a proposed setting name for the
    # trusted root certificate path; the original call passed no path at all.
    # Confirm the name and what ``_get_root_cert`` does when given ``None``.
    root_cert_path = getattr(settings, "ASS_ROOT_CERT_FILE", None)

    try:
        decoded_payload = _decode_jws(
            signed_payload,
            root_cert_path,
            algorithms=_JWS_ALGORITHMS,
        )
    except InvalidTokenError:
        return JsonResponse(
            {"status": "error", "message": "invalid signed payload"},
            status=400,
        )

    notification = ASSNotification(
        content=decoded_payload,
    )
    notification.save()

    return JsonResponse({'status': 'success'})


def _decode_jws(token, root_cert_path, algorithms):
    """Verify a JWS against its embedded ``x5c`` certificate chain and decode it.

    The first ``x5c`` entry is loaded as the signing certificate and supplies
    the public key for ``jwt.decode``; the remaining entries are passed to the
    store context as the chain. The only certificate added to the trust store
    is the one returned by ``_get_root_cert(root_cert_path)``.

    NOTE(review): ``_get_root_cert`` and ``InvalidTokenError`` are not defined
    in the portion of the module visible here -- confirm they exist elsewhere
    in this file.

    Args:
        token: The compact JWS string to verify.
        root_cert_path: Forwarded unchanged to ``_get_root_cert``.
        algorithms: Algorithms accepted by ``jwt.decode``.

    Returns:
        Whatever ``jwt.decode`` returns for the verified token.

    Raises:
        InvalidTokenError: If the header, certificates, chain verification,
            or signature check fails; the underlying error is chained.
    """
    try:
        header = jwt.get_unverified_header(token)
        x5c = header["x5c"]

        # The first cert contains the public key used to sign the JWT.
        signing_cert = load_certificate(FILETYPE_PEM, add_labels(x5c[0]))

        # The other certs form the rest of the X.509 certificate chain.
        chain = [
            load_certificate(FILETYPE_PEM, add_labels(entry))
            for entry in x5c[1:]
        ]

        public_key = signing_cert.get_pubkey().to_cryptography_key()

        store = X509Store()
        store.add_cert(_get_root_cert(root_cert_path))
        ctx = X509StoreContext(store=store, certificate=signing_cert, chain=chain)
        ctx.verify_certificate()

        return jwt.decode(token, public_key, algorithms=algorithms)
    except (
        ValueError,
        KeyError,
        # The header is attacker-controlled: an empty ``x5c`` list, a
        # non-list ``x5c``, or non-string entries must be rejected as an
        # invalid token rather than escaping as an unhandled error.
        IndexError,
        TypeError,
        # Raised by ``load_certificate`` when an entry is not a parseable
        # certificate.
        OpenSSLError,
        jwt.exceptions.PyJWTError,
        X509StoreContextError,
    ) as err:
        raise InvalidTokenError from err


def add_labels(key: str) -> bytes:
    """Wrap *key* between PEM certificate BEGIN/END lines and return it as bytes."""
    return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()