Pull-Based Monitoring Guide

Learn how to use Meter’s changes API to poll for updates on your own schedule, giving you control over when and how changes are processed.

Overview

Pull-based monitoring uses the get_schedule_changes() API to check for changes on demand rather than receiving webhooks. Use pull-based monitoring when:
  • You process changes in batches (e.g., once per hour)
  • Webhooks aren’t feasible (firewall restrictions, no public endpoint)
  • You need manual control over processing timing
  • You’re building admin dashboards or reporting tools
Use webhooks instead when:
  • Changes need immediate action
  • You’re building real-time systems

How it works

Basic implementation

from meter_sdk import MeterClient
import time

client = MeterClient(api_key="sk_live_...")

# TODO: Replace with your schedule ID
schedule_id = "your-schedule-id"

while True:
    # Check for changes
    changes = client.get_schedule_changes(
        schedule_id=schedule_id,
        mark_seen=True  # Mark as seen after reading
    )

    if changes['count'] > 0:
        print(f"Processing {changes['count']} changes")

        for change in changes['changes']:
            # TODO: Add your processing logic
            print(f"Job {change['job_id']}: {change['item_count']} items")
            # Process change['results']...

    else:
        print("No changes detected")

    # Wait before next check
    time.sleep(3600)  # Check every hour

REST API implementation

Use the REST API directly if you’re not using Python or prefer HTTP calls.

Endpoint

GET /v1/schedules/{schedule_id}/changes?mark_seen=true

Basic example (curl)

#!/bin/bash

SCHEDULE_ID="your-schedule-id"
API_KEY="sk_live_..."

while true; do
  echo "Checking for changes..."

  # Get changes
  response=$(curl -s "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes" \
    -H "Authorization: Bearer $API_KEY")

  # Parse count (requires jq)
  count=$(echo "$response" | jq -r '.count')

  if [ "$count" -gt 0 ]; then
    echo "Processing $count changes"
    echo "$response" | jq '.changes'
    # TODO: Process changes
  else
    echo "No changes detected"
  fi

  # Wait before next check
  sleep 3600  # 1 hour
done

JavaScript/Node.js example

const SCHEDULE_ID = 'your-schedule-id';
const API_KEY = 'sk_live_...';

async function checkForChanges() {
  const response = await fetch(
    `https://api.meter.sh/v1/schedules/${SCHEDULE_ID}/changes`,
    {
      headers: {
        'Authorization': `Bearer ${API_KEY}`
      }
    }
  );

  const data = await response.json();

  if (data.count > 0) {
    console.log(`Processing ${data.count} changes`);

    for (const change of data.changes) {
      console.log(`Job ${change.job_id}: ${change.item_count} items`);
      // TODO: Process change.results
      await processChange(change);
    }
  } else {
    console.log('No changes detected');
  }
}

// Poll every hour
setInterval(checkForChanges, 3600000);

// Run immediately on start
checkForChanges();

Python (requests) example

import requests
import time

SCHEDULE_ID = "your-schedule-id"
API_KEY = "sk_live_..."

def check_for_changes():
    response = requests.get(
        f"https://api.meter.sh/v1/schedules/{SCHEDULE_ID}/changes",
        headers={"Authorization": f"Bearer {API_KEY}"}
    )

    data = response.json()

    if data['count'] > 0:
        print(f"Processing {data['count']} changes")

        for change in data['changes']:
            print(f"Job {change['job_id']}: {change['item_count']} items")
            # TODO: Process change['results']
            process_change(change)
    else:
        print("No changes detected")

# Poll every hour
while True:
    check_for_changes()
    time.sleep(3600)

Go example

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "time"
)

const (
    scheduleID = "your-schedule-id"
    apiKey     = "sk_live_..."
)

type ChangesResponse struct {
    ScheduleID  string   `json:"schedule_id"`
    Changes     []Change `json:"changes"`
    Count       int      `json:"count"`
    MarkedSeen  bool     `json:"marked_seen"`
}

type Change struct {
    JobID       string        `json:"job_id"`
    Status      string        `json:"status"`
    Results     []interface{} `json:"results"`
    ItemCount   int           `json:"item_count"`
    ContentHash string        `json:"content_hash"`
    CompletedAt string        `json:"completed_at"`
    Seen        bool          `json:"seen"`
}

func checkForChanges() error {
    url := fmt.Sprintf("https://api.meter.sh/v1/schedules/%s/changes", scheduleID)

    req, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return err
    }

    req.Header.Set("Authorization", "Bearer "+apiKey)

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return err
    }

    var changesResp ChangesResponse
    if err := json.Unmarshal(body, &changesResp); err != nil {
        return err
    }

    if changesResp.Count > 0 {
        fmt.Printf("Processing %d changes\n", changesResp.Count)

        for _, change := range changesResp.Changes {
            fmt.Printf("Job %s: %d items\n", change.JobID, change.ItemCount)
            // TODO: Process change.Results
        }
    } else {
        fmt.Println("No changes detected")
    }

    return nil
}

func main() {
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()

    // Run immediately
    if err := checkForChanges(); err != nil {
        fmt.Printf("Error checking for changes: %v\n", err)
    }

    // Then poll every hour
    for range ticker.C {
        if err := checkForChanges(); err != nil {
            fmt.Printf("Error checking for changes: %v\n", err)
        }
    }
}

mark_seen parameter

Control whether changes are marked as seen:
# Mark as seen (default)
curl https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes \
  -H "Authorization: Bearer $API_KEY"

# Preview without marking as seen
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?mark_seen=false" \
  -H "Authorization: Bearer $API_KEY"

// Preview without marking as seen (JavaScript)
const response = await fetch(
  `https://api.meter.sh/v1/schedules/${SCHEDULE_ID}/changes?mark_seen=false`,
  { headers: { 'Authorization': `Bearer ${API_KEY}` } }
);

filter parameter

Filter results by keywords using Lucene-style syntax:
# Filter for items containing both "jfk" AND "tariff"
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?filter=%2Bjfk+%2Btariff" \
  -H "Authorization: Bearer $API_KEY"

# Filter for items containing "jfk" OR "elon"
curl "https://api.meter.sh/v1/schedules/$SCHEDULE_ID/changes?filter=jfk+elon" \
  -H "Authorization: Bearer $API_KEY"
# Filter for articles about specific topics
changes = client.get_schedule_changes(
    schedule_id=schedule_id,
    filter="+jfk +tariff",  # Both required
    mark_seen=True
)
Syntax      Meaning          Example
+keyword    Required (AND)   +jfk +tariff
keyword     Optional (OR)    jfk elon
-keyword    Excluded (NOT)   -bitcoin
"phrase"    Exact phrase     "elon musk"
Filters apply to individual result items, not entire jobs. Only matching items are returned. Jobs with zero matches are excluded.
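To make the operator semantics above concrete, here is a toy, client-side matcher. This is NOT Meter’s server-side implementation — just a self-contained sketch of how required (+), excluded (-), optional, and quoted-phrase terms combine against an item’s text:

```python
import re

def matches_filter(text: str, filter_expr: str) -> bool:
    """Toy illustration of Lucene-style filter semantics.

    Not the real server implementation - a sketch of how +required,
    -excluded, optional, and "phrase" terms combine.
    """
    text_lc = text.lower()
    # Pull out quoted phrases first, then bare terms.
    tokens = re.findall(r'([+-]?)"([^"]+)"|([+-]?)(\S+)', filter_expr.lower())
    required, excluded, optional = [], [], []
    for q_op, phrase, op, word in tokens:
        op, term = (q_op, phrase) if phrase else (op, word)
        (required if op == '+' else excluded if op == '-' else optional).append(term)
    if any(term in text_lc for term in excluded):
        return False
    if not all(term in text_lc for term in required):
        return False
    return not optional or any(term in text_lc for term in optional)

print(matches_filter("JFK tariff news", "+jfk +tariff"))   # True
print(matches_filter("Elon launches rocket", "jfk elon"))  # True
print(matches_filter("Bitcoin hits high", "-bitcoin"))     # False
```

The actual filtering happens server-side; this sketch is only meant to show what each operator does to an item before it is included in or excluded from the response.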

Response format

{
  "schedule_id": "880e8400-e29b-41d4-a716-446655440000",
  "changes": [
    {
      "job_id": "660e8400-e29b-41d4-a716-446655440000",
      "status": "completed",
      "results": [
        {
          "title": "Product Name",
          "price": "$29.99",
          "url": "https://example.com/product/123"
        }
      ],
      "item_count": 12,
      "content_hash": "7f3d9a2b4c1e...",
      "completed_at": "2025-01-15T10:30:12Z",
      "seen": true
    }
  ],
  "count": 1,
  "marked_seen": true,
  "filter_applied": "+jfk +tariff"
}

Key concepts

mark_seen parameter

# Mark changes as seen (default)
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
# These changes won't be returned in future calls

# Preview without marking as seen
preview = client.get_schedule_changes(schedule_id, mark_seen=False)
# These changes will be returned again next time
Use mark_seen=False to:
  • Preview changes before processing
  • Debug change detection
  • Implement custom “seen” tracking
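One way to implement custom “seen” tracking is to always call the API with mark_seen=False and deduplicate locally by content_hash. This is a hedged sketch, not a prescribed pattern — the in-memory set here stands in for whatever persistence (file, database) your application uses:

```python
def select_unseen(changes: list[dict], seen_hashes: set[str]) -> list[dict]:
    """Return only changes whose content_hash hasn't been processed,
    and record the returned ones as seen locally."""
    fresh = [c for c in changes if c["content_hash"] not in seen_hashes]
    seen_hashes.update(c["content_hash"] for c in fresh)
    return fresh

# Illustrative data; in practice this comes from
# client.get_schedule_changes(schedule_id, mark_seen=False)['changes'].
seen: set[str] = set()
batch = [{"job_id": "a", "content_hash": "h1"},
         {"job_id": "b", "content_hash": "h2"}]
print([c["job_id"] for c in select_unseen(batch, seen)])  # ['a', 'b']
print([c["job_id"] for c in select_unseen(batch, seen)])  # []
```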

Changes API returns only new changes

The API only returns jobs with content changes that haven’t been marked as seen:
# First call
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count'])  # e.g., 3 changes

# Second call (immediately after)
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count'])  # 0 (all marked as seen)

# After new scrape runs and detects changes
changes = client.get_schedule_changes(schedule_id, mark_seen=True)
print(changes['count'])  # New changes only

Polling patterns

Fixed interval polling

import time

def poll_fixed_interval(schedule_id, interval_seconds):
    """Poll at fixed intervals"""
    while True:
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)

        if changes['count'] > 0:
            # TODO: Process changes
            process_changes(changes['changes'])

        time.sleep(interval_seconds)

# Poll every hour
poll_fixed_interval(schedule_id, 3600)

Cron-based polling

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

@scheduler.scheduled_job('cron', hour=9)  # Daily at 9 AM
def check_changes():
    """Check for changes on a schedule"""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)

    if changes['count'] > 0:
        # TODO: Process changes
        process_changes(changes['changes'])

scheduler.start()

Batch processing

def batch_process_changes(schedule_ids, batch_size=10):
    """Process changes for multiple schedules"""
    all_changes = []

    # Collect changes from all schedules
    for schedule_id in schedule_ids:
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)
        all_changes.extend(changes['changes'])

    if len(all_changes) == 0:
        print("No changes across all schedules")
        return

    # Process in batches
    for i in range(0, len(all_changes), batch_size):
        batch = all_changes[i:i + batch_size]
        # TODO: Process batch
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} changes")
        process_batch(batch)

Advanced patterns

Exponential backoff on errors

import time

def poll_with_backoff(schedule_id, max_retries=5):
    """Poll with exponential backoff on errors"""
    retry_count = 0

    while True:
        try:
            changes = client.get_schedule_changes(schedule_id, mark_seen=True)

            if changes['count'] > 0:
                process_changes(changes['changes'])

            retry_count = 0  # Reset on success
            time.sleep(3600)  # Normal interval

        except Exception as e:
            retry_count += 1
            if retry_count > max_retries:
                print("Max retries exceeded")
                raise

            wait_time = 2 ** retry_count
            print(f"Error: {e}. Retrying in {wait_time}s")
            time.sleep(wait_time)

Track processing status

import json
from datetime import datetime

def poll_with_tracking(schedule_id, state_file="processing_state.json"):
    """Poll and track processing status"""
    # Load state
    try:
        with open(state_file, 'r') as f:
            state = json.load(f)
    except FileNotFoundError:
        state = {"last_check": None, "processed_jobs": []}

    # Check for changes
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)

    if changes['count'] > 0:
        for change in changes['changes']:
            # TODO: Process change
            process_change(change)

            # Track processed job
            state["processed_jobs"].append({
                "job_id": change['job_id'],
                "processed_at": datetime.now().isoformat()
            })

    # Update state
    state["last_check"] = datetime.now().isoformat()

    with open(state_file, 'w') as f:
        json.dump(state, f, indent=2)

Preview before processing

def preview_and_confirm(schedule_id):
    """Preview changes and ask for confirmation"""
    # Preview without marking as seen
    preview = client.get_schedule_changes(schedule_id, mark_seen=False)

    if preview['count'] == 0:
        print("No changes to process")
        return

    # Show preview
    print(f"\nFound {preview['count']} changes:")
    for i, change in enumerate(preview['changes'], 1):
        print(f"{i}. Job {change['job_id']}: {change['item_count']} items")

    # Confirm
    response = input("\nProcess these changes? (y/n): ")

    if response.lower() == 'y':
        # Now mark as seen and process
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)
        process_changes(changes['changes'])
    else:
        print("Skipped processing")

Best practices

1. Match polling interval to schedule frequency

# If schedule runs every hour, poll every hour or less frequently
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url=url,
    interval_seconds=3600  # Every hour
)

# Poll slightly less frequently to batch changes
while True:
    changes = client.get_schedule_changes(schedule['id'], mark_seen=True)
    # Process...
    time.sleep(3900)  # 65 minutes

2. Handle empty results gracefully

def process_changes_safe(schedule_id):
    """Handle empty results without errors"""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)

    if changes['count'] == 0:
        # No changes is normal - don't treat as error
        return

    # Process changes...

3. Log polling activity

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def poll_with_logging(schedule_id):
    """Poll with detailed logging"""
    changes = client.get_schedule_changes(schedule_id, mark_seen=True)

    logger.info(
        f"Checked schedule {schedule_id}: {changes['count']} changes",
        extra={"schedule_id": schedule_id, "change_count": changes['count']}
    )

    for change in changes['changes']:
        logger.info(
            f"Processing job {change['job_id']}",
            extra={"job_id": change['job_id'], "item_count": change['item_count']}
        )

Complete example

See Example: News Aggregation for a full implementation using pull-based monitoring.

Comparison: Pull vs. Webhooks

Feature    Pull-Based                     Webhooks
Latency    Polling interval               Real-time
Control    You control timing             Meter triggers
Setup      No public endpoint needed      Requires public endpoint
Batching   Easy to batch                  Requires queuing
Firewall   Works behind firewall          Requires inbound access
Best for   Batch processing, admin tools  Real-time systems, automation

Need help?

Email me at [email protected]