From edf833b4e3e3d2ddca295e963056daabdc42939d Mon Sep 17 00:00:00 2001 From: Laurent Date: Mon, 15 Jan 2024 16:50:55 +0100 Subject: [PATCH] another try --- subscriptions/views.py | 125 +++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 61 deletions(-) diff --git a/subscriptions/views.py b/subscriptions/views.py index c79bad7..202fb5d 100644 --- a/subscriptions/views.py +++ b/subscriptions/views.py @@ -3,18 +3,13 @@ from django.http import HttpResponse, JsonResponse from django.views.decorators.csrf import csrf_exempt from .models import ASSNotification from django.conf import settings -import json, jwt -import base64 -import os - -from OpenSSL.crypto import ( - X509Store, - X509StoreContext, - X509StoreContextError, - load_certificate, - FILETYPE_ASN1, - FILETYPE_PEM -) + +# import json, jwt +# import base64 +# import os + +import app_store_notifications_v2_validator as asn2 + def index(request): return HttpResponse("Hello, world. You're at the subs index.") @@ -32,68 +27,76 @@ def app_store_webhook(request): data = request.body.decode('utf-8') # Parse the JSON payload fulljson = json.loads(data) - signedPayload = fulljson['signedPayload'] + # signedPayload = fulljson['signedPayload'] + + try: + data = asn2.parse(fulljson) + type = data['notificationType'] + + notification = ASSNotification( + content=type, + ) + notification.save() + + except InvalidTokenError: + pass # KEY_FILE = settings.ASS_KEY_FILE # # with open(KEY_FILE,'r') as key_file: # key = ''.join(key_file.readlines()) - decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) + # decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) # decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256']) #print('hell yeah!' + str(key)) #logger.debug('test getLogger' + str(key)) - notification = ASSNotification( - content=decodedPayload, - ) - notification.save() return JsonResponse({'status': 'success'}) -def _decode_jws(token, root_cert_path, algorithms): - try: - header = jwt.get_unverified_header(token) - - # the first cert contains the public key used to sign the jwt - first_cert_data = header["x5c"][0] - first_cert_data = add_labels(first_cert_data) - first_cert = load_certificate(FILETYPE_PEM, first_cert_data) - - # the other certs are an x5c (X.509 certificate chain) - chain_datas = header["x5c"][1:] - chain_datas = [add_labels(cd) for cd in chain_datas] - chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] - - public_key = first_cert.get_pubkey().to_cryptography_key() - - store = X509Store() - store.add_cert(_get_root_cert(root_cert_path)) - ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) - ctx.verify_certificate() - - return jwt.decode(token, public_key, algorithms=algorithms) - except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: - raise InvalidTokenError from err - -def _get_root_cert(root_cert_path): - - fn = os.environ.get("APPLE_ROOT_CA") - if fn is None: - fn = root_cert_path or "AppleRootCA-G3.cer" - - fn = os.path.expanduser(fn) - with open(fn, "rb") as f: - data = f.read() - root_cert = load_certificate(FILETYPE_ASN1, data) - - return root_cert - -class InvalidTokenError(Exception): - pass - -def add_labels(key: str) -> bytes: - return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode() +# def _decode_jws(token, root_cert_path, algorithms): +# try: +# header = jwt.get_unverified_header(token) +# +# # the first cert contains the public key used to sign the jwt +# first_cert_data = header["x5c"][0] +# first_cert_data = add_labels(first_cert_data) +# first_cert = load_certificate(FILETYPE_PEM, first_cert_data) +# +# # the other certs are an x5c (X.509 certificate chain) +# chain_datas = header["x5c"][1:] +# chain_datas = [add_labels(cd) for cd in chain_datas] +# chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] +# +# public_key = first_cert.get_pubkey().to_cryptography_key() +# +# store = X509Store() +# store.add_cert(_get_root_cert(root_cert_path)) +# ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) +# ctx.verify_certificate() +# +# return jwt.decode(token, public_key, algorithms=algorithms) +# except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: +# raise InvalidTokenError from err +# +# def _get_root_cert(root_cert_path): +# +# fn = os.environ.get("APPLE_ROOT_CA") +# if fn is None: +# fn = root_cert_path or "AppleRootCA-G3.cer" +# +# fn = os.path.expanduser(fn) +# with open(fn, "rb") as f: +# data = f.read() +# root_cert = load_certificate(FILETYPE_ASN1, data) +# +# return root_cert +# +# class InvalidTokenError(Exception): +# pass +# +# def add_labels(key: str) -> bytes: +# return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()