You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
3.2 KiB
112 lines
3.2 KiB
from django.shortcuts import render
|
|
from django.http import HttpResponse, JsonResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from .models import ASSNotification
|
|
from django.conf import settings
|
|
|
|
import json, jwt
|
|
import base64
|
|
# import os
|
|
|
|
import app_store_notifications_v2_validator as asn2
|
|
from app_store_notifications_v2_validator import InvalidTokenError
|
|
|
|
def index(request):
    """Respond with a fixed plain greeting for the subs index."""
    greeting = "Hello, world. You're at the subs index."
    return HttpResponse(greeting)
|
|
|
|
def test(request):
    """Return the text of the file at ``settings.ASS_KEY_FILE`` as the response body.

    NOTE(review): this hands the configured key file verbatim to whoever
    requests this view -- confirm it is not routed outside of development.
    """
    with open(settings.ASS_KEY_FILE, 'r') as handle:
        # Reading the whole file at once yields the same text as joining its lines.
        contents = handle.read()
    return HttpResponse(contents)
|
|
|
|
@csrf_exempt
def app_store_webhook(request):
    """Record the notification type carried by an incoming webhook request.

    The request body is treated as base64-encoded JSON. The value stored
    under its ``notification_type`` key (``None`` when the key is absent)
    is saved as the ``content`` of a new ``ASSNotification``.

    Returns:
        ``JsonResponse({'status': 'success'})`` once the notification has
        been saved, or a ``{'status': 'error', ...}`` response with HTTP
        status 400 when the body cannot be decoded into a JSON object.
    """
    # NOTE(review): the payload is trusted without any signature check. An
    # earlier draft of this view called ``asn2.parse(request.body)`` and read
    # ``data['notificationType']``, skipping ``InvalidTokenError``; confirm
    # which payload format the sender actually uses before relying on this.
    try:
        body_text = request.body.decode('utf-8')
        decoded_payload = json.loads(base64.b64decode(body_text))
    except ValueError:
        # UnicodeDecodeError, binascii.Error and json.JSONDecodeError are all
        # ValueError subclasses; a malformed body is a client error, not a 500.
        return JsonResponse(
            {'status': 'error', 'message': 'invalid payload'},
            status=400,
        )

    # Valid JSON may still be a list, string or number, which has no ``.get``.
    if not isinstance(decoded_payload, dict):
        return JsonResponse(
            {'status': 'error', 'message': 'payload must be a JSON object'},
            status=400,
        )

    # Avoid naming this ``type`` so the builtin is not shadowed.
    notification_type = decoded_payload.get('notification_type')
    notification = ASSNotification(
        content=notification_type,
    )
    notification.save()

    return JsonResponse({'status': 'success'})
|
|
|
|
|
|
# def _decode_jws(token, root_cert_path, algorithms):
|
|
# try:
|
|
# header = jwt.get_unverified_header(token)
|
|
#
|
|
# # the first cert contains the public key used to sign the jwt
|
|
# first_cert_data = header["x5c"][0]
|
|
# first_cert_data = add_labels(first_cert_data)
|
|
# first_cert = load_certificate(FILETYPE_PEM, first_cert_data)
|
|
#
|
|
# # the other certs are an x5c (X.509 certificate chain)
|
|
# chain_datas = header["x5c"][1:]
|
|
# chain_datas = [add_labels(cd) for cd in chain_datas]
|
|
# chain = [load_certificate(FILETYPE_PEM, cd) for cd in chain_datas]
|
|
#
|
|
# public_key = first_cert.get_pubkey().to_cryptography_key()
|
|
#
|
|
# store = X509Store()
|
|
# store.add_cert(_get_root_cert(root_cert_path))
|
|
# ctx = X509StoreContext(store=store, certificate=first_cert, chain=chain)
|
|
# ctx.verify_certificate()
|
|
#
|
|
# return jwt.decode(token, public_key, algorithms=algorithms)
|
|
# except (ValueError, KeyError, jwt.exceptions.PyJWTError, X509StoreContextError) as err:
|
|
# raise InvalidTokenError from err
|
|
#
|
|
# def _get_root_cert(root_cert_path):
|
|
#
|
|
# fn = os.environ.get("APPLE_ROOT_CA")
|
|
# if fn is None:
|
|
# fn = root_cert_path or "AppleRootCA-G3.cer"
|
|
#
|
|
# fn = os.path.expanduser(fn)
|
|
# with open(fn, "rb") as f:
|
|
# data = f.read()
|
|
# root_cert = load_certificate(FILETYPE_ASN1, data)
|
|
#
|
|
# return root_cert
|
|
#
|
|
# class InvalidTokenError(Exception):
|
|
# pass
|
|
#
|
|
# def add_labels(key: str) -> bytes:
|
|
# return ("-----BEGIN CERTIFICATE-----\n" + key + "\n-----END CERTIFICATE-----").encode()
|
|
|