Webhooks Guide
Set up webhook endpoints to receive immediate notifications when Meter detects content changes.
Overview
Webhooks let Meter push change notifications to your application in real time, so you don't need to poll.
Use webhooks when:
Changes need immediate action
You are building event-driven systems
You need to trigger downstream workflows
Use pull-based retrieval instead when:
You process changes in batches
Webhooks aren’t feasible (firewall, no public endpoint)
You prefer manual control over timing
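How it works
After each scheduled job, Meter sends a POST request with the job result to your webhook_url. Your endpoint verifies the X-Webhook-Secret header, returns a 2xx response, and processes the payload.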
Webhook types
Meter supports two webhook formats:
| Type | Description | When to use |
|---|---|---|
| standard | Full JSON payload with all results | Most integrations |
| slack | Formatted Slack message | Slack incoming webhooks |
The webhook type is auto-detected from the URL. If your webhook URL contains hooks.slack.com, Meter automatically uses the Slack format. You can also set it explicitly:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx",
    webhook_type="slack"  # auto-detected, but can be explicit
)
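To make the auto-detection rule concrete, here is a minimal sketch of how you might predict which format Meter will pick before creating a schedule. The helper name guess_webhook_type is hypothetical and not part of the SDK. It checks the URL's host, which is slightly stricter than the "contains hooks.slack.com" rule stated above, and Meter's actual detection logic may differ.

```python
from urllib.parse import urlparse


def guess_webhook_type(webhook_url: str) -> str:
    """Guess the webhook format from the URL (hypothetical helper, not an SDK call)."""
    host = urlparse(webhook_url).hostname or ""
    # Assumption: the documented hooks.slack.com rule is applied to the host part.
    return "slack" if host == "hooks.slack.com" else "standard"


print(guess_webhook_type("https://hooks.slack.com/services/T000/B000/xxx"))  # slack
print(guess_webhook_type("https://your-app.com/webhooks/meter"))  # standard
```

If the guess and your intent disagree, passing webhook_type explicitly removes the ambiguity.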
Webhook payload
Meter sends a POST request for both successful and failed jobs.
Success payload:
{
  "job_id": "660e8400-e29b-41d4-a716-446655440000",
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "url": "https://example.com/products",
  "results": [
    { "title": "Product A", "price": "$19.99" },
    { "title": "Product B", "price": "$29.99" }
  ],
  "item_count": 2,
  "has_changes": true,
  "content_hash": "7f3d9a2b4c1e...",
  "completed_at": "2025-01-15T10:30:12Z",
  "metadata": { "project": "my-project", "env": "prod" }
}
Failure payload:
{
  "job_id": "660e8400-e29b-41d4-a716-446655440000",
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "status": "failed",
  "url": "https://example.com/products",
  "error": "Page not accessible: 404 Not Found",
  "completed_at": "2025-01-15T10:30:12Z",
  "metadata": { "project": "my-project", "env": "prod" }
}
The metadata field contains the custom JSON you set in webhook_metadata. It is included in every payload so you can identify which project or environment the webhook belongs to.
See Webhook payload formats for full field documentation.
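Because success and failure payloads carry different fields, it can help to normalize them into one type before processing. The sketch below is one way to do that; MeterEvent and parse_event are hypothetical names, and the field set is only the subset shown in the two example payloads above, not the full payload reference.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MeterEvent:
    """Subset of the payload fields shown in the examples above."""
    job_id: str
    schedule_id: Optional[str]
    status: str
    url: str
    completed_at: Optional[str] = None
    results: list = field(default_factory=list)
    has_changes: bool = False
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)


def parse_event(payload: dict) -> MeterEvent:
    """Build a MeterEvent, tolerating fields that appear only on success or only on failure."""
    return MeterEvent(
        job_id=payload["job_id"],
        schedule_id=payload.get("schedule_id"),
        status=payload["status"],
        url=payload["url"],
        completed_at=payload.get("completed_at"),
        results=payload.get("results", []),
        has_changes=payload.get("has_changes", False),
        error=payload.get("error"),
        metadata=payload.get("metadata") or {},
    )


failed = parse_event({
    "job_id": "660e8400-e29b-41d4-a716-446655440000",
    "status": "failed",
    "url": "https://example.com/products",
    "error": "Page not accessible: 404 Not Found",
})
print(failed.status, failed.error, failed.results)
```

Downstream code can then branch on event.status without checking for missing keys.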
Webhook metadata
Use webhook_metadata to attach custom JSON data to every webhook payload. This is useful for routing, tagging, or identifying which schedule triggered the webhook:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_metadata={
        "project": "price-monitor",
        "env": "production",
        "team": "data-eng"
    }
)
Your webhook handler can then use this metadata for routing:
from fastapi import FastAPI, Request

app = FastAPI()

# update_price_database and update_news_feed are your own functions
@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    payload = await request.json()
    metadata = payload.get("metadata", {})
    if metadata.get("project") == "price-monitor":
        await update_price_database(payload)
    elif metadata.get("project") == "news-tracker":
        await update_news_feed(payload)
    return {"status": "ok"}
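As the number of projects grows, an if/elif chain gets harder to maintain. A dispatch table is one alternative. The sketch below is framework-independent, uses synchronous stand-in handlers, and assumes the metadata key "project" from the example above; the ROUTES mapping and the route function are hypothetical names.

```python
from typing import Callable, Dict


# Stand-in handlers for illustration; replace the bodies with real work.
def update_price_database(payload: dict) -> str:
    return f"prices updated from job {payload['job_id']}"


def update_news_feed(payload: dict) -> str:
    return f"news updated from job {payload['job_id']}"


# Map each metadata "project" value to the function that handles it.
ROUTES: Dict[str, Callable[[dict], str]] = {
    "price-monitor": update_price_database,
    "news-tracker": update_news_feed,
}


def route(payload: dict) -> str:
    """Dispatch a payload by metadata.project; unknown projects are reported, not raised."""
    project = (payload.get("metadata") or {}).get("project")
    handler = ROUTES.get(project)
    if handler is None:
        return f"ignored: no handler for project {project!r}"
    return handler(payload)


print(route({"job_id": "test-123", "metadata": {"project": "price-monitor"}}))
```

Adding a new project then means adding one entry to ROUTES rather than another branch.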
Webhook secrets
Webhook secrets let you verify that incoming requests are from Meter, not a third party.
How it works
When you create a schedule with a webhook_url, Meter auto-generates a secret with a whsec_ prefix
Every webhook request includes the secret in the X-Webhook-Secret header
Your endpoint verifies the header matches your stored secret
Storing the secret
The secret is returned once when the schedule is created. Store it securely:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)
# Save this: it won't be shown again in other API responses
webhook_secret = schedule.get("webhook_secret")
print(f"Store this secret: {webhook_secret}")
You can also provide your own secret:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_secret="whsec_my_custom_secret_here"
)
Verifying requests
Check the X-Webhook-Secret header in your webhook handler:
FastAPI (Python)
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = "whsec_..."  # From schedule creation

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    # Verify the secret
    secret = request.headers.get("X-Webhook-Secret")
    if secret != WEBHOOK_SECRET:
        raise HTTPException(status_code=401, detail="Invalid webhook secret")
    payload = await request.json()
    # Process payload...
    return {"status": "ok"}

Express (Node.js)
const express = require('express');
const app = express();
app.use(express.json());

const WEBHOOK_SECRET = 'whsec_...'; // From schedule creation

app.post('/webhooks/meter', (req, res) => {
  // Verify the secret
  const secret = req.headers['x-webhook-secret'];
  if (secret !== WEBHOOK_SECRET) {
    return res.status(401).json({ error: 'Invalid webhook secret' });
  }
  const payload = req.body;
  // Process payload...
  res.status(200).json({ status: 'ok' });
});

Flask (Python)
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_..."  # From schedule creation

@app.route('/webhooks/meter', methods=['POST'])
def handle_webhook():
    # Verify the secret
    secret = request.headers.get('X-Webhook-Secret')
    if secret != WEBHOOK_SECRET:
        return jsonify({"error": "Invalid secret"}), 401
    payload = request.json
    # Process payload...
    return jsonify({"status": "ok"}), 200
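The handlers above compare the header with a plain inequality check. For secrets, a constant-time comparison is generally preferable because it avoids revealing, through response timing, how much of a guessed value matched. The sketch below uses hmac.compare_digest from the Python standard library; the helper name secret_matches is hypothetical.

```python
import hmac
from typing import Optional


def secret_matches(received: Optional[str], expected: str) -> bool:
    """Compare the X-Webhook-Secret header value to the stored secret in constant time."""
    if received is None:
        # Header missing: reject without comparing.
        return False
    # Encode to bytes so non-ASCII input is handled instead of raising TypeError.
    return hmac.compare_digest(received.encode("utf-8"), expected.encode("utf-8"))


print(secret_matches("whsec_test", "whsec_test"))  # True
print(secret_matches(None, "whsec_test"))  # False
```

In any of the handlers above, the check would become `if not secret_matches(secret, WEBHOOK_SECRET):` followed by the same 401 response.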
Regenerating secrets
If a secret is compromised, regenerate it:
result = client.regenerate_webhook_secret(schedule_id)
new_secret = result["webhook_secret"]
# Update your webhook handler with the new secret
The old secret is immediately invalidated. Update your webhook handler before the next delivery.
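Updating a handler quickly is easier when the secret lives in configuration rather than in code. The sketch below reads one or more secrets from an environment variable and accepts a request if its header matches any of them, which also lets a single endpoint serve several schedules that each have their own secret. The variable name METER_WEBHOOK_SECRETS and both helper names are assumptions for illustration, not something Meter defines.

```python
import hmac
import os
from typing import List, Optional


def load_secrets(env_var: str = "METER_WEBHOOK_SECRETS") -> List[str]:
    """Read a comma-separated list of secrets from the environment (assumed variable name)."""
    raw = os.environ.get(env_var, "")
    return [s.strip() for s in raw.split(",") if s.strip()]


def is_known_secret(received: Optional[str], secrets: List[str]) -> bool:
    """Return True if the header value matches any configured secret."""
    if not received:
        return False
    candidate = received.encode("utf-8")
    # Compare against every secret in constant time, without stopping at the first match.
    matches = [hmac.compare_digest(candidate, s.encode("utf-8")) for s in secrets]
    return any(matches)


os.environ["METER_WEBHOOK_SECRETS"] = "whsec_new_example, whsec_other_schedule"
print(is_known_secret("whsec_new_example", load_secrets()))  # True
```

After calling regenerate_webhook_secret, you would replace the old value in the variable with the new one and reload the process or its configuration.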
Retry behavior
Meter automatically retries failed webhook deliveries with exponential backoff:
| Retry | Delay | Total elapsed |
|---|---|---|
| 1st | 15 minutes | 15 minutes |
| 2nd | 30 minutes | 45 minutes |
| 3rd | 1 hour | 1 hour 45 minutes |
| 4th | 2 hours | 3 hours 45 minutes |
| 5th (final) | 4 hours | 7 hours 45 minutes |
Retry rules:
2xx response: Delivery successful, no retry
4xx response (client error): Delivery stops immediately with no retries. Fix your endpoint and the next scheduled job will deliver normally
5xx response (server error): Retries with backoff
Timeout (>30 seconds): Retries with backoff
Connection failure: Retries with backoff
Return 200 OK as quickly as possible. Process the payload asynchronously in a background task to avoid timeouts.
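The retry table follows a simple doubling pattern, and the retry rules reduce to a small decision function. The sketch below reproduces both so you can reason about how long a failing endpoint keeps receiving retries. The function names are hypothetical, and this models the documented behavior rather than Meter's actual implementation. The rules above do not mention 3xx responses, so the sketch treats them as "no retry" purely as an assumption.

```python
from datetime import timedelta
from typing import List, Optional, Tuple


def retry_schedule(
    first_delay: timedelta = timedelta(minutes=15), retries: int = 5
) -> List[Tuple[int, timedelta, timedelta]]:
    """Return (retry number, delay, total elapsed) rows, doubling the delay each time."""
    rows = []
    elapsed = timedelta(0)
    for attempt in range(retries):
        delay = first_delay * (2 ** attempt)
        elapsed += delay
        rows.append((attempt + 1, delay, elapsed))
    return rows


def should_retry(status_code: Optional[int]) -> bool:
    """Apply the documented rules; None stands for a timeout or connection failure."""
    if status_code is None:
        return True
    # 2xx succeeds and 4xx stops delivery; 3xx is an assumption (not documented above).
    return status_code >= 500


for number, delay, total in retry_schedule():
    print(number, delay, total)
```

The final row gives a total of 7 hours 45 minutes, matching the table.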
Implementation
Step 1: Create a webhook endpoint
FastAPI (Python)
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks

app = FastAPI()
WEBHOOK_SECRET = "whsec_..."

@app.post("/webhooks/meter")
async def handle_meter_webhook(
    request: Request,
    background_tasks: BackgroundTasks
):
    # Verify secret
    secret = request.headers.get("X-Webhook-Secret")
    if secret != WEBHOOK_SECRET:
        raise HTTPException(status_code=401, detail="Invalid secret")
    payload = await request.json()
    if payload["status"] == "failed":
        background_tasks.add_task(handle_failure, payload)
        return {"status": "ok"}
    if payload.get("has_changes"):
        background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}

async def process_changes(payload):
    results = payload["results"]
    metadata = payload.get("metadata", {})
    # Update your database, trigger pipelines, etc.

async def handle_failure(payload):
    # Log, alert, or retry logic
    print(f"Job failed: {payload['error']}")

Express (Node.js)
const express = require('express');
const app = express();
app.use(express.json());

const WEBHOOK_SECRET = 'whsec_...';

app.post('/webhooks/meter', async (req, res) => {
  // Verify secret
  const secret = req.headers['x-webhook-secret'];
  if (secret !== WEBHOOK_SECRET) {
    return res.status(401).json({ error: 'Invalid secret' });
  }
  const payload = req.body;
  if (payload.status === 'failed') {
    console.error(`Job failed: ${payload.error}`);
    return res.status(200).json({ status: 'ok' });
  }
  if (payload.has_changes) {
    // Process asynchronously
    setImmediate(() => processChanges(payload));
  }
  res.status(200).json({ status: 'ok' });
});

async function processChanges(payload) {
  const results = payload.results;
  const metadata = payload.metadata || {};
  // Update database, etc.
}

app.listen(3000);

Flask (Python)
from flask import Flask, request, jsonify
import threading

app = Flask(__name__)
WEBHOOK_SECRET = "whsec_..."

@app.route('/webhooks/meter', methods=['POST'])
def handle_meter_webhook():
    # Verify secret
    secret = request.headers.get('X-Webhook-Secret')
    if secret != WEBHOOK_SECRET:
        return jsonify({"error": "Invalid secret"}), 401
    payload = request.json
    if payload["status"] == "failed":
        print(f"Job failed: {payload['error']}")
        return jsonify({"status": "ok"}), 200
    if payload.get("has_changes"):
        # Process in background thread
        threading.Thread(
            target=process_changes, args=(payload,)
        ).start()
    return jsonify({"status": "ok"}), 200

def process_changes(payload):
    results = payload["results"]
    metadata = payload.get("metadata", {})
    # Update database, etc.
Step 2: Make endpoint publicly accessible
Options:
Deploy to cloud (AWS Lambda, Google Cloud Functions, etc.)
Use ngrok for local development: ngrok http 3000
Use a VPS with public IP
Step 3: Create schedule with webhook URL
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")
schedule = client.create_schedule(
    strategy_id="your-strategy-id",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter",
    webhook_metadata={"project": "price-monitor"}
)
# Store the auto-generated webhook secret
print(f"Webhook secret: {schedule.get('webhook_secret')}")
Slack webhooks
Send notifications directly to Slack channels:
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://hooks.slack.com/services/T.../B.../xxx"
    # webhook_type auto-detected as "slack"
)
Slack payloads are automatically formatted as readable messages with item counts and a preview of results.
Best practices
1. Respond quickly
Return 200 OK within 30 seconds. Process payloads asynchronously:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}  # Respond immediately
2. Handle duplicates
Make processing idempotent using the job_id:
processed_jobs = set()

async def process_changes(payload):
    job_id = payload['job_id']
    if job_id in processed_jobs:
        return  # Skip duplicate
    processed_jobs.add(job_id)
    # Process...
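An in-memory set is lost on restart and is not shared between worker processes, so duplicates can slip through in production. One possible alternative, sketched below with the standard-library sqlite3 module, records each job_id in a table with a primary key and lets the database reject repeats. The table name and the helper names open_store and claim_job are hypothetical; any store with an atomic "insert if absent" operation would serve the same purpose.

```python
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the dedup store; pass a file path to persist across restarts."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_jobs (job_id TEXT PRIMARY KEY)"
    )
    conn.commit()
    return conn


def claim_job(conn: sqlite3.Connection, job_id: str) -> bool:
    """Return True the first time a job_id is seen and False for duplicates."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_jobs (job_id) VALUES (?)", (job_id,)
    )
    conn.commit()
    # rowcount is 1 when the row was inserted and 0 when the insert was ignored.
    return cur.rowcount == 1


store = open_store()
print(claim_job(store, "test-123"))  # True
print(claim_job(store, "test-123"))  # False
```

A handler would call claim_job first and skip processing when it returns False.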
3. Handle failures gracefully
import logging

logger = logging.getLogger(__name__)

@app.post("/webhooks/meter")
async def handle_webhook(request: Request):
    payload = None  # Stays None if the request body cannot be parsed
    try:
        payload = await request.json()
        await process_changes(payload)
        return {"status": "ok"}
    except Exception as e:
        logger.error(f"Webhook processing failed: {e}")
        # Return 200 to prevent retries; store the payload for manual review
        await save_failed_webhook(payload, str(e))
        return {"status": "error", "message": str(e)}
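The handler above calls save_failed_webhook, which this guide does not define. As one possible shape for it, the sketch below appends each failed payload to a JSON Lines file and runs the file write in a worker thread so the event loop is not blocked. The file name, the record layout, and the extra path parameter are all assumptions; a database table or a dead-letter queue would work equally well. asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import json
from datetime import datetime, timezone
from typing import Any


def _append_record(path: str, record: dict) -> None:
    """Append one JSON object per line to the given file."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")


async def save_failed_webhook(
    payload: Any, error: str, path: str = "failed_webhooks.jsonl"
) -> None:
    """Persist a failed payload for manual review (hypothetical implementation)."""
    record = {
        "received_at": datetime.now(timezone.utc).isoformat(),
        "error": error,
        "payload": payload,  # May be None if the body could not be parsed
    }
    # Run the blocking file write off the event loop.
    await asyncio.to_thread(_append_record, path, record)
```

Each line of the file can later be loaded with json.loads and replayed through your processing function.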
Testing webhooks
Local testing with ngrok
# Start your webhook server
python app.py
# In another terminal, expose with ngrok
ngrok http 3000
# Use the ngrok URL in Meter
# https://abc123.ngrok.io/webhooks/meter
Testing with webhook.site
Go to https://webhook.site
Copy the unique URL
Use it in your Meter schedule
Trigger a job and view the payload on webhook.site
Test endpoint
Use the Meter test endpoint to verify delivery:
curl -X POST https://api.meter.sh/api/webhooks/test \
  -H "Authorization: Bearer sk_live_..." \
  -H "Content-Type: application/json" \
  -d '{"webhook_url": "https://your-app.com/webhooks/meter"}'
Manual testing
curl -X POST http://localhost:3000/webhooks/meter \
  -H "Content-Type: application/json" \
  -H "X-Webhook-Secret: whsec_test" \
  -d '{
    "job_id": "test-123",
    "status": "completed",
    "url": "https://example.com",
    "has_changes": true,
    "results": [{"test": "data"}],
    "item_count": 1,
    "metadata": {"project": "test"}
  }'
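If you prefer to script the manual test, the same request can be built with Python's standard library. The sketch below mirrors the curl command above; the helper names are hypothetical, and the sample payload is the test payload from that command, not real Meter output.

```python
import json
import urllib.request
from typing import Optional

SAMPLE_PAYLOAD = {
    "job_id": "test-123",
    "status": "completed",
    "url": "https://example.com",
    "has_changes": True,
    "results": [{"test": "data"}],
    "item_count": 1,
    "metadata": {"project": "test"},
}


def build_test_request(
    url: str, secret: str, payload: Optional[dict] = None
) -> urllib.request.Request:
    """Build a POST request that looks like a Meter webhook delivery."""
    body = json.dumps(payload if payload is not None else SAMPLE_PAYLOAD)
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Webhook-Secret": secret},
        method="POST",
    )


def send_test_webhook(url: str, secret: str) -> int:
    """Send the sample payload and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for 4xx and 5xx responses.
    """
    with urllib.request.urlopen(build_test_request(url, secret), timeout=30) as resp:
        return resp.status
```

With your server running locally, `send_test_webhook("http://localhost:3000/webhooks/meter", "whsec_test")` should return 200 when the secret matches the one your handler expects.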
Troubleshooting
Webhooks not received
Solutions:
Verify URL is publicly accessible
Check endpoint returns 200 OK
Test with webhook.site
Check server logs for errors
Verify the schedule has a webhook_url set
Timeouts
Cause: Endpoint takes more than 30 seconds to respond
Solution: Return 200 OK immediately and process asynchronously:
@app.post("/webhooks/meter")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    payload = await request.json()
    background_tasks.add_task(process_changes, payload)
    return {"status": "ok"}  # Immediate response
Duplicate deliveries
Cause: Retries after timeout or connection issues
Solution: Make processing idempotent using job_id:
processed_jobs = set()

async def process_changes(payload):
    job_id = payload['job_id']
    if job_id in processed_jobs:
        return
    processed_jobs.add(job_id)
    # Process...
4xx errors stopping delivery
Cause: Your endpoint returns a 4xx status code
Solution: Meter treats 4xx as a permanent failure and does not retry. Check your endpoint for:
Invalid webhook secret (401)
Incorrect URL path (404)
Request validation errors (422)
Fix the issue and delivery will resume on the next scheduled job.
Need help?
Email me at mckinnon@meter.sh