Skip to main content

Webhooks Guide

Set up webhook endpoints to receive immediate notifications when Meter detects content changes.

Overview

Webhooks allow Meter to push change notifications to your application in real-time, eliminating the need for polling. Use webhooks when:
  • Changes need immediate action
  • Building event-driven systems
  • Triggering downstream workflows
Use pull-based instead when:
  • Batch processing changes
  • Webhooks aren’t feasible (firewall, no public endpoint)
  • Prefer manual control over timing

How it works

Webhook types

Meter supports two webhook formats:
TypeDescriptionWhen to use
standardFull JSON payload with all resultsMost integrations
slackFormatted Slack messageSlack incoming webhooks
The webhook type is auto-detected from the URL. If your webhook URL contains hooks.slack.com, Meter automatically uses the Slack format. You can also set it explicitly:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    webhook_type="slack"  # auto-detected, but can be explicit
)

Webhook payload

Meter sends a POST request for both successful and failed jobs. Success payload:
{
  "job_id": "660e8400-e29b-41d4-a716-446655440000",
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "url": "https://example.com/products",
  "results": [
    {"title": "Product A", "price": "$19.99"},
    {"title": "Product B", "price": "$29.99"}
  ],
  "item_count": 2,
  "has_changes": true,
  "content_hash": "7f3d9a2b4c1e...",
  "completed_at": "2025-01-15T10:30:12Z",
  "metadata": {"project": "my-project", "env": "prod"}
}
Failure payload:
{
  "job_id": "660e8400-e29b-41d4-a716-446655440000",
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "status": "failed",
  "url": "https://example.com/products",
  "error": "Page not accessible: 404 Not Found",
  "completed_at": "2025-01-15T10:30:12Z",
  "metadata": {"project": "my-project", "env": "prod"}
}
The metadata field contains your custom JSON from webhook_metadata — it’s included in every payload so you can identify which project or environment the webhook belongs to. See Webhook payload formats for full field documentation.

Webhook metadata

Attach custom JSON data to every webhook payload. This is useful for routing, tagging, or identifying which schedule triggered the webhook:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_metadata={
        "project": "price-monitor",
        "env": "production",
        "team": "data-eng"
    }
)
Your webhook handler can then use this metadata for routing:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    payload = await request.json()
    metadata = payload.get("metadata", {})

    if metadata.get("project") == "price-monitor":
        await update_price_database(payload)
    elif metadata.get("project") == "news-tracker":
        await update_news_feed(payload)

Webhook secrets

Webhook secrets let you verify that incoming requests are from Meter, not a third party.

How it works

  1. When you create a schedule with a webhook_url, Meter auto-generates a secret with a whsec_ prefix
  2. Every webhook request includes the secret in the X-Webhook-Secret header
  3. Your endpoint verifies the header matches your stored secret

Storing the secret

The secret is returned once when the schedule is created. Store it securely:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)

# Save this — it won't be shown again in other API responses
webhook_secret = schedule.get("webhook_secret")
print(f"Store this secret: {webhook_secret}")
You can also provide your own secret:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_secret="whsec_my_custom_secret_here"
)

Verifying requests

Check the X-Webhook-Secret header in your webhook handler:
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = "whsec_..."  # From schedule creation

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    # Verify the secret
    secret = request.headers.get("X-Webhook-Secret")
    if secret != WEBHOOK_SECRET:
        raise HTTPException(status_code=401, detail="Invalid webhook secret")

    payload = await request.json()
    # Process payload...
    return {"status": "ok"}

Regenerating secrets

If a secret is compromised, regenerate it:
result = client.regenerate_webhook_secret(schedule_id)
new_secret = result["webhook_secret"]
# Update your webhook handler with the new secret
The old secret is immediately invalidated. Update your webhook handler before the next delivery.

Retry behavior

Meter automatically retries failed webhook deliveries with exponential backoff:
RetryDelayTotal elapsed
1st15 minutes15 minutes
2nd30 minutes45 minutes
3rd1 hour1 hour 45 minutes
4th2 hours3 hours 45 minutes
5th (final)4 hours7 hours 45 minutes
Retry rules:
  • 2xx response: Delivery successful, no retry
  • 4xx response (client error): Delivery stops immediately — no retries. Fix your endpoint and the next scheduled job will deliver normally
  • 5xx response (server error): Retries with backoff
  • Timeout (>30 seconds): Retries with backoff
  • Connection failure: Retries with backoff
Return 200 OK as quickly as possible. Process the payload asynchronously in a background task to avoid timeouts.

Implementation

Step 1: Create a webhook endpoint

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks

app = FastAPI()
WEBHOOK_SECRET = "whsec_..."

@app.post("/webhooks/meter")
async def handle_meter_webhook(
    request: Request,
    background_tasks: BackgroundTasks
):
    # Verify secret
    secret = request.headers.get("X-Webhook-Secret")
    if secret != WEBHOOK_SECRET:
        raise HTTPException(status_code=401, detail="Invalid secret")

    payload = await request.json()

    if payload["status"] == "failed":
        background_tasks.add_task(handle_failure, payload)
        return {"status": "ok"}

    if payload.get("has_changes"):
        background_tasks.add_task(process_changes, payload)

    return {"status": "ok"}

async def process_changes(payload):
    results = payload["results"]
    metadata = payload.get("metadata", {})
    # Update your database, trigger pipelines, etc.

async def handle_failure(payload):
    # Log, alert, or retry logic
    print(f"Job failed: {payload['error']}")

Step 2: Make endpoint publicly accessible

Options:
  • Deploy to cloud (AWS Lambda, Google Cloud Functions, etc.)
  • Use ngrok for local development: ngrok http 3000
  • Use a VPS with public IP

Step 3: Create schedule with webhook URL

from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

schedule = client.create_schedule(
    strategy_id="your-strategy-id",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_metadata={"project": "price-monitor"}
)

# Store the auto-generated webhook secret
print(f"Webhook secret: {schedule.get('webhook_secret')}")

Slack webhooks

Send notifications directly to Slack channels:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx"
    # webhook_type auto-detected as "slack"
)
Slack payloads are automatically formatted as readable messages with item counts and a preview of results.

Best practices

1. Respond quickly

Return 200 OK within 30 seconds. Process payloads asynchronously:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}  # Respond immediately

2. Handle duplicates

Make processing idempotent using the job_id:
processed_jobs = set()

async def process_changes(payload):
    job_id = payload['job_id']
    if job_id in processed_jobs:
        return  # Skip duplicate
    processed_jobs.add(job_id)
    # Process...

3. Handle failures gracefully

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    try:
        payload = await request.json()
        await process_changes(payload)
        return {"status": "ok"}
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Return 200 to prevent retries (payload is stored for manual review)
        await save_failed_webhook(payload, str(e))
        return {"status": "error", "message": str(e)}

Testing webhooks

Local testing with ngrok

# Start your webhook server
python app.py

# In another terminal, expose with ngrok
ngrok http 3000

# Use the ngrok URL in Meter
# https://abc123.ngrok.io/webhooks/meter

Testing with webhook.site

  1. Go to https://webhook.site
  2. Copy the unique URL
  3. Use it in your Meter schedule
  4. Trigger a job and view the payload on webhook.site

Test endpoint

Use the Meter test endpoint to verify delivery:
curl -X POST https://api.meter.sh/api/webhooks/test \
  -H "Authorization: Bearer sk_live_..." \
  -H "Content-Type: application/json" \
  -d '{"webhook_url": "https://your-app.com/webhooks/meter"}'

Manual testing

curl -X POST http://localhost:3000/webhooks/meter \
  -H "Content-Type: application/json" \
  -H "X-Webhook-Secret: whsec_test" \
  -d '{
    "job_id": "test-123",
    "status": "completed",
    "url": "https://example.com",
    "has_changes": true,
    "results": [{"test": "data"}],
    "item_count": 1,
    "metadata": {"project": "test"}
  }'

Troubleshooting

Solutions:
  • Verify URL is publicly accessible
  • Check endpoint returns 200 OK
  • Test with webhook.site
  • Check server logs for errors
  • Verify the schedule has a webhook_url set
Cause: Endpoint takes >30 seconds to respondSolution: Return 200 OK immediately, process asynchronously:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}  # Immediate response
Cause: Retries after timeout or connection issuesSolution: Make processing idempotent using job_id:
processed_jobs = set()

async def process_changes(payload):
    job_id = payload['job_id']
    if job_id in processed_jobs:
        return
    processed_jobs.add(job_id)
    # Process...
Cause: Your endpoint returns a 4xx status codeSolution: Meter treats 4xx as a permanent failure and does not retry. Check your endpoint for:
  • Invalid webhook secret (401)
  • Incorrect URL path (404)
  • Request validation errors (422)
Fix the issue and delivery will resume on the next scheduled job.

Next steps

Need help?

Email me at mckinnon@meter.sh