Webhooks Guide

Set up webhook endpoints to receive immediate notifications when Meter detects content changes.

Overview

Webhooks allow Meter to push change notifications to your application in real time, eliminating the need for polling. Use webhooks when:
  • Changes need immediate action
  • You are building event-driven systems
  • You need to trigger downstream workflows
Use pull-based retrieval instead when:
  • You process changes in batches
  • Webhooks aren’t feasible (firewall, no public endpoint)
  • You prefer manual control over timing

How it works

Webhook payload

Meter sends a POST request with this payload:
{
  "job_id": "660e8400-e29b-41d4-a716-446655440000",
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "results": [
    {"title": "Product A", "price": "$19.99"},
    {"title": "Product B", "price": "$29.99"}
  ],
  "item_count": 2,
  "has_changes": true,
  "content_hash": "7f3d9a2b4c1e...",
  "completed_at": "2025-01-15T10:30:12Z"
}
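
Before acting on a payload, it can help to check that the fields you rely on are actually present. The following is a minimal sketch that uses only fields from the example payload above. The `MeterWebhook` dataclass and the `parse_webhook` helper are illustrative names, not part of any Meter SDK, and real payloads may carry fields not shown here.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any


@dataclass
class MeterWebhook:
    """Illustrative container for fields seen in the example payload."""
    job_id: str
    schedule_id: str
    status: str
    results: list[dict[str, Any]]
    has_changes: bool
    completed_at: datetime


def parse_webhook(raw: bytes) -> MeterWebhook:
    """Parse a raw request body, raising ValueError on bad or missing data."""
    data = json.loads(raw)  # JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    required = ("job_id", "schedule_id", "status", "results",
                "has_changes", "completed_at")
    missing = [key for key in required if key not in data]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    return MeterWebhook(
        job_id=data["job_id"],
        schedule_id=data["schedule_id"],
        status=data["status"],
        results=data["results"],
        has_changes=data["has_changes"],
        # Swap the trailing "Z" for an explicit offset so that older
        # Python versions can parse the timestamp.
        completed_at=datetime.fromisoformat(
            data["completed_at"].replace("Z", "+00:00")
        ),
    )
```

Rejecting incomplete payloads at the boundary keeps `KeyError` surprises out of the processing code.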

Implementation

Step 1: Create a webhook endpoint

TODO: Implement for your framework
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/meter")
async def handle_meter_webhook(request: Request):
    payload = await request.json()

    # TODO: Add your processing logic
    if payload['has_changes']:
        results = payload['results']
        # Process results...
        print(f"Received {len(results)} changed items")

    return {"status": "ok"}

Step 2: Make endpoint publicly accessible

TODO: Deploy your webhook endpoint
Options:
  • Deploy to cloud (AWS Lambda, Google Cloud Functions, etc.)
  • Use ngrok for local development: ngrok http 3000
  • Use a VPS with public IP

Step 3: Create schedule with webhook URL

from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

schedule = client.create_schedule(
    strategy_id="your-strategy-id",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"  # TODO: Your URL
)

Best practices

1. Respond quickly

Respond with 200 OK within 30 seconds:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    payload = await request.json()

    # Queue for background processing
    # (`queue` is a placeholder for your task queue client)
    await queue.enqueue(process_changes, payload)

    # Respond immediately
    return {"status": "ok"}

async def process_changes(payload):
    """Process in background"""
    # Heavy processing here...
    pass
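
The `queue` object in the handler above is left to the reader. One way to get the same effect without extra infrastructure is an in-process `asyncio.Queue` drained by a worker task. This is only a sketch under that assumption: `worker`, `handle_payload`, `main`, and the `processed` list are illustrative names, and an in-process queue loses pending items on restart, so a durable queue may be the better fit in production.

```python
import asyncio

processed: list[str] = []  # stands in for real side effects


async def process_changes(payload: dict) -> None:
    """Placeholder for heavy processing."""
    await asyncio.sleep(0)  # yield control, as real I/O would
    processed.append(payload["job_id"])


async def worker(queue: asyncio.Queue) -> None:
    """Drain the queue forever, one payload at a time."""
    while True:
        payload = await queue.get()
        try:
            await process_changes(payload)
        except Exception as exc:  # keep the worker alive on bad payloads
            print(f"processing failed: {exc}")
        finally:
            queue.task_done()


async def handle_payload(queue: asyncio.Queue, payload: dict) -> dict:
    """What the request handler would do: enqueue, then answer at once."""
    queue.put_nowait(payload)
    return {"status": "ok"}


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    await handle_payload(queue, {"job_id": "job-1"})
    await handle_payload(queue, {"job_id": "job-2"})
    await queue.join()  # wait until the worker has drained the queue
    task.cancel()       # stop the worker
```

Running `asyncio.run(main())` enqueues two payloads and returns once both have been processed. In a web application the worker task would instead be started once at startup and left running.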

2. Validate requests (optional security)

TODO: Implement signature verification
import hmac
import hashlib

from fastapi import HTTPException

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    """Verify webhook signature against the raw request body"""
    expected = hmac.new(
        secret.encode(),
        body,  # already bytes from request.body(), so no .encode()
        hashlib.sha256
    ).hexdigest()
    # A missing header (None) is treated as an invalid signature
    return hmac.compare_digest(expected, signature or "")

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get('X-Meter-Signature')

    # TODO: Add signature verification when available
    # if not verify_signature(body, signature, webhook_secret):
    #     raise HTTPException(401, "Invalid signature")

    data = await request.json()
    # Process...
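
Because signature verification is marked as not yet available, the actual signing scheme is unknown. Assuming it turns out to be a hex-encoded HMAC-SHA256 over the raw request body, as the `verify_signature` function above implies, a local round trip can be sketched as follows. The `sign` helper and the secret value are hypothetical and exist only to produce test signatures.

```python
import hashlib
import hmac
from typing import Optional


def sign(body: bytes, secret: str) -> str:
    """Hex HMAC-SHA256 of the raw body (assumed scheme, for local tests)."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify_signature(body: bytes, signature: Optional[str],
                     secret: str) -> bool:
    """Constant-time check; a missing header counts as invalid."""
    if not signature:
        return False
    # Compare as bytes so that unexpected non-ASCII header values
    # fail the check instead of raising TypeError.
    return hmac.compare_digest(sign(body, secret).encode(),
                               signature.encode())


body = b'{"job_id": "test", "has_changes": true}'
good = sign(body, "example-secret")  # hypothetical secret
print(verify_signature(body, good, "example-secret"))
print(verify_signature(body + b" ", good, "example-secret"))
```

The check runs against the raw bytes rather than re-serialized JSON, since re-serializing can change whitespace or key order and so change the digest.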

3. Handle failures gracefully

TODO: Add retry logic
import logging

logger = logging.getLogger(__name__)

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    payload = None  # stays None if the body is not valid JSON
    try:
        payload = await request.json()
        await process_changes(payload)
        return {"status": "ok"}
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Return 200 to prevent Meter from retrying
        # Store failed payload for manual review
        await save_failed_webhook(payload, str(e))
        return {"status": "error", "message": str(e)}
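
The handler above calls `save_failed_webhook` without defining it. One simple possibility, sketched here, is to append each failure to a JSON Lines file for later review. The file location, the record layout, and the `load_failed_webhooks` helper are assumptions made for illustration; a database table or dead-letter queue would serve the same purpose.

```python
import asyncio
import json
import time
from pathlib import Path
from typing import Any

FAILED_LOG = Path("failed_webhooks.jsonl")  # hypothetical location


def _append_record(path: Path, payload: Any, error: str) -> None:
    """Write one failure as a single JSON line."""
    record = {"failed_at": time.time(), "error": error, "payload": payload}
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")


async def save_failed_webhook(payload: Any, error: str,
                              path: Path = FAILED_LOG) -> None:
    """Store a failed payload without blocking the event loop."""
    await asyncio.to_thread(_append_record, path, payload, error)


def load_failed_webhooks(path: Path = FAILED_LOG) -> list[dict]:
    """Read back every stored failure for manual review or replay."""
    if not path.exists():
        return []
    with path.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]
```

Since the handler returns 200 even on failure, this stored record is the only remaining copy of the payload, so it is worth keeping the write path as simple as possible.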

4. Monitor webhook health

TODO: Add monitoring
import time

webhook_metrics = {
    "received": 0,
    "processed": 0,
    "failed": 0,
    "last_received": None
}

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    webhook_metrics["received"] += 1
    webhook_metrics["last_received"] = time.time()

    try:
        payload = await request.json()
        await process_changes(payload)
        webhook_metrics["processed"] += 1
    except Exception as e:
        webhook_metrics["failed"] += 1
        raise

    return {"status": "ok"}

@app.get("/metrics")
async def get_metrics():
    return webhook_metrics
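
The `last_received` timestamp recorded above can also be used to notice when webhooks stop arriving. The sketch below assumes that a webhook is delivered for every scheduled job, so that a gap much longer than the schedule's `interval_seconds` is suspicious. The `is_stale` helper and the 1.5 grace factor are illustrative choices.

```python
import time
from typing import Optional


def is_stale(last_received: Optional[float], interval_seconds: float,
             grace: float = 1.5, now: Optional[float] = None) -> bool:
    """True when no webhook arrived within grace * interval_seconds.

    A value of None (nothing received yet) is treated as stale.
    """
    if last_received is None:
        return True
    current = time.time() if now is None else now
    return (current - last_received) > interval_seconds * grace
```

With the hourly schedule from Step 3, `is_stale(webhook_metrics["last_received"], 3600)` would start returning `True` once more than 90 minutes pass without a delivery.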

Testing webhooks

Local testing with ngrok

# Start your webhook server
python app.py

# In another terminal, expose with ngrok
ngrok http 3000

# Use the ngrok URL in Meter
# https://abc123.ngrok.io/webhooks/meter

Testing with webhook.site

  1. Go to https://webhook.site
  2. Copy the unique URL
  3. Use it in your Meter schedule
  4. Trigger a job and view the payload on webhook.site

Manual testing

# Simulate a webhook call
curl -X POST http://localhost:3000/webhooks/meter \
  -H "Content-Type: application/json" \
  -d '{
    "job_id": "test",
    "has_changes": true,
    "results": [{"test": "data"}]
  }'

Troubleshooting

Solutions:
  • Verify URL is publicly accessible
  • Check endpoint returns 200 OK
  • Test with webhook.site
  • Check server logs for errors
Cause: Endpoint takes >30 seconds to respond
Solution: Return 200 OK immediately, process asynchronously:
from fastapi import BackgroundTasks

@app.post("/webhooks/meter")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}  # Immediate response
Cause: Network issues may cause retries
Solution: Make processing idempotent:
processed_jobs = set()

async def process_changes(payload):
    job_id = payload['job_id']

    if job_id in processed_jobs:
        print(f"Skipping duplicate: {job_id}")
        return

    # Process...
    processed_jobs.add(job_id)
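
An in-memory set is emptied on every restart and is not shared between worker processes. If that matters, the same check can be backed by a small database. The following is a sketch using SQLite from the standard library; the table name and the `open_store` and `claim_job` helpers are illustrative.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the store of already-seen job IDs."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_jobs (job_id TEXT PRIMARY KEY)"
    )
    return conn


def claim_job(conn: sqlite3.Connection, job_id: str) -> bool:
    """Return True the first time a job_id is seen, False for duplicates."""
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "INSERT OR IGNORE INTO processed_jobs (job_id) VALUES (?)",
            (job_id,),
        )
    # rowcount is 1 when the row was inserted, 0 when it was ignored
    return cur.rowcount == 1
```

Note that this variant records the job before processing, whereas the snippet above records it afterwards. If processing fails after a successful claim, the job stays marked as seen, so failed jobs would need to be un-claimed or replayed separately.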

Complete example

TODO: Add complete working example
See Example: E-commerce Monitoring for a full implementation with webhooks.

Need help?

Email me at [email protected]