feat(python-sdk): add webhook verification and event handling (#344)

* feat(python-sdk): add webhook verification and event handling

Add webhook support to the Python SDK matching the JS SDK implementation:
- Add Webhooks class with verify() and construct_event() methods
- Implement HMAC-SHA256 signature verification with timing-safe comparison
- Add timestamp validation with configurable tolerance (default 5 minutes)
- Add comprehensive webhook event types (18 events: email, contact, domain, test)
- Add WebhookVerificationError with typed error codes
- Export webhook constants (headers) and types

* fix(python-sdk): harden webhook parsing and typing

Normalize invalid UTF-8 webhook payloads to INVALID_BODY errors so verify() safely returns false, and narrow base email webhook event types to avoid discriminated-union overlap. Add regression tests for both paths.

* chore(python-sdk): bump package version to 0.2.9

* feat(python-sdk): add local webhook test example project

Add a runnable Flask receiver and signed webhook sender under packages/python-sdk/example, and link it from the Python SDK README for local verification.

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
KM Koushik
2026-02-08 08:18:14 +11:00
committed by GitHub
parent e246d32ef9
commit 09bdb8aaad
11 changed files with 1046 additions and 2 deletions
+103
View File
@@ -0,0 +1,103 @@
import hashlib
import hmac
import json
import time
from typing import get_args, get_type_hints
import pytest
from usesend import types
from usesend.webhooks import (
WEBHOOK_SIGNATURE_HEADER,
WEBHOOK_TIMESTAMP_HEADER,
WebhookVerificationError,
Webhooks,
)
def _sign(secret: str, timestamp: str, body: str) -> str:
digest = hmac.new(
secret.encode("utf-8"),
f"{timestamp}.{body}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
return f"v1={digest}"
def test_verify_returns_false_for_non_utf8_bytes_body() -> None:
webhooks = Webhooks("whsec_test")
timestamp = str(int(time.time() * 1000))
is_valid = webhooks.verify(
b"\xff",
headers={
WEBHOOK_SIGNATURE_HEADER: "v1=deadbeef",
WEBHOOK_TIMESTAMP_HEADER: timestamp,
},
)
assert is_valid is False
def test_construct_event_raises_invalid_body_for_non_utf8_bytes() -> None:
webhooks = Webhooks("whsec_test")
timestamp = str(int(time.time() * 1000))
with pytest.raises(WebhookVerificationError) as exc:
webhooks.construct_event(
b"\xff",
headers={
WEBHOOK_SIGNATURE_HEADER: "v1=deadbeef",
WEBHOOK_TIMESTAMP_HEADER: timestamp,
},
)
assert exc.value.code == "INVALID_BODY"
def test_email_webhook_event_type_excludes_specialized_events() -> None:
email_event_type = get_type_hints(types.EmailWebhookEvent)["type"]
supported = set(get_args(email_event_type))
assert "email.delivered" in supported
assert "email.bounced" not in supported
assert "email.failed" not in supported
assert "email.suppressed" not in supported
assert "email.opened" not in supported
assert "email.clicked" not in supported
def test_construct_event_parses_bounced_event_with_valid_signature() -> None:
secret = "whsec_test"
webhooks = Webhooks(secret)
timestamp = str(int(time.time() * 1000))
payload = {
"id": "evt_123",
"type": "email.bounced",
"createdAt": "2026-02-08T10:00:00.000Z",
"data": {
"id": "email_123",
"status": "BOUNCED",
"from": "from@example.com",
"to": ["to@example.com"],
"occurredAt": "2026-02-08T10:00:00.000Z",
"bounce": {
"type": "Permanent",
"subType": "General",
},
},
}
body = json.dumps(payload)
signature = _sign(secret, timestamp, body)
event = webhooks.construct_event(
body,
headers={
WEBHOOK_SIGNATURE_HEADER: signature,
WEBHOOK_TIMESTAMP_HEADER: timestamp,
},
)
assert event["type"] == "email.bounced"
assert event["data"]["bounce"]["type"] == "Permanent"