Pull-Based Monitoring Guide
Learn how to use Meter’s changes API to poll for updates on your own schedule, giving you control over when and how changes are processed.
Overview
Pull-based monitoring uses the get_schedule_changes() API to check for changes on demand, rather than receiving webhooks.
Use pull-based when:
- Batch processing changes (e.g., once per hour)
- Webhooks aren’t feasible (firewall restrictions, no public endpoint)
- You need manual control over processing timing
- Building admin dashboards or reporting tools
Use webhooks instead when:
- Changes need immediate action
- Building real-time systems
How it works
Basic implementation
from meter_sdk import MeterClient
import time

client = MeterClient(api_key="sk_live_...")

# TODO: Replace with your schedule ID
schedule_id = "your-schedule-id"

while True:
    # Fetch any unseen changes and mark them consumed in the same call.
    changes = client.get_schedule_changes(
        schedule_id=schedule_id,
        mark_seen=True,  # Mark as seen after reading
    )

    if changes['count'] > 0:
        print(f"Processing {changes['count']} changes")
        for change in changes['changes']:
            # TODO: Add your processing logic
            print(f"Job {change['job_id']}: {change['item_count']} items")
            # Process change['results']...
    else:
        print("No changes detected")

    # Sleep until the next poll (one hour).
    time.sleep(3600)
REST API implementation
Use the REST API directly if you’re not using Python or prefer HTTP calls.
Endpoint
GET /v1/schedules/{schedule_id}/changes?mark_seen=true
Basic example (curl)
#!/bin/bash
# Poll the Meter changes API once per hour and print any new changes.
# Requires: curl, jq

SCHEDULE_ID="your-schedule-id"
API_KEY="sk_live_..."

while true; do
  echo "Checking for changes..."

  # Get changes. Quote the URL so $SCHEDULE_ID is never word-split or globbed.
  response=$(curl -s "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes" \
    -H "Authorization: Bearer $API_KEY")

  # Parse count (requires jq). Quoting "$response" preserves the JSON
  # byte-for-byte; unquoted it would be word-split and glob-expanded.
  # `// 0` falls back to 0 if the field is missing (e.g. an error body),
  # so the numeric test below cannot blow up on a null.
  count=$(echo "$response" | jq -r '.count // 0')

  if [ "$count" -gt 0 ]; then
    echo "Processing $count changes"
    echo "$response" | jq '.changes'
    # TODO: Process changes
  else
    echo "No changes detected"
  fi

  # Wait before next check
  sleep 3600 # 1 hour
done
JavaScript/Node.js example
const SCHEDULE_ID = 'your-schedule-id';
const API_KEY = 'sk_live_...';

/**
 * Fetch unseen changes for the schedule and process each one.
 * Throws if the API responds with a non-2xx status.
 */
async function checkForChanges() {
  const response = await fetch(
    `https://api.meter.sh/v1/schedules/${SCHEDULE_ID}/changes`,
    {
      headers: {
        'Authorization': `Bearer ${API_KEY}`
      }
    }
  );

  // Fail fast on auth errors / bad schedule IDs instead of trying to
  // parse an error body as a changes payload.
  if (!response.ok) {
    throw new Error(`Changes request failed: ${response.status}`);
  }

  const data = await response.json();

  if (data.count > 0) {
    console.log(`Processing ${data.count} changes`);
    for (const change of data.changes) {
      console.log(`Job ${change.job_id}: ${change.item_count} items`);
      // TODO: Process change.results
      await processChange(change);
    }
  } else {
    console.log('No changes detected');
  }
}

// setInterval ignores the returned promise, so catch rejections here to
// avoid unhandled-rejection crashes when a poll fails.
const safeCheck = () => checkForChanges().catch(console.error);

// Poll every hour
setInterval(safeCheck, 3600000);

// Run immediately on start
safeCheck();
Python (requests) example
import requests
import time

SCHEDULE_ID = "your-schedule-id"
API_KEY = "sk_live_..."


def check_for_changes():
    """Fetch unseen changes for the schedule and process each one.

    Raises:
        requests.HTTPError: if the API returns a non-2xx status.
        requests.Timeout: if the API does not respond within 30 seconds.
    """
    response = requests.get(
        f"https://api.meter.sh/v1/schedules/{SCHEDULE_ID}/changes",
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30,  # requests has no default timeout; without one a stalled connection hangs forever
    )
    # Surface auth errors / bad schedule IDs here instead of failing later
    # with a confusing KeyError on the error payload.
    response.raise_for_status()
    data = response.json()

    if data['count'] > 0:
        print(f"Processing {data['count']} changes")
        for change in data['changes']:
            print(f"Job {change['job_id']}: {change['item_count']} items")
            # TODO: Process change['results']
            process_change(change)
    else:
        print("No changes detected")


# Poll every hour
while True:
    check_for_changes()
    time.sleep(3600)
Go example
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

const (
	scheduleID = "your-schedule-id"
	apiKey     = "sk_live_..."
)

// httpClient bounds every request. The zero-value http.Client has no
// timeout, so a stalled connection would hang the poller forever.
var httpClient = &http.Client{Timeout: 30 * time.Second}

// ChangesResponse mirrors the JSON body of GET /v1/schedules/{id}/changes.
type ChangesResponse struct {
	ScheduleID string   `json:"schedule_id"`
	Changes    []Change `json:"changes"`
	Count      int      `json:"count"`
	MarkedSeen bool     `json:"marked_seen"`
}

// Change is one job whose content changed since it was last marked seen.
type Change struct {
	JobID       string        `json:"job_id"`
	Status      string        `json:"status"`
	Results     []interface{} `json:"results"`
	ItemCount   int           `json:"item_count"`
	ContentHash string        `json:"content_hash"`
	CompletedAt string        `json:"completed_at"`
	Seen        bool          `json:"seen"`
}

// checkForChanges fetches unseen changes for scheduleID and prints a
// summary of each. It returns an error for transport failures, non-200
// responses, or malformed JSON.
func checkForChanges() error {
	url := fmt.Sprintf("https://api.meter.sh/v1/schedules/%s/changes", scheduleID)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+apiKey)

	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	// Report auth errors / bad schedule IDs explicitly instead of
	// unmarshalling an error payload into an empty ChangesResponse.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("changes request failed: %s: %s", resp.Status, body)
	}

	var changesResp ChangesResponse
	if err := json.Unmarshal(body, &changesResp); err != nil {
		return err
	}

	if changesResp.Count > 0 {
		fmt.Printf("Processing %d changes\n", changesResp.Count)
		for _, change := range changesResp.Changes {
			fmt.Printf("Job %s: %d items\n", change.JobID, change.ItemCount)
			// TODO: Process change.Results
		}
	} else {
		fmt.Println("No changes detected")
	}
	return nil
}

func main() {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	// Run immediately. The original dropped this call's error on the
	// floor, inconsistent with the hourly loop below.
	if err := checkForChanges(); err != nil {
		fmt.Printf("Error checking for changes: %v\n", err)
	}

	// Then poll every hour
	for range ticker.C {
		if err := checkForChanges(); err != nil {
			fmt.Printf("Error checking for changes: %v\n", err)
		}
	}
}
mark_seen parameter
Control whether changes are marked as seen:
# Mark as seen (default) — these changes will NOT be returned by future calls
curl https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes \
-H "Authorization: Bearer $API_KEY"
# Preview without marking as seen — the same changes are returned again next time
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?mark_seen=false" \
-H "Authorization: Bearer $API_KEY"
// Preview without marking as seen: pass mark_seen=false as a query parameter
const response = await fetch(
`https://api.meter.sh/v1/schedules/${SCHEDULE_ID}/changes?mark_seen=false`,
{ headers: { 'Authorization': `Bearer ${API_KEY}` } }
);
filter parameter
Filter results by keywords using Lucene-style syntax:
# Filter for items containing both "jfk" AND "tariff"
# (%2B is the URL-encoded "+" required-keyword operator)
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?filter=%2Bjfk+%2Btariff" \
-H "Authorization: Bearer $API_KEY"
# Filter for items containing "jfk" OR "elon" (bare keywords are optional/OR)
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?filter=jfk+elon" \
-H "Authorization: Bearer $API_KEY"
# Filter for articles about specific topics
changes = client.get_schedule_changes(
    schedule_id=schedule_id,
    filter="+jfk +tariff",  # Both required
    mark_seen=True,
)
| Syntax | Meaning | Example |
|---|---|---|
| +keyword | Required (AND) | +jfk +tariff |
| keyword | Optional (OR) | jfk elon |
| -keyword | Excluded (NOT) | -bitcoin |
| "phrase" | Exact phrase | "elon musk" |
Filters apply to individual result items, not entire jobs.
Only matching items are returned. Jobs with zero matches are excluded.
{
"schedule_id": "880e8400-e29b-41d4-a716-446655440000",
"changes": [
{
"job_id": "660e8400-e29b-41d4-a716-446655440000",
"status": "completed",
"results": [
{
"title": "Product Name",
"price": "$29.99",
"url": "https://example.com/product/123"
}
],
"item_count": 12,
"content_hash": "7f3d9a2b4c1e...",
"completed_at": "2025-01-15T10:30:12Z",
"seen": true
}
],
"count": 1,
"marked_seen": true,
"filter_applied": "+jfk +tariff"
}
Key concepts
mark_seen parameter
# Mark changes as seen (default)
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
# These changes won't be returned in future calls
# Preview without marking as seen (read-only peek; nothing is consumed)
preview = client.get_schedule_changes(schedule_id, mark_seen=False)
# These changes will be returned again next time
Use mark_seen=False to:
- Preview changes before processing
- Debug change detection
- Implement custom “seen” tracking
Changes API returns only new changes
The API only returns jobs with content changes that haven’t been marked as seen:
# First call — returns everything not yet marked seen
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count']) # e.g., 3 changes
# Second call (immediately after) — the first call already consumed them
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count']) # 0 (all marked as seen)
# After new scrape runs and detects changes
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count']) # New changes only
Polling patterns
Fixed interval polling
import time


def poll_fixed_interval(schedule_id, interval_seconds):
    """Poll the changes API forever, sleeping a fixed interval between checks."""
    while True:
        batch = client.get_schedule_changes(schedule_id, mark_seen=True)
        if batch['count'] > 0:
            # TODO: Process changes
            process_changes(batch['changes'])
        time.sleep(interval_seconds)


# Poll every hour
poll_fixed_interval(schedule_id, 3600)
Cron-based polling
from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()


@scheduler.scheduled_job('cron', hour=9)  # Daily at 9 AM
def check_changes():
    """Scheduled job: fetch and process any unseen changes."""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)
    if changes['count'] > 0:
        # TODO: Process changes
        process_changes(changes['changes'])


scheduler.start()
Batch processing
def batch_process_changes(schedule_ids, batch_size=10):
    """Collect unseen changes across several schedules and process them in batches."""
    collected = []

    # Gather changes from every schedule first so batches can span schedules.
    for schedule_id in schedule_ids:
        response = client.get_schedule_changes(schedule_id, mark_seen=True)
        collected.extend(response['changes'])

    if not collected:
        print("No changes across all schedules")
        return

    # Slice the combined list into fixed-size batches.
    for start in range(0, len(collected), batch_size):
        batch = collected[start:start + batch_size]
        # TODO: Process batch
        print(f"Processing batch {start//batch_size + 1}: {len(batch)} changes")
        process_batch(batch)
Advanced patterns
Exponential backoff on errors
import time


def poll_with_backoff(schedule_id, max_retries=5):
    """Poll forever; on errors, wait exponentially longer before retrying.

    Re-raises the last exception once more than ``max_retries``
    consecutive failures have occurred.
    """
    failures = 0
    while True:
        try:
            # Fetching AND processing are both covered by the retry logic.
            changes = client.get_schedule_changes(schedule_id, mark_seen=True)
            if changes['count'] > 0:
                process_changes(changes['changes'])
            failures = 0  # Reset on success
            time.sleep(3600)  # Normal interval
        except Exception as exc:
            failures += 1
            if failures > max_retries:
                print("Max retries exceeded")
                raise
            delay = 2 ** failures
            print(f"Error: {exc}. Retrying in {delay}s")
            time.sleep(delay)
Track processing status
import json
from datetime import datetime


def poll_with_tracking(schedule_id, state_file="processing_state.json"):
    """Poll once, process any changes, and persist processing state to disk."""
    # Load prior state; start fresh on the very first run.
    try:
        with open(state_file, 'r') as fh:
            state = json.load(fh)
    except FileNotFoundError:
        state = {"last_check": None, "processed_jobs": []}

    # Check for changes
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)
    if changes['count'] > 0:
        for change in changes['changes']:
            # TODO: Process change
            process_change(change)
            # Record the job so later runs can see what was handled.
            state["processed_jobs"].append({
                "job_id": change['job_id'],
                "processed_at": datetime.now().isoformat(),
            })

    # Persist state even when nothing changed, so last_check advances.
    state["last_check"] = datetime.now().isoformat()
    with open(state_file, 'w') as fh:
        json.dump(state, fh, indent=2)
Preview before processing
def preview_and_confirm(schedule_id):
    """Show pending changes and process them only after user confirmation."""
    # Peek without consuming: mark_seen=False leaves everything pending.
    preview = client.get_schedule_changes(schedule_id, mark_seen=False)
    if preview['count'] == 0:
        print("No changes to process")
        return

    print(f"\nFound {preview['count']} changes:")
    for idx, change in enumerate(preview['changes'], 1):
        print(f"{idx}. Job {change['job_id']}: {change['item_count']} items")

    answer = input("\nProcess these changes? (y/n): ")
    if answer.lower() == 'y':
        # Re-fetch with mark_seen=True to consume the changes.
        # NOTE(review): this second fetch may include changes that arrived
        # after the preview — confirm that is acceptable for your use case.
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)
        process_changes(changes['changes'])
    else:
        print("Skipped processing")
Best practices
1. Match polling interval to schedule frequency
# If the schedule runs hourly, poll at the same cadence or less often.
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url=url,
    interval_seconds=3600,  # Every hour
)

# Poll slightly less frequently than the schedule runs, so each
# check can pick up the most recent completed run in one batch.
while True:
    changes = client.get_schedule_changes(schedule['id'], mark_seen=True)
    # Process...
    time.sleep(3900)  # 65 minutes
2. Handle empty results gracefully
def process_changes_safe(schedule_id):
    """Fetch changes, treating an empty result as normal rather than an error."""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)
    if changes['count'] == 0:
        # No changes is the common case for a healthy schedule — just return.
        return
    # Process changes...
3. Log polling activity
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def poll_with_logging(schedule_id):
    """Poll once and log the check plus each change being processed."""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)
    # Use lazy %-style arguments, not f-strings: the message is only
    # formatted when the INFO level is actually enabled.
    logger.info(
        "Checked schedule %s: %s changes",
        schedule_id,
        changes['count'],
        extra={"schedule_id": schedule_id, "change_count": changes['count']},
    )
    for change in changes['changes']:
        logger.info(
            "Processing job %s",
            change['job_id'],
            extra={"job_id": change['job_id'], "item_count": change['item_count']},
        )
Complete example
TODO: Add complete working example
See Example: News Aggregation for a full implementation with pull-based monitoring.
Comparison: Pull vs. Webhooks
| Feature | Pull-Based | Webhooks |
|---|---|---|
| Latency | Polling interval | Real-time |
| Control | You control timing | Meter triggers |
| Setup | No public endpoint needed | Requires public endpoint |
| Batching | Easy to batch | Requires queuing |
| Firewall | Works behind firewall | Requires inbound access |
| Best for | Batch processing, admin tools | Real-time systems, automation |
Next steps
Need help?
Email me at [email protected]