another try

main
Laurent 2 years ago
parent aad7a3c71e
commit edf833b4e3
  1. 125
      subscriptions/views.py

@ -3,18 +3,13 @@ from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from .models import ASSNotification from .models import ASSNotification
from django.conf import settings from django.conf import settings
import json, jwt
import base64 # import json, jwt
import os # import base64
# import os
from OpenSSL.crypto import (
X509Store, import app_store_notifications_v2_validator as asn2
X509StoreContext,
X509StoreContextError,
load_certificate,
FILETYPE_ASN1,
FILETYPE_PEM
)
def index(request): def index(request):
return HttpResponse("Hello, world. You're at the subs index.") return HttpResponse("Hello, world. You're at the subs index.")
@ -32,68 +27,76 @@ def app_store_webhook(request):
data = request.body.decode('utf-8') data = request.body.decode('utf-8')
# Parse the JSON payload # Parse the JSON payload
fulljson = json.loads(data) fulljson = json.loads(data)
signedPayload = fulljson['signedPayload'] # signedPayload = fulljson['signedPayload']
try:
data = asn2.parse(fulljson)
type = data['notificationType']
notification = ASSNotification(
content=type,
)
notification.save()
except InvalidTokenError:
pass
# KEY_FILE = settings.ASS_KEY_FILE # KEY_FILE = settings.ASS_KEY_FILE
# #
# with open(KEY_FILE,'r') as key_file: # with open(KEY_FILE,'r') as key_file:
# key = ''.join(key_file.readlines()) # key = ''.join(key_file.readlines())
decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256']) # decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256'])
# decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256']) # decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256'])
#print('hell yeah!' + str(key)) #print('hell yeah!' + str(key))
#logger.debug('test getLogger' + str(key)) #logger.debug('test getLogger' + str(key))
notification = ASSNotification(
content=decodedPayload,
)
notification.save()
return JsonResponse({'status': 'success'}) return JsonResponse({'status': 'success'})
def _decode_jws(token, root_cert_path, algorithms): # def _decode_jws(token, root_cert_path, algorithms):
try: # try:
header = jwt.get_unverified_header(token) # header = jwt.get_unverified_header(token)
#
# the first cert contains the public key used to sign the jwt # # the first cert contains the public key used to sign the jwt
first_cert_data = header["x5c"][0] # first_cert_data = header["x5c"][0]
first_cert_data = add_labels(first_cert_data) # first_cert_data = add_labels(first_cert_data)
first_cert = load_certificate(FILETYPE_PEM, first_cert_data) # first_cert = load_certificate(FILETYPE_PEM, first_cert_data)
#
# the other certs are an x5c (X.509 certificate chain) # # the other certs are an x5c (X.509 certificate chain)
chain_datas = header["x5c"][1:] # chain_datas = header["x5c"][1:]
chain_datas = [add_labels(cd) for cd in chain_datas] # chain_datas = [add_labels(cd) for cd in chain_datas]
chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas] # chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas]
#
public_key = first_cert.get_pubkey().to_cryptography_key() # public_key = first_cert.get_pubkey().to_cryptography_key()
#
store = X509Store() # store = X509Store()
store.add_cert(_get_root_cert(root_cert_path)) # store.add_cert(_get_root_cert(root_cert_path))
ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain) # ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain)
ctx.verify_certificate() # ctx.verify_certificate()
#
return jwt.decode(token, public_key, algorithms=algorithms) # return jwt.decode(token, public_key, algorithms=algorithms)
except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err: # except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err:
raise InvalidTokenError from err # raise InvalidTokenError from err
#
def _get_root_cert(root_cert_path): # def _get_root_cert(root_cert_path):
#
fn = os.environ.get("APPLE_ROOT_CA") # fn = os.environ.get("APPLE_ROOT_CA")
if fn is None: # if fn is None:
fn = root_cert_path or "AppleRootCA-G3.cer" # fn = root_cert_path or "AppleRootCA-G3.cer"
#
fn = os.path.expanduser(fn) # fn = os.path.expanduser(fn)
with open(fn, "rb") as f: # with open(fn, "rb") as f:
data = f.read() # data = f.read()
root_cert = load_certificate(FILETYPE_ASN1, data) # root_cert = load_certificate(FILETYPE_ASN1, data)
#
return root_cert # return root_cert
#
class InvalidTokenError(Exception): # class InvalidTokenError(Exception):
pass # pass
#
def add_labels(key: str) -> bytes: # def add_labels(key: str) -> bytes:
return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode() # return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()

Loading…
Cancel
Save