try another way

main
Laurent 2 years ago
parent edd9591db7
commit 2a9050324b
  1. 60
      subscriptions/validate_apple_storekit_2_jwt.py
  2. 60
      subscriptions/views.py

@ -0,0 +1,60 @@
from typing import Any, Dict, List, Optional
from OpenSSL import crypto
import jwt
from jwt.utils import base64url_decode
import requests
import logging
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"
def get_validated_jwt_content(apple_jwt: str) -> Optional[Dict[str, Any]]:
# Fetch the well-known/expected root & intermediate keys from Apple:
root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content
root_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, root_cert_bytes)
g6_cert_bytes: bytes = requests.get(G6_CER_URL).content
g6_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, g6_cert_bytes)
# Get the signing keys out of the JWT header. The header will look like:
# {"alg": "ES256", "x5c": ["...base64 cert...", "...base64 cert..."]}
header = jwt.get_unverified_header(apple_jwt)
alg = header['alg'] # ES256
provided_certificates: List[crypto.X509] = []
certificate_names: List[Dict[bytes, bytes]] = []
for cert_base64 in header['x5c']:
cert_bytes = base64url_decode(cert_base64)
another_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_bytes)
# To see the certificate chain by name, which corresponds to certs you can fetch:
# https://www.apple.com/certificateauthority/
#
# Prints <X509Name object '/CN=Apple Root CA - G3/OU=Apple Certification Authority/O=Apple Inc./C=US'>:
certificate_names.append(dict(another_cert.get_subject().get_components()))
provided_certificates.append(another_cert)
# Verify that the root & intermediate keys are what we expect from Apple:
assert certificate_names[-1][b'CN'] == b'Apple Root CA - G3', f'Root cert changed: {certificate_names[-1]}'
assert certificate_names[-2][b'OU'] == b'G6', f'Intermediate cert changed: {certificate_names[-2]}'
assert provided_certificates[-2].digest('sha256') == g6_cert.digest('sha256')
assert provided_certificates[-1].digest('sha256') == root_cert.digest('sha256')
# Validate that the cert chain is cryptographically legit:
store = crypto.X509Store()
store.add_cert(root_cert)
store.add_cert(g6_cert)
for cert in provided_certificates[:-2]:
try:
crypto.X509StoreContext(store, cert).verify_certificate()
except crypto.X509StoreContextError:
logging.error("Invalid certificate chain in JWT: %s", apple_jwt)
return None
store.add_cert(cert)
# Now that the cert is validated, we can use it to verify the actual signature
# of the JWT. PyJWT does not understand this certificate if we pass it in, so
# we have to get the cryptography library's version of the same key:
cryptography_version_of_key = provided_certificates[0].get_pubkey().to_cryptography_key()
try:
return jwt.decode(apple_jwt, cryptography_version_of_key, algorithms=["ES256"])
except Exception:
logging.exception("Problem validating Apple JWT")
return None

@ -20,6 +20,8 @@ from OpenSSL.crypto import (
FILETYPE_PEM FILETYPE_PEM
) )
from .validate_apple_storekit_2_jwt import get_validated_jwt_content
# import app_store_notifications_v2_validator as asn2 # import app_store_notifications_v2_validator as asn2
# from app_store_notifications_v2_validator import InvalidTokenError # from app_store_notifications_v2_validator import InvalidTokenError
@ -27,39 +29,51 @@ def index(request):
return HttpResponse("Hello, world. You're at the subs index.") return HttpResponse("Hello, world. You're at the subs index.")
def test(request): def test(request):
KEY_FILE = settings.ASS_KEY_FILE in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary
data = in_file.read() # if you only wanted to read 512 bytes, do .read(512)
in_file.close()
string = data.decode('utf-8')
return HttpResponse(string)
with open(KEY_FILE,'r') as key_file:
key = ''.join(key_file.readlines())
return HttpResponse(key)
@csrf_exempt @csrf_exempt
def app_store_webhook(request): def app_store_webhook(request):
decoded = request.body.decode('utf-8') decoded = request.body.decode('utf-8')
# Parse the JSON payload json = get_validated_jwt_content(decoded)
fulljson = json.loads(decoded)
signedPayload = fulljson['signedPayload']
root_certificates = load_root_certificate() type = json['notificationType']
enable_online_checks = True
bundle_id = "stax.SlashPoker.nosebleed"
environment = Environment.SANDBOX
signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id)
try: notification = ASSNotification(
signed_notification = "ey.." content=type,
payload = signed_data_verifier.verify_and_decode_notification(signedPayload) )
print(payload) notification.save()
notification = ASSNotification(
content=payload,
)
notification.save()
# Parse the JSON payload
except VerificationException as e: # fulljson = json.loads(decoded)
print(e) # signedPayload = fulljson['signedPayload']
#
# root_certificates = load_root_certificate()
# enable_online_checks = True
# bundle_id = "stax.SlashPoker.nosebleed"
# environment = Environment.SANDBOX
# signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id)
#
# try:
# payload = signed_data_verifier.verify_and_decode_notification(signedPayload)
# print(payload)
#
# notification = ASSNotification(
# content=payload,
# )
# notification.save()
#
#
# except VerificationException as e:
# print(e)

Loading…
Cancel
Save