diff --git a/subscriptions/validate_apple_storekit_2_jwt.py b/subscriptions/validate_apple_storekit_2_jwt.py new file mode 100644 index 0000000..df00ab7 --- /dev/null +++ b/subscriptions/validate_apple_storekit_2_jwt.py @@ -0,0 +1,60 @@ +from typing import Any, Dict, List, Optional +from OpenSSL import crypto +import jwt +from jwt.utils import base64url_decode +import requests +import logging + +ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer" +G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer" + +def get_validated_jwt_content(apple_jwt: str) -> Optional[Dict[str, Any]]: + # Fetch the well-known/expected root & intermediate keys from Apple: + root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content + root_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, root_cert_bytes) + g6_cert_bytes: bytes = requests.get(G6_CER_URL).content + g6_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, g6_cert_bytes) + + # Get the signing keys out of the JWT header. The header will look like: + # {"alg": "ES256", "x5c": ["...base64 cert...", "...base64 cert..."]} + header = jwt.get_unverified_header(apple_jwt) + alg = header['alg'] # ES256 + provided_certificates: List[crypto.X509] = [] + certificate_names: List[Dict[bytes, bytes]] = [] + for cert_base64 in header['x5c']: + cert_bytes = base64url_decode(cert_base64) + another_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_bytes) + # To see the certificate chain by name, which corresponds to certs you can fetch: + # https://www.apple.com/certificateauthority/ + # + # Prints : + certificate_names.append(dict(another_cert.get_subject().get_components())) + provided_certificates.append(another_cert) + + # Verify that the root & intermediate keys are what we expect from Apple: + assert certificate_names[-1][b'CN'] == b'Apple Root CA - G3', f'Root cert changed: {certificate_names[-1]}' + assert certificate_names[-2][b'OU'] == b'G6', f'Intermediate cert changed: {certificate_names[-2]}' + assert provided_certificates[-2].digest('sha256') == g6_cert.digest('sha256') + assert provided_certificates[-1].digest('sha256') == root_cert.digest('sha256') + + # Validate that the cert chain is cryptographically legit: + store = crypto.X509Store() + store.add_cert(root_cert) + store.add_cert(g6_cert) + for cert in provided_certificates[:-2]: + try: + crypto.X509StoreContext(store, cert).verify_certificate() + except crypto.X509StoreContextError: + logging.error("Invalid certificate chain in JWT: %s", apple_jwt) + return None + store.add_cert(cert) + + # Now that the cert is validated, we can use it to verify the actual signature + # of the JWT. PyJWT does not understand this certificate if we pass it in, so + # we have to get the cryptography library's version of the same key: + cryptography_version_of_key = provided_certificates[0].get_pubkey().to_cryptography_key() + try: + return jwt.decode(apple_jwt, cryptography_version_of_key, algorithms=["ES256"]) + except Exception: + logging.exception("Problem validating Apple JWT") + return None diff --git a/subscriptions/views.py b/subscriptions/views.py index 84e7784..798d2c6 100644 --- a/subscriptions/views.py +++ b/subscriptions/views.py @@ -20,6 +20,8 @@ from OpenSSL.crypto import ( FILETYPE_PEM ) +from .validate_apple_storekit_2_jwt import get_validated_jwt_content + # import app_store_notifications_v2_validator as asn2 # from app_store_notifications_v2_validator import InvalidTokenError @@ -27,39 +29,51 @@ def index(request): return HttpResponse("Hello, world. You're at the subs index.") def test(request): - KEY_FILE = settings.ASS_KEY_FILE + in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary + data = in_file.read() # if you only wanted to read 512 bytes, do .read(512) + in_file.close() + + string = data.decode('utf-8') + + return HttpResponse(string) - with open(KEY_FILE,'r') as key_file: - key = ''.join(key_file.readlines()) - return HttpResponse(key) @csrf_exempt def app_store_webhook(request): decoded = request.body.decode('utf-8') - # Parse the JSON payload - fulljson = json.loads(decoded) - signedPayload = fulljson['signedPayload'] + json = get_validated_jwt_content(decoded) - root_certificates = load_root_certificate() - enable_online_checks = True - bundle_id = "stax.SlashPoker.nosebleed" - environment = Environment.SANDBOX - signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id) + type = json['notificationType'] - try: - signed_notification = "ey.." - payload = signed_data_verifier.verify_and_decode_notification(signedPayload) - print(payload) + notification = ASSNotification( + content=type, + ) + notification.save() - notification = ASSNotification( - content=payload, - ) - notification.save() - - except VerificationException as e: - print(e) + # Parse the JSON payload + # fulljson = json.loads(decoded) + # signedPayload = fulljson['signedPayload'] + # + # root_certificates = load_root_certificate() + # enable_online_checks = True + # bundle_id = "stax.SlashPoker.nosebleed" + # environment = Environment.SANDBOX + # signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id) + # + # try: + # payload = signed_data_verifier.verify_and_decode_notification(signedPayload) + # print(payload) + # + # notification = ASSNotification( + # content=payload, + # ) + # notification.save() + # + # + # except VerificationException as e: + # print(e)