You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
80 lines
2.2 KiB
80 lines
2.2 KiB
import json
|
|
import jwt
|
|
import time
|
|
import httpx
|
|
|
|
# APPLE WARNING: Reuse a connection as long as possible.
# In most cases, you can reuse a connection for many hours to days.
# If your connection is mostly idle, you may send an HTTP/2 PING frame after an hour of inactivity.
# Reusing a connection often results in less bandwidth and CPU consumption.
# https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns
|
|
|
|
def generate_jwt(key_path, key_id, team_id):
    """Build the ES256-signed provider token used to authenticate with APNs.

    Args:
        key_path: Path of the private-key file; its text content is handed to
            ``jwt.encode`` as the signing key.
        key_id: Written into the token header as ``kid``.
        team_id: Written into the token claims as ``iss``.

    Returns:
        The encoded token as ``str``.

    Raises:
        OSError: If the key file cannot be opened or read.
    """
    with open(key_path, "r", encoding="utf-8") as key_file:
        key = key_file.read()

    headers = {
        "alg": "ES256",
        "kid": key_id,
    }
    payload = {
        "iss": team_id,
        # Apple specifies "iat" as seconds since the epoch, so emit whole
        # seconds instead of the fractional float returned by time.time().
        "iat": int(time.time()),
    }

    token = jwt.encode(payload, key, algorithm="ES256", headers=headers)
    # PyJWT 1.x returns bytes here; formatting bytes into the authorization
    # header would yield "bearer b'...'", so always hand back text.
    if isinstance(token, bytes):
        token = token.decode("ascii")
    return token
|
|
|
|
|
|
# Module-level APNs settings read by send_push_notification().
key_path = 'AuthKey_DHJRAU6BCZ.p8'  # private-key file passed to generate_jwt()
key_id = 'DHJRAU6BCZ'  # becomes the "kid" header of the provider token
team_id = 'BQ3Y44M3Q6'  # becomes the "iss" claim of the provider token
bundle_id = 'app.padelclub'  # sent as the "apns-topic" request header

# The device token is not configured here; it is passed per call, e.g.
# device_token = 'user_device_token'

# Sample alert text. send_push_notification() takes its own `message`
# argument, which shadows this global inside that function.
message = 'Hello, World!'
|
|
|
|
# Apple's documentation asks providers to refresh the token no more often than
# every 20 minutes and at least every 60 minutes; APNs answers overly frequent
# re-signing with TooManyProviderTokenUpdates. 50 minutes sits inside that window.
_PROVIDER_TOKEN_TTL_SECONDS = 50 * 60

# Maps (key_path, key_id, team_id) -> (token, monotonic timestamp when signed).
_provider_token_cache = {}


def _get_provider_token():
    """Return the provider token for the module credentials, re-signing only when stale."""
    credentials = (key_path, key_id, team_id)
    now = time.monotonic()
    cached = _provider_token_cache.get(credentials)
    if cached is not None and now - cached[1] < _PROVIDER_TOKEN_TTL_SECONDS:
        return cached[0]

    token = generate_jwt(*credentials)
    _provider_token_cache[credentials] = (token, now)
    return token


async def send_push_notification(device_token, message, *, client=None, sandbox=True):
    """POST a single alert notification to APNs for one device.

    Args:
        device_token: Device token; interpolated into the ``/3/device/<token>``
            request path.
        message: Value placed under ``aps.alert`` in the JSON payload.
        client: Optional already-open ``httpx.AsyncClient`` (created with
            ``http2=True``) to send the request on. Passing a long-lived client
            lets many notifications share one connection. The client is not
            closed by this function. When omitted, a temporary client is
            opened and closed around this one request.
        sandbox: When true (the default) the request goes to
            ``api.sandbox.push.apple.com``; when false, to the production host
            ``api.push.apple.com``.

    Returns:
        ``True`` if APNs answered with HTTP 200, otherwise ``False``. The
        outcome is also printed.
    """
    jwt_token = _get_provider_token()

    host = "api.sandbox.push.apple.com" if sandbox else "api.push.apple.com"
    url = f"https://{host}/3/device/{device_token}"

    payload = {
        "aps": {
            "alert": message,
            "sound": "default",
        },
    }

    headers = {
        "apns-topic": bundle_id,
        # Apple recommends (and requires on watchOS 6+) declaring the push
        # type; "alert" is the type for notifications with alert/sound content.
        "apns-push-type": "alert",
        "authorization": f"bearer {jwt_token}",
        "content-type": "application/json",
    }

    if client is not None:
        response = await client.post(url, json=payload, headers=headers)
    else:
        async with httpx.AsyncClient(http2=True) as temporary_client:
            response = await temporary_client.post(url, json=payload, headers=headers)

    if response.status_code == 200:
        print("Notification sent successfully!")
        return True

    print(f"Failed to send notification: {response.status_code} {response.text}")
    return False
|
|
|