You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
60 lines
3.0 KiB
60 lines
3.0 KiB
from typing import Any, Dict, List, Optional
|
|
from OpenSSL import crypto
|
|
import jwt
|
|
from jwt.utils import base64url_decode
|
|
import requests
|
|
import logging
|
|
|
|
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
|
|
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"
|
|
|
|
def get_validated_jwt_content(apple_jwt: str) -> Optional[Dict[str, Any]]:
|
|
# Fetch the well-known/expected root & intermediate keys from Apple:
|
|
root_cert_bytes: bytes = requests.get(ROOT_CER_URL).content
|
|
root_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, root_cert_bytes)
|
|
g6_cert_bytes: bytes = requests.get(G6_CER_URL).content
|
|
g6_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, g6_cert_bytes)
|
|
|
|
# Get the signing keys out of the JWT header. The header will look like:
|
|
# {"alg": "ES256", "x5c": ["...base64 cert...", "...base64 cert..."]}
|
|
header = jwt.get_unverified_header(apple_jwt)
|
|
alg = header['alg'] # ES256
|
|
provided_certificates: List[crypto.X509] = []
|
|
certificate_names: List[Dict[bytes, bytes]] = []
|
|
for cert_base64 in header['x5c']:
|
|
cert_bytes = base64url_decode(cert_base64)
|
|
another_cert = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_bytes)
|
|
# To see the certificate chain by name, which corresponds to certs you can fetch:
|
|
# https://www.apple.com/certificateauthority/
|
|
#
|
|
# Prints <X509Name object '/CN=Apple Root CA - G3/OU=Apple Certification Authority/O=Apple Inc./C=US'>:
|
|
certificate_names.append(dict(another_cert.get_subject().get_components()))
|
|
provided_certificates.append(another_cert)
|
|
|
|
# Verify that the root & intermediate keys are what we expect from Apple:
|
|
assert certificate_names[-1][b'CN'] == b'Apple Root CA - G3', f'Root cert changed: {certificate_names[-1]}'
|
|
assert certificate_names[-2][b'OU'] == b'G6', f'Intermediate cert changed: {certificate_names[-2]}'
|
|
assert provided_certificates[-2].digest('sha256') == g6_cert.digest('sha256')
|
|
assert provided_certificates[-1].digest('sha256') == root_cert.digest('sha256')
|
|
|
|
# Validate that the cert chain is cryptographically legit:
|
|
store = crypto.X509Store()
|
|
store.add_cert(root_cert)
|
|
store.add_cert(g6_cert)
|
|
for cert in provided_certificates[:-2]:
|
|
try:
|
|
crypto.X509StoreContext(store, cert).verify_certificate()
|
|
except crypto.X509StoreContextError:
|
|
logging.error("Invalid certificate chain in JWT: %s", apple_jwt)
|
|
return None
|
|
store.add_cert(cert)
|
|
|
|
# Now that the cert is validated, we can use it to verify the actual signature
|
|
# of the JWT. PyJWT does not understand this certificate if we pass it in, so
|
|
# we have to get the cryptography library's version of the same key:
|
|
cryptography_version_of_key = provided_certificates[0].get_pubkey().to_cryptography_key()
|
|
try:
|
|
return jwt.decode(apple_jwt, cryptography_version_of_key, algorithms=["ES256"])
|
|
except Exception:
|
|
logging.exception("Problem validating Apple JWT")
|
|
return None
|
|
|