You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

144 lines
4.1 KiB

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from .models import ASSNotification
from django.conf import settings
from appstoreserverlibrary.models.Environment import Environment
from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier
import json, jwt
import base64
import os
from OpenSSL.crypto import (
X509Store,
X509StoreContext,
X509StoreContextError,
load_certificate,
FILETYPE_ASN1,
FILETYPE_PEM
)
# import app_store_notifications_v2_validator as asn2
# from app_store_notifications_v2_validator import InvalidTokenError
def index(request):
return HttpResponse("Hello, world. You're at the subs index.")
def test(request):
KEY_FILE = settings.ASS_KEY_FILE
with open(KEY_FILE,'r') as key_file:
key = ''.join(key_file.readlines())
return HttpResponse(key)
@csrf_exempt
def app_store_webhook(request):
decoded = request.body.decode('utf-8')
# Parse the JSON payload
fulljson = json.loads(decoded)
signedPayload = fulljson['signedPayload']
root_certificates = load_root_certificate()
enable_online_checks = True
bundle_id = "stax.SlashPoker.nosebleed"
environment = Environment.SANDBOX
signed_data_verifier = SignedDataVerifier(root_certificates, enable_online_checks, environment, bundle_id)
try:
signed_notification = "ey.."
payload = signed_data_verifier.verify_and_decode_notification(signedPayload)
print(payload)
notification = ASSNotification(
content=payload,
)
notification.save()
except VerificationException as e:
print(e)
# try:
# data = asn2.parse(decoded)
# type = data['notificationType']
#
# notification = ASSNotification(
# content=type,
# )
# notification.save()
#
# except InvalidTokenError:
# pass
# KEY_FILE = settings.ASS_KEY_FILE
#
# with open(KEY_FILE,'r') as key_file:
# key = ''.join(key_file.readlines())
# decodedPayload = _decode_jws(signedPayload, root_cert_path=None, algorithms=['ES256'])
# decodedPayload = jwt.decode(signedPayload, key, algorithms=['ES256'])
#print('hell yeah!' + str(key))
#logger.debug('test getLogger' + str(key))
return JsonResponse({'status': 'success'})
# def _decode_jws(token, root_cert_path, algorithms):
# try:
# header = jwt.get_unverified_header(token)
#
# # the first cert contains the public key used to sign the jwt
# first_cert_data = header["x5c"][0]
# first_cert_data = add_labels(first_cert_data)
# first_cert = load_certificate(FILETYPE_PEM, first_cert_data)
#
# # the other certs are an x5c (X.509 certificate chain)
# chain_datas = header["x5c"][1:]
# chain_datas = [add_labels(cd) for cd in chain_datas]
# chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas]
#
# public_key = first_cert.get_pubkey().to_cryptography_key()
#
# store = X509Store()
# store.add_cert(_get_root_cert(root_cert_path))
# ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain)
# ctx.verify_certificate()
#
# return jwt.decode(token, public_key, algorithms=algorithms)
# except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err:
# raise InvalidTokenError from err
#
def load_root_certificate():
in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary
data = in_file.read() # if you only wanted to read 512 bytes, do .read(512)
in_file.close()
return data
def _get_root_cert(root_cert_path):
fn = os.environ.get("APPLE_ROOT_CA")
if fn is None:
fn = root_cert_path or "AppleRootCA-G3.cer"
fn = os.path.expanduser(fn)
with open(fn, "rb") as f:
data = f.read()
root_cert = load_certificate(FILETYPE_ASN1, data)
return root_cert
#
# class InvalidTokenError(Exception):
# pass
#
# def add_labels(key: str) -> bytes:
# return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()