You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

301 lines
11 KiB

from django.shortcuts import render
from django.http import HttpResponse, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import ListView, DetailView
from django.db.models import Count, Q, Sum, Case, When, IntegerField
from django.conf import settings
from django.db.models.functions import TruncMonth
from django.shortcuts import render
from .models import ASSNotification
from appstoreserverlibrary.signed_data_verifier import VerificationException, SignedDataVerifier
from appstoreserverlibrary.api_client import AppStoreServerAPIClient, APIException
from appstoreserverlibrary.models.Environment import Environment
import requests
from pokeranalytics_backend.settings import APNS_ENVIRONMENT_SANDBOX
import json, jwt
import base64
import os
import logging
import datetime
# Apple certificates handed to SignedDataVerifier in decodePayload().
# They are downloaded once, when this module is imported.
ROOT_CER_URL = "https://www.apple.com/certificateauthority/AppleRootCA-G3.cer"
G6_CER_URL = "https://www.apple.com/certificateauthority/AppleWWDRCAG6.cer"

# Upper bound (seconds) for each download. Without a timeout, requests waits
# forever on an unresponsive server, which would hang the import of this
# module and therefore the start-up of the whole application.
_CERT_DOWNLOAD_TIMEOUT_SECONDS = 30

root_cert_bytes: bytes = requests.get(
    ROOT_CER_URL, timeout=_CERT_DOWNLOAD_TIMEOUT_SECONDS
).content
g6_cert_bytes: bytes = requests.get(
    G6_CER_URL, timeout=_CERT_DOWNLOAD_TIMEOUT_SECONDS
).content
# NOTE(review): the HTTP status is not checked, so an error page would be
# stored as "certificate" bytes — consider raise_for_status() if failing at
# import time is acceptable.
def index(request):
    """Return a fixed plain-text greeting for the subscriptions index page.

    Args:
        request: The incoming HTTP request (not inspected).

    Returns:
        An ``HttpResponse`` containing the greeting text.
    """
    greeting = "Hello, world. You're at the subs index."
    return HttpResponse(greeting)
@csrf_exempt
def test(request):
    """Feed the contents of ``<BASE_DIR>/payload.txt`` to ``decodePayload``.

    Args:
        request: The incoming HTTP request (not inspected).

    Returns:
        A ``JsonResponse``:
        * success status when the payload was processed without raising,
        * HTTP 404 when a ``FileNotFoundError`` is raised,
        * HTTP 500 carrying the exception text for any other exception.
    """
    try:
        # payload.txt is looked up at the root of the Django project.
        with open(os.path.join(settings.BASE_DIR, 'payload.txt'), 'r') as payload_file:
            stored_payload = payload_file.read()

        # Run the stored payload through the regular decoding path.
        decodePayload(stored_payload)

        success_body = {'status': 'success', 'message': 'Payload processed successfully'}
        return JsonResponse(success_body)
    except FileNotFoundError:
        missing_body = {'status': 'error', 'message': 'payload.txt file not found'}
        return JsonResponse(missing_body, status=404)
    except Exception as exc:
        # Record the failure, then report it to the caller.
        logging.error(f"Error in test endpoint: {str(exc)}")
        failure_body = {'status': 'error', 'message': str(exc)}
        return JsonResponse(failure_body, status=500)
# @csrf_exempt
# def app_store_webhook_prod(request):
# decoded = request.body.decode('utf-8')
# fulljson = json.loads(decoded)
# signedPayload = fulljson['signedPayload']
# decodePayload(signedPayload)
# return JsonResponse({'status': 'success'})
@csrf_exempt
def app_store_webhook(request):
    """Receive a notification body, archive its signed payload and process it.

    The request body is expected to be UTF-8 JSON with a string
    ``signedPayload`` member. CSRF checking is disabled by the decorator
    (the sender is presumably Apple's notification service — TODO confirm).

    Args:
        request: The incoming HTTP request; only ``request.body`` is read.

    Returns:
        ``JsonResponse`` with ``{'status': 'success'}`` once the payload has
        been saved and decoded, or an HTTP 400 ``JsonResponse`` when the body
        cannot be parsed or has no usable ``signedPayload``.
    """
    try:
        signedPayload = json.loads(request.body.decode('utf-8'))['signedPayload']
    except (ValueError, KeyError, TypeError):
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # KeyError/TypeError cover a missing key or a non-object JSON body.
        # Previously these surfaced as an unhandled exception (HTTP 500).
        return JsonResponse(
            {'status': 'error', 'message': 'Invalid request body'}, status=400
        )

    if not isinstance(signedPayload, str):
        # save_payload_file() writes the value to a text file, so anything
        # other than a string cannot be processed.
        return JsonResponse(
            {'status': 'error', 'message': 'Invalid request body'}, status=400
        )

    save_payload_file(signedPayload)
    decodePayload(signedPayload)
    return JsonResponse({'status': 'success'})
def save_payload_file(signedPayload):
    """Write the raw signed payload to a timestamped file under ``<BASE_DIR>/tmp``.

    Args:
        signedPayload: The payload text to store.

    The file is named ``signedPayload_<YYYYmmdd_HHMMSS>.txt`` using the
    current local time.
    """
    tmp_dir = os.path.join(settings.BASE_DIR, 'tmp')

    # exist_ok=True replaces the former "check, then create" sequence, which
    # could raise FileExistsError when two requests created the directory at
    # the same moment.
    os.makedirs(tmp_dir, exist_ok=True)

    # NOTE(review): the timestamp has one-second resolution, so two payloads
    # saved within the same second share a filename and the later overwrites
    # the earlier.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(tmp_dir, f'signedPayload_{timestamp}.txt')
    with open(filename, 'w') as f:
        f.write(signedPayload)
def _ms_to_datetime(timestamp_ms):
    """Convert a millisecond epoch timestamp to a naive local ``datetime``.

    Returns ``None`` when ``timestamp_ms`` is ``None`` so optional fields can
    be passed straight through.
    """
    if timestamp_ms is None:
        return None
    return datetime.datetime.fromtimestamp(timestamp_ms / 1000)


def decodePayload(signedPayload):
    """Verify a signed notification payload and store it as an ``ASSNotification``.

    The payload is verified with a ``SignedDataVerifier`` built from the
    module-level certificate bytes. A notification carrying ``data`` is saved
    together with the fields of its decoded transaction; one carrying
    ``summary`` is saved with the summary counters. A payload with neither is
    verified but not stored.

    Args:
        signedPayload: The signed notification string to verify and decode.

    Returns:
        ``HttpResponse('ok')``. Callers in this module ignore the value.

    Notes:
        * Dates are stored as naive datetimes in the server's local time
          zone, because ``datetime.fromtimestamp`` is called without ``tz``.
        * NOTE(review): only ``APIException`` is handled here. The verifier
          methods appear to raise ``VerificationException`` (imported by this
          module) on failure; that still propagates to the caller — confirm
          whether it should be handled as well.
    """
    enable_online_checks = True

    # The comparison is intentionally "== True" (not truthiness): the type of
    # the setting is not visible here, and a non-boolean truthy value must
    # keep selecting PRODUCTION as before.
    if APNS_ENVIRONMENT_SANDBOX == True:  # noqa: E712
        environment = Environment.SANDBOX
    else:
        environment = Environment.PRODUCTION
    # logging.debug() returns None, so its result is no longer bound to a name.
    logging.debug(f'environment = {environment}')

    bundle_id = "stax.SlashPoker.nosebleed"
    app_apple_id = 1073540690
    verifier = SignedDataVerifier(
        [root_cert_bytes, g6_cert_bytes],
        enable_online_checks,
        environment,
        bundle_id,
        app_apple_id,
    )

    try:
        payload = verifier.verify_and_decode_notification(signedPayload)

        if payload.data:
            data = payload.data
            transaction_info = verifier.verify_and_decode_signed_transaction(
                data.signedTransactionInfo
            )
            # renewal_info = verifier.verify_and_decode_renewal_info(data.signedRenewalInfo)

            # Enum-like values are reduced to plain int/str before saving.
            offer_type = None
            if transaction_info.offerType is not None:
                offer_type = int(transaction_info.offerType)

            offer_discount_type = None
            if transaction_info.offerDiscountType is not None:
                offer_discount_type = transaction_info.offerDiscountType.value

            notification = ASSNotification(
                notificationType=payload.notificationType,
                subtype=payload.subtype,
                notificationUUID=payload.notificationUUID,
                signedDate=_ms_to_datetime(payload.signedDate),
                appAccountToken=transaction_info.appAccountToken,
                productId=transaction_info.productId,
                currency=transaction_info.currency,
                # expiresDate / originalPurchaseDate used to be divided
                # unconditionally, raising TypeError when the transaction
                # does not carry them; they are now stored as None instead.
                expiresDate=_ms_to_datetime(transaction_info.expiresDate),
                isUpgraded=transaction_info.isUpgraded,
                originalPurchaseDate=_ms_to_datetime(
                    transaction_info.originalPurchaseDate
                ),
                originalTransactionId=transaction_info.originalTransactionId,
                price=transaction_info.price,
                quantity=transaction_info.quantity,
                # "or None" keeps the previous behaviour of treating any
                # falsy revocation date as "not revoked".
                revocationDate=_ms_to_datetime(
                    transaction_info.revocationDate or None
                ),
                storefront=transaction_info.storefront,
                transactionId=transaction_info.transactionId,
                transactionReason=transaction_info.rawTransactionReason,
                offerDiscountType=offer_discount_type,
                offerIdentifier=transaction_info.offerIdentifier,
                offerType=offer_type,
            )
            notification.save()
        elif payload.summary:
            summary = payload.summary
            notification = ASSNotification(
                notificationType=payload.notificationType,
                subtype=payload.subtype,
                notificationUUID=payload.notificationUUID,
                signedDate=_ms_to_datetime(payload.signedDate),
                productId=summary.productId,
                requestIdentifier=summary.requestIdentifier,
                succeededCount=summary.succeededCount,
                failedCount=summary.failedCount,
            )
            notification.save()
    except APIException:
        # logger.exception() records the message plus the full traceback, so
        # the former print() of the exception is no longer needed.
        logger = logging.getLogger('subscriptions')
        logger.exception("App Store API error while processing notification")

    return HttpResponse('ok')
# def load_root_certificate():
# in_file = open("AppleRootCA-G3.cer", "rb") # opening for [r]eading as [b]inary
# data = in_file.read() # if you only wanted to read 512 bytes, do .read(512)
# in_file.close()
# return load_certificate(FILETYPE_ASN1, data)
def _get_root_cert(root_cert_path):
    """Load a root certificate from disk.

    The file location is resolved in this order: the ``APPLE_ROOT_CA``
    environment variable, then ``root_cert_path`` when it is truthy, then
    ``"AppleRootCA-G3.cer"``. A leading ``~`` in the path is expanded.

    Args:
        root_cert_path: Fallback path used when ``APPLE_ROOT_CA`` is unset.

    Returns:
        Whatever ``load_certificate(FILETYPE_ASN1, <file bytes>)`` returns.

    NOTE(review): ``load_certificate`` and ``FILETYPE_ASN1`` are not imported
    in the visible part of this module — confirm they are in scope before
    calling this helper.
    """
    cert_location = os.environ.get("APPLE_ROOT_CA")
    if cert_location is None:
        cert_location = root_cert_path if root_cert_path else "AppleRootCA-G3.cer"

    with open(os.path.expanduser(cert_location), "rb") as cert_file:
        raw_cert = cert_file.read()

    return load_certificate(FILETYPE_ASN1, raw_cert)
class MonthlyOfferSummaryView(ListView):
    """List, per month, how many notifications were recorded for each offer.

    Only ``SUBSCRIBED`` and ``REFUND`` notifications with a non-null
    ``offerIdentifier`` are counted.
    """

    template_name = 'subscriptions/monthly_summary.html'
    context_object_name = 'monthly_offers'

    def get_queryset(self):
        """Return ``[{'month': ..., 'offers': [{'offerIdentifier', 'count'}, ...]}, ...]``.

        Entries are ordered by month, newest first; the offers inside a month
        keep the order delivered by the database query (by ``offerIdentifier``).
        """
        # One row per (month, offerIdentifier) pair with its notification count.
        counted_rows = (
            ASSNotification.objects
            .exclude(offerIdentifier__isnull=True)
            .filter(notificationType__in=['SUBSCRIBED', 'REFUND'])
            .annotate(month=TruncMonth('signedDate'))
            .values('month', 'offerIdentifier')
            .annotate(count=Count('id'))
            .order_by('-month', 'offerIdentifier')
        )

        # Bucket the rows by month, preserving first-seen order.
        offers_by_month = {}
        for row in counted_rows:
            bucket = offers_by_month.setdefault(row['month'], [])
            bucket.append({
                'offerIdentifier': row['offerIdentifier'],
                'count': row['count'],
            })

        # Shape the buckets into the list-of-dicts form the template consumes.
        summaries = [
            {'month': bucket_month, 'offers': bucket_offers}
            for bucket_month, bucket_offers in offers_by_month.items()
        ]
        summaries.sort(key=lambda entry: entry['month'], reverse=True)
        return summaries
class OfferDetailView(ListView):
    """Show the notifications of one offer for one calendar month.

    The month, year and offer are taken from the URL keyword arguments
    ``month``, ``year`` and ``offer_id``. Only ``SUBSCRIBED`` and ``REFUND``
    notifications are considered.
    """

    template_name = 'subscriptions/offer_detail.html'
    context_object_name = 'notifications'

    # Amount credited per net subscription when estimating revenue. Kept as a
    # class attribute so a subclass (or ``as_view(price_per_subscription=...)``)
    # can change it; the default matches the previously hard-coded value.
    price_per_subscription = 15

    def _month_bounds(self):
        """Return ``(first_day, last_day)`` of the month named in the URL kwargs."""
        year = int(self.kwargs.get('year'))
        month = int(self.kwargs.get('month'))
        start_date = datetime.date(year, month, 1)
        # The last day is the day before the first day of the following
        # month; December rolls over into January of the next year.
        if month == 12:
            next_month_start = datetime.date(year + 1, 1, 1)
        else:
            next_month_start = datetime.date(year, month + 1, 1)
        return start_date, next_month_start - datetime.timedelta(days=1)

    def _matching_notifications(self):
        """Return the unordered queryset shared by the list and the statistics."""
        start_date, end_date = self._month_bounds()
        return ASSNotification.objects.filter(
            signedDate__date__gte=start_date,
            signedDate__date__lte=end_date,
            offerIdentifier=self.kwargs.get('offer_id'),
            notificationType__in=['SUBSCRIBED', 'REFUND'],
        )

    def get_queryset(self):
        """Return the matching notifications ordered by ``signedDate``."""
        return self._matching_notifications().order_by('signedDate')

    def get_context_data(self, **kwargs):
        """Add the month label, offer id, counts and revenue estimate to the context.

        Added keys: ``month_name``, ``offer_id``, ``subscribed_count``,
        ``refund_count``, ``net_count`` (subscribed minus refunded) and
        ``revenue`` (``net_count * price_per_subscription``).
        """
        context = super().get_context_data(**kwargs)
        start_date, _ = self._month_bounds()

        # Count both notification types in a single aggregate query.
        stats = self._matching_notifications().aggregate(
            subscribed_count=Count(Case(
                When(notificationType='SUBSCRIBED', then=1),
                output_field=IntegerField()
            )),
            refund_count=Count(Case(
                When(notificationType='REFUND', then=1),
                output_field=IntegerField()
            ))
        )
        subscribed_count = stats['subscribed_count'] or 0
        refund_count = stats['refund_count'] or 0
        net_count = subscribed_count - refund_count
        revenue = net_count * self.price_per_subscription

        context.update({
            'month_name': start_date.strftime('%B %Y'),
            'offer_id': self.kwargs.get('offer_id'),
            'subscribed_count': subscribed_count,
            'refund_count': refund_count,
            'net_count': net_count,
            'revenue': revenue
        })
        return context