09bdb8aaad
* feat(python-sdk): add webhook verification and event handling Add webhook support to the Python SDK matching the JS SDK implementation: - Add Webhooks class with verify() and construct_event() methods - Implement HMAC-SHA256 signature verification with timing-safe comparison - Add timestamp validation with configurable tolerance (default 5 minutes) - Add comprehensive webhook event types (18 events: email, contact, domain, test) - Add WebhookVerificationError with typed error codes - Export webhook constants (headers) and types * fix(python-sdk): harden webhook parsing and typing Normalize invalid UTF-8 webhook payloads to INVALID_BODY errors so verify() safely returns false, and narrow base email webhook event types to avoid discriminated-union overlap. Add regression tests for both paths. * chore(python-sdk): bump package version to 0.2.9 * feat(python-sdk): add local webhook test example project Add a runnable Flask receiver and signed webhook sender under packages/python-sdk/example, and link it from the Python SDK README for local verification. --------- Co-authored-by: Claude <noreply@anthropic.com>
327 lines
9.3 KiB
Python
327 lines
9.3 KiB
Python
"""Webhook verification and event construction for UseSend webhooks.
|
|
|
|
This module provides secure webhook signature verification using HMAC-SHA256,
|
|
timestamp validation, and type-safe event parsing.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from typing import Any, Dict, Literal, Mapping, Optional, Union
|
|
|
|
from .types import WebhookEventData
|
|
|
|
|
|
# Webhook header names
|
|
WEBHOOK_SIGNATURE_HEADER = "X-UseSend-Signature"
|
|
WEBHOOK_TIMESTAMP_HEADER = "X-UseSend-Timestamp"
|
|
WEBHOOK_EVENT_HEADER = "X-UseSend-Event"
|
|
WEBHOOK_CALL_HEADER = "X-UseSend-Call"
|
|
|
|
# Signature format
|
|
SIGNATURE_PREFIX = "v1="
|
|
|
|
# Default tolerance: 5 minutes in milliseconds
|
|
DEFAULT_TOLERANCE_MS = 5 * 60 * 1000
|
|
|
|
|
|
WebhookVerificationErrorCode = Literal[
|
|
"MISSING_SIGNATURE",
|
|
"MISSING_TIMESTAMP",
|
|
"INVALID_SIGNATURE_FORMAT",
|
|
"INVALID_TIMESTAMP",
|
|
"TIMESTAMP_OUT_OF_RANGE",
|
|
"SIGNATURE_MISMATCH",
|
|
"INVALID_BODY",
|
|
"INVALID_JSON",
|
|
]
|
|
|
|
|
|
class WebhookVerificationError(Exception):
|
|
"""Error raised when webhook verification fails.
|
|
|
|
Attributes:
|
|
code: The error code indicating the type of verification failure.
|
|
message: A human-readable description of the error.
|
|
"""
|
|
|
|
def __init__(self, code: WebhookVerificationErrorCode, message: str) -> None:
|
|
self.code = code
|
|
self.message = message
|
|
super().__init__(message)
|
|
|
|
def __str__(self) -> str:
|
|
return f"[{self.code}] {self.message}"
|
|
|
|
|
|
class Webhooks:
|
|
"""Webhook verification and event construction.
|
|
|
|
This class provides methods to verify webhook signatures and parse
|
|
webhook events with type safety.
|
|
|
|
Parameters
|
|
----------
|
|
secret:
|
|
The webhook signing secret (starts with 'whsec_').
|
|
|
|
Example
|
|
-------
|
|
```python
|
|
from usesend import UseSend
|
|
|
|
usesend = UseSend("us_12345")
|
|
webhooks = usesend.webhooks("whsec_xxx")
|
|
|
|
# Flask example
|
|
@app.route("/webhook", methods=["POST"])
|
|
def handle_webhook():
|
|
try:
|
|
event = webhooks.construct_event(
|
|
request.data,
|
|
headers=request.headers
|
|
)
|
|
|
|
if event["type"] == "email.delivered":
|
|
print(f"Email delivered to {event['data']['to']}")
|
|
|
|
return "OK", 200
|
|
except WebhookVerificationError as e:
|
|
return str(e), 400
|
|
```
|
|
"""
|
|
|
|
def __init__(self, secret: str) -> None:
|
|
self._secret = secret
|
|
|
|
def verify(
|
|
self,
|
|
body: Union[str, bytes],
|
|
*,
|
|
headers: Mapping[str, Any],
|
|
secret: Optional[str] = None,
|
|
tolerance: Optional[int] = None,
|
|
) -> bool:
|
|
"""Verify webhook signature without parsing the event.
|
|
|
|
Parameters
|
|
----------
|
|
body:
|
|
Raw webhook body (string or bytes).
|
|
headers:
|
|
Request headers (dict-like object).
|
|
secret:
|
|
Optional override for the webhook secret.
|
|
tolerance:
|
|
Optional tolerance in milliseconds for timestamp validation.
|
|
Defaults to 5 minutes. Set to -1 to disable timestamp validation.
|
|
|
|
Returns
|
|
-------
|
|
bool
|
|
True if signature is valid, False otherwise.
|
|
|
|
Example
|
|
-------
|
|
```python
|
|
is_valid = webhooks.verify(body, headers=request.headers)
|
|
|
|
if not is_valid:
|
|
return "Invalid signature", 401
|
|
```
|
|
"""
|
|
try:
|
|
self._verify_internal(body, headers=headers, secret=secret, tolerance=tolerance)
|
|
return True
|
|
except WebhookVerificationError:
|
|
return False
|
|
|
|
def construct_event(
|
|
self,
|
|
body: Union[str, bytes],
|
|
*,
|
|
headers: Mapping[str, Any],
|
|
secret: Optional[str] = None,
|
|
tolerance: Optional[int] = None,
|
|
) -> WebhookEventData:
|
|
"""Verify and parse a webhook event.
|
|
|
|
Parameters
|
|
----------
|
|
body:
|
|
Raw webhook body (string or bytes).
|
|
headers:
|
|
Request headers (dict-like object).
|
|
secret:
|
|
Optional override for the webhook secret.
|
|
tolerance:
|
|
Optional tolerance in milliseconds for timestamp validation.
|
|
Defaults to 5 minutes. Set to -1 to disable timestamp validation.
|
|
|
|
Returns
|
|
-------
|
|
WebhookEventData
|
|
Verified and typed webhook event.
|
|
|
|
Raises
|
|
------
|
|
WebhookVerificationError
|
|
If the webhook signature is invalid or the payload cannot be parsed.
|
|
|
|
Example
|
|
-------
|
|
```python
|
|
# Flask
|
|
event = webhooks.construct_event(
|
|
request.data,
|
|
headers=request.headers
|
|
)
|
|
|
|
# Django
|
|
event = webhooks.construct_event(
|
|
request.body,
|
|
headers=request.headers
|
|
)
|
|
|
|
# FastAPI
|
|
event = webhooks.construct_event(
|
|
await request.body(),
|
|
headers=dict(request.headers)
|
|
)
|
|
|
|
# Type-safe event handling
|
|
if event["type"] == "email.delivered":
|
|
print(event["data"]["to"])
|
|
elif event["type"] == "email.bounced":
|
|
print(event["data"]["bounce"]["type"])
|
|
```
|
|
"""
|
|
self._verify_internal(body, headers=headers, secret=secret, tolerance=tolerance)
|
|
|
|
body_string = _to_string(body)
|
|
try:
|
|
return json.loads(body_string)
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise WebhookVerificationError(
|
|
"INVALID_JSON",
|
|
f"Webhook payload is not valid JSON: {e}",
|
|
) from e
|
|
|
|
def _verify_internal(
|
|
self,
|
|
body: Union[str, bytes],
|
|
*,
|
|
headers: Mapping[str, Any],
|
|
secret: Optional[str] = None,
|
|
tolerance: Optional[int] = None,
|
|
) -> None:
|
|
"""Internal verification logic."""
|
|
webhook_secret = secret if secret is not None else self._secret
|
|
signature = _get_header(headers, WEBHOOK_SIGNATURE_HEADER)
|
|
timestamp = _get_header(headers, WEBHOOK_TIMESTAMP_HEADER)
|
|
|
|
if not signature:
|
|
raise WebhookVerificationError(
|
|
"MISSING_SIGNATURE",
|
|
f"Missing {WEBHOOK_SIGNATURE_HEADER} header",
|
|
)
|
|
|
|
if not timestamp:
|
|
raise WebhookVerificationError(
|
|
"MISSING_TIMESTAMP",
|
|
f"Missing {WEBHOOK_TIMESTAMP_HEADER} header",
|
|
)
|
|
|
|
if not signature.startswith(SIGNATURE_PREFIX):
|
|
raise WebhookVerificationError(
|
|
"INVALID_SIGNATURE_FORMAT",
|
|
"Signature header must start with v1=",
|
|
)
|
|
|
|
try:
|
|
timestamp_num = int(timestamp)
|
|
except ValueError as e:
|
|
raise WebhookVerificationError(
|
|
"INVALID_TIMESTAMP",
|
|
"Timestamp header must be a number (milliseconds since epoch)",
|
|
) from e
|
|
|
|
tolerance_ms = tolerance if tolerance is not None else DEFAULT_TOLERANCE_MS
|
|
now = int(time.time() * 1000)
|
|
|
|
if tolerance_ms >= 0 and abs(now - timestamp_num) > tolerance_ms:
|
|
raise WebhookVerificationError(
|
|
"TIMESTAMP_OUT_OF_RANGE",
|
|
"Webhook timestamp is outside the allowed tolerance",
|
|
)
|
|
|
|
body_string = _to_string(body)
|
|
expected = _compute_signature(webhook_secret, timestamp, body_string)
|
|
|
|
if not _safe_compare(expected, signature):
|
|
raise WebhookVerificationError(
|
|
"SIGNATURE_MISMATCH",
|
|
"Webhook signature does not match",
|
|
)
|
|
|
|
|
|
def _compute_signature(secret: str, timestamp: str, body: str) -> str:
|
|
"""Compute the HMAC-SHA256 signature for webhook verification."""
|
|
message = f"{timestamp}.{body}"
|
|
signature = hmac.new(
|
|
secret.encode("utf-8"),
|
|
message.encode("utf-8"),
|
|
hashlib.sha256,
|
|
).hexdigest()
|
|
return f"{SIGNATURE_PREFIX}{signature}"
|
|
|
|
|
|
def _to_string(body: Union[str, bytes]) -> str:
|
|
"""Convert body to UTF-8 string."""
|
|
if isinstance(body, str):
|
|
return body
|
|
if isinstance(body, bytes):
|
|
try:
|
|
return body.decode("utf-8")
|
|
except UnicodeDecodeError as e:
|
|
raise WebhookVerificationError(
|
|
"INVALID_BODY",
|
|
"Webhook body must be valid UTF-8.",
|
|
) from e
|
|
raise WebhookVerificationError(
|
|
"INVALID_BODY",
|
|
f"Unsupported body type: {type(body).__name__}. Expected str or bytes.",
|
|
)
|
|
|
|
|
|
def _get_header(headers: Mapping[str, Any], name: str) -> Optional[str]:
|
|
"""Get header value in a case-insensitive manner."""
|
|
if headers is None:
|
|
return None
|
|
|
|
# Try direct access first
|
|
if name in headers:
|
|
value = headers[name]
|
|
if isinstance(value, list):
|
|
return value[0] if value else None
|
|
return str(value) if value is not None else None
|
|
|
|
# Case-insensitive lookup
|
|
lower_name = name.lower()
|
|
for key in headers:
|
|
if key.lower() == lower_name:
|
|
value = headers[key]
|
|
if isinstance(value, list):
|
|
return value[0] if value else None
|
|
return str(value) if value is not None else None
|
|
|
|
return None
|
|
|
|
|
|
def _safe_compare(a: str, b: str) -> bool:
|
|
"""Timing-safe string comparison to prevent timing attacks."""
|
|
return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
|