AWS AppSync Events is a managed serverless WebSocket API for real-time pub/sub. Clients subscribe to channels and receive events as they're published. No connection management, scaling, or WebSocket infrastructure to manage.
Concept | Description
Channel | A path like /chat/user123 that clients subscribe to. Events published to a channel are broadcast to all subscribers.
Namespace | Top-level grouping of channels (e.g., chat). Auth rules and handlers are configured per namespace.
Auth Modes | Cognito User Pools, API Key, IAM, Lambda, OIDC. Different modes for connection vs publish vs subscribe.
DIRECT Handler | A Lambda invoked on publish events. Can filter, transform, or drop events before broadcast.
HTTP Endpoint | For publishing events: https://DNS.appsync-api.REGION.amazonaws.com/event
Realtime Endpoint | For WebSocket subscriptions: wss://DNS.appsync-realtime-api.REGION.amazonaws.com/event/realtime
⚠️ Critical: The API ID is NOT the same as the DNS prefix. You must use aws appsync get-api to find the actual DNS endpoints.
# Get your API's actual DNS endpoints
aws appsync get-api --api-id YOUR_API_ID --region YOUR_REGION \
--query "api.dns"
# Returns:
# {
# "HTTP": "abc123xyz.appsync-api.us-east-1.amazonaws.com",
# "REALTIME": "abc123xyz.appsync-realtime-api.us-east-1.amazonaws.com"
# }
#
# The "abc123xyz" prefix is NOT your API ID — it's a separate DNS identifier!
11. Encode auth headers as Base64URL
// For Cognito auth:
const authHeader = {
Authorization: "eyJraWQ..." , // Cognito ID Token (NOT access token)
host: "DNS.appsync-api.REGION.amazonaws.com" // HTTP host, NOT realtime host
};
// Base64URL encode (not standard base64!)
const encoded = btoa(JSON.stringify(authHeader))
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '');
2 2. Open WebSocket with auth as subprotocol
const ws = new WebSocket(
'wss://DNS.appsync-realtime-api.REGION.amazonaws.com/event/realtime',
[
'header-' + encoded, // Auth header as subprotocol
'aws-appsync-event-ws' // Required protocol identifier
]
);
3.Send connection_init after WebSocket opens
ws.addEventListener('open', () => {
ws.send(JSON.stringify({ type: 'connection_init' }));
});
4. Wait for connection_ack
ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
if (data.type === 'connection_ack') {
console.log('Connected! Timeout:', data.connectionTimeoutMs);
// Now you can subscribe to channels
}
});
// After receiving connection_ack:
const subscriptionId = 'unique-id-' + Date.now();
ws.send(JSON.stringify({
type: 'subscribe',
id: subscriptionId,
channel: '/chat/user123',
authorization: {
Authorization: cognitoIdToken,
host: 'DNS.appsync-api.REGION.amazonaws.com'
}
}));
// Listen for subscribe_success:
// { "type": "subscribe_success", "id": "unique-id-..." }
// Listen for incoming events:
// { "type": "data", "id": "unique-id-...", "event": "{ ...JSON string... }" }
// Publishing uses HTTP POST (not WebSocket)
const response = await fetch('https://DNS.appsync-api.REGION.amazonaws.com/event', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': cognitoIdToken // or API key header
},
body: JSON.stringify({
channel: '/chat/user123',
events: [
JSON.stringify({ text: 'Hello!', sender: 'user' }) // Each event is a JSON STRING
]
})
});
💡 Tip: Each item in the events array must be a stringified JSON value, not a raw object.
When a namespace has a DIRECT Lambda handler configured for onPublish, every publish to that namespace invokes the Lambda. The Lambda can filter, transform, or drop events.
{
"identity": {
"claims": { "cognito:username": "user123", ... },
"username": "user123"
},
"info": {
"channel": { "path": "/chat/user123", "segments": ["chat","user123"] },
"channelNamespace": { "name": "chat" },
"operation": "PUBLISH"
},
"events": [
{ "payload": { "text": "Hello!", "sender": "user" }, "id": "event-uuid" }
]
}
// BROADCAST the events to subscribers:
{ "events": [{ "payload": {...}, "id": "..." }] }
// DROP the events (don't broadcast):
{ "events": [] }
// REPORT an error:
{ "error": "Something went wrong" }
// ⚠️ DO NOT return arbitrary dicts like { "statusCode": 200 } — this causes 502!
⚠️ Critical: Returning None, null, or {"statusCode": 200} will result in a 502 Bad Gateway. Always return {"events": [...]} or {"error": "..."}.
# The handler config MUST be set on the channel namespace.
# CDK may not persist it — always verify:
aws appsync get-channel-namespace --api-id API_ID --name NAMESPACE \
--query "channelNamespace.handlerConfigs"
# If null, re-apply via SDK:
import boto3
client = boto3.client('appsync', region_name='REGION')
client.update_channel_namespace(
apiId='API_ID', name='NAMESPACE',
handlerConfigs={
'onPublish': {
'behavior': 'DIRECT',
'integration': {
'dataSourceName': 'YOUR_LAMBDA_DATA_SOURCE',
'lambdaConfig': { 'invokeType': 'REQUEST_RESPONSE' }
}
}
}
)
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
AppSync Events DIRECT onPublish handler.
INPUT FORMAT (what this Lambda receives):
{
"identity": { "claims": {...}, "username": "user1" },
"info": { "channel": {"path":"/chat/user1"}, "operation": "PUBLISH" },
"events": [
{ "payload": { ...published data... }, "id": "event-uuid" }
]
}
OUTPUT FORMAT (what this Lambda MUST return):
{"events": [...]} → broadcast these events to subscribers
{"events": []} → drop (don't broadcast)
{"error": "msg"} → report error (logged, not sent to client)
⚠️ Returning None, null, or {"statusCode":200} causes 502 Bad Gateway!
"""
logger.info("Received: %s", json.dumps(event, default=str))
raw_events = event.get("events", [])
if not raw_events:
return {"events": []}
first_payload = raw_events[0].get("payload", {})
sender_type = first_payload.get("senderType", "user")
# Agent/system messages: pass through for broadcast to WebSocket subscribers
if sender_type in ("agent", "system"):
logger.info("Broadcasting %s message", sender_type)
return {"events": raw_events}
# User messages: trigger your backend, don't broadcast
logger.info("Processing user message: %s", first_payload.get("text", ""))
process_user_message(first_payload, event.get("identity", {}))
return {"events": []}
def process_user_message(payload, identity):
"""Your business logic: start Step Function, call another Lambda, etc."""
pass
"""
Publishes a response to a user's channel using IAM auth (SigV4 signing).
The Lambda's execution role needs: appsync:EventPublish on the API ARN.
"""
import json, os, uuid
from datetime import datetime, timezone
import boto3, requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
APPSYNC_HTTP_URL = os.environ["APPSYNC_HTTP_URL"]
# e.g., "https://DNS_PREFIX.appsync-api.REGION.amazonaws.com/event"
session = boto3.Session()
def publish_to_channel(channel: str, message: dict):
"""Publish a message to an AppSync Events channel using IAM SigV4."""
body = json.dumps({
"channel": channel,
"events": [json.dumps(message)] # ⚠️ Must be JSON strings, not objects
})
# Extract region from URL for signing
parts = APPSYNC_HTTP_URL.split(".")
region = parts[parts.index("appsync-api") + 1]
# SigV4 sign the request
aws_request = AWSRequest(method="POST", url=APPSYNC_HTTP_URL, data=body,
headers={"Content-Type": "application/json"})
creds = session.get_credentials().get_frozen_credentials()
SigV4Auth(creds, "appsync", region).add_auth(aws_request)
# Send
resp = requests.post(APPSYNC_HTTP_URL, data=body,
headers=dict(aws_request.headers), timeout=10)
resp.raise_for_status()
return resp.json()
def lambda_handler(event, context):
"""Example: publish an agent response back to the user's channel."""
message = {
"messageId": str(uuid.uuid4()),
"sessionId": event["sessionId"],
"senderType": "agent",
"text": event["text"],
"timestamp": datetime.now(timezone.utc).isoformat(),
"citations": event.get("citations", []),
"error": None
}
publish_to_channel(event["channel"], message)
return {"statusCode": 200, "messageId": message["messageId"]}
{
"Effect": "Allow",
"Action": "appsync:EventPublish",
"Resource": "arn:aws:appsync:REGION:ACCOUNT:apis/API_ID/*"
}
class AppSyncEventsClient {
constructor() {
this._ws = null;
this._connected = false;
this._subscriptions = {};
this._connectResolve = null;
}
/**
* Connect to AppSync Events realtime endpoint.
* @param {string} realtimeUrl - wss://DNS.appsync-realtime-api.REGION.amazonaws.com/event/realtime
* @param {string} token - Cognito ID token
* @param {string} httpHost - DNS.appsync-api.REGION.amazonaws.com
*/
connect(realtimeUrl, token, httpHost) {
return new Promise((resolve, reject) => {
// Base64URL encode the auth header
const authHeader = JSON.stringify({ Authorization: token, host: httpHost });
const encoded = btoa(authHeader).replace(/\+/g,'-').replace(/\//g,'_').replace(/=+$/,'');
// Connect with subprotocol auth
this._ws = new WebSocket(realtimeUrl, ['header-' + encoded, 'aws-appsync-event-ws']);
this._connectResolve = resolve;
this._ws.onopen = () => {
this._ws.send(JSON.stringify({ type: 'connection_init' }));
};
this._ws.onmessage = (event) => this._handleMessage(JSON.parse(event.data));
this._ws.onerror = () => reject(new Error('WebSocket error'));
this._ws.onclose = (e) => {
this._connected = false;
// Auto re-auth on token expiry
if (e.code === 1000) {
const lastError = this._lastError;
if (lastError && lastError.errorCode === 401) {
this.onTokenExpired && this.onTokenExpired();
}
}
};
});
}
/**
* Subscribe to a channel.
* @param {string} channel - e.g., "/chat/user123"
* @param {string} token - Cognito ID token for subscribe auth
* @param {string} httpHost - HTTP API host
* @param {function} callback - called with event payload
*/
subscribe(channel, token, httpHost, callback) {
const id = 'sub_' + Date.now() + '_' + Math.random().toString(36).slice(2);
this._subscriptions[id] = { channel, callback };
this._ws.send(JSON.stringify({
type: 'subscribe',
id: id,
channel: channel,
authorization: { Authorization: token, host: httpHost }
}));
return id;
}
/**
* Publish a message via HTTP POST.
* @param {string} httpUrl - https://DNS.appsync-api.REGION.amazonaws.com/event
* @param {string} token - Cognito ID token
* @param {string} channel - target channel
* @param {object} payload - message object to publish
*/
async publish(httpUrl, token, channel, payload) {
const response = await fetch(httpUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json', Authorization: token },
body: JSON.stringify({ channel, events: [JSON.stringify(payload)] })
});
if (!response.ok) throw new Error(`Publish failed: ${response.status}`);
return response.json();
}
/** Disconnect cleanly */
disconnect() {
if (this._ws) this._ws.close(1000);
this._ws = null;
this._connected = false;
}
// --- Private ---
_handleMessage(data) {
switch (data.type) {
case 'connection_ack':
this._connected = true;
this._connectResolve && this._connectResolve();
break;
case 'subscribe_success':
console.log('Subscribed:', data.id);
break;
case 'data':
// Deliver event to subscriber callback
const sub = this._subscriptions[data.id];
if (sub) {
const event = typeof data.event === 'string' ? JSON.parse(data.event) : data.event;
sub.callback(event);
}
break;
case 'connection_error':
this._lastError = (data.errors || [])[0];
console.error('Connection error:', data.errors);
break;
case 'ka':
break; // keep-alive, ignore
}
}
}
// ===== USAGE EXAMPLE =====
const client = new AppSyncEventsClient();
// Your endpoints (from `aws appsync get-api`)
const REALTIME_URL = 'wss://abc123.appsync-realtime-api.us-east-1.amazonaws.com/event/realtime';
const HTTP_URL = 'https://abc123.appsync-api.us-east-1.amazonaws.com/event';
const HTTP_HOST = 'abc123.appsync-api.us-east-1.amazonaws.com';
const TOKEN = localStorage.getItem('cognito.idToken');
// Handle token expiry — redirect to Cognito for silent re-auth
client.onTokenExpired = () => {
window.location.href = 'https://YOUR_COGNITO_DOMAIN/login?...&redirect_uri=...';
};
// Connect
await client.connect(REALTIME_URL, TOKEN, HTTP_HOST);
// Subscribe
client.subscribe('/chat/myuser', TOKEN, HTTP_HOST, (event) => {
console.log('Received:', event);
// event is the parsed JSON payload from the publisher
});
// Publish
await client.publish(HTTP_URL, TOKEN, '/chat/myuser', {
text: 'Hello world!',
sender: 'user',
timestamp: new Date().toISOString()
});
⚠️ API ID ≠ DNS prefix
The AppSync API ID (e.g., 4nsugs3fdjbirfhonlsi3nnd24) is NOT the DNS prefix for the WebSocket/HTTP endpoints. Use aws appsync get-api --query "api.dns" to get the actual DNS hostnames.
⚠️ Host header must be HTTP host
The host field in auth headers must be DNS.appsync-api.REGION.amazonaws.com (HTTP), NOT DNS.appsync-realtime-api.REGION.amazonaws.com (WebSocket).
⚠️ Cognito: ID token, not access token
AppSync Events with Cognito auth requires the ID token (token_use: "id"). The access token will be rejected.
⚠️ connection_init is required
After the WebSocket opens, you must send {"type":"connection_init"}. Without it, the server never sends connection_ack and the connection times out.
⚠️ DIRECT handler must return {events: [...]}
Returning None, null, or {"statusCode":200} causes a 502 Bad Gateway. Always return {"events":[]} to drop events, or {"events":[...]} to broadcast.
⚠️ Handler config may disappear
CDK deployments can reset the handlerConfigs on your channel namespace. Always verify with get-channel-namespace after deploying. Re-apply via the SDK if null.
⚠️ Events in publish body must be stringified
The events array in the HTTP POST body must contain JSON strings, not raw objects: events: [JSON.stringify(payload)]
⚠️ Feedback loops with DIRECT handlers
If your handler publishes back to the same channel (e.g., error messages), it triggers itself. Filter by senderType or identity to break the loop.