import http.client import json import jwt import time import httpx import asyncio # APPLE WARNING: Reuse a connection as long as possible. # In most cases, you can reuse a connection for many hours to days. # If your connection is mostly idle, you may send a HTTP2 PING frame after an hour of inactivity. # Reusing a connection often results in less bandwidth and CPU consumption. # https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns def generate_jwt(key_path, key_id, team_id): with open(key_path, 'r') as key_file: key = key_file.read() headers = { "alg": "ES256", "kid": key_id } payload = { "iss": team_id, "iat": time.time() } token = jwt.encode(payload, key, algorithm="ES256", headers=headers) return token key_path = 'AuthKey_DHJRAU6BCZ.p8' key_id = 'DHJRAU6BCZ' team_id = 'BQ3Y44M3Q6' bundle_id = 'app.padelclub' # device_token = 'user_device_token' message = 'Hello, World!' async def send_push_notification(device_token, message): jwt_token = generate_jwt(key_path, key_id, team_id) # APNs endpoint (use 'api.push.apple.com' for production) # host = 'api.sandbox.push.apple.com' url = f"https://api.sandbox.push.apple.com/3/device/{device_token}" payload = { "aps": { "alert": message, "sound": "default" } } payload_json = json.dumps(payload) # Create the HTTP connection # connection = HTTP20Connection(host, port=443) # Set the headers headers = { "apns-topic": bundle_id, "authorization": f"bearer {jwt_token}", "content-type": "application/json" } async with httpx.AsyncClient(http2=True) as client: response = await client.post(url, json=payload, headers=headers) # Send the notification # connection.request( # "POST", # f"/3/device/{device_token}", # body=payload_json, # headers=headers # ) # response = connection.get_response() # response_data = response.read() if response.status_code == 200: print("Notification sent successfully!") else: print(f"Failed to send notification: {response.status_code} {response.text}")