Jobs

A job is a single execution of a scrape using a strategy. Jobs run asynchronously in the background, extract data according to the strategy’s rules, and store the results for retrieval.

What is a job?

When you create a job, Meter:
  1. Fetches the target URL
  2. Applies the strategy’s extraction logic (CSS selectors)
  3. Extracts structured data
  4. Generates content signatures for change detection
  5. Stores results for retrieval
Jobs are the execution layer: strategies define what to extract, and jobs execute the extraction.

Job lifecycle

Job statuses:
  • pending: Job is queued, waiting to start
  • running: Job is currently executing
  • completed: Job finished successfully, results available
  • failed: Job encountered an error
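As an illustration, here is a manual polling loop that walks a job through these statuses (the built-in wait_for_job() covered below does this for you; client, strategy_id, and url are set up as in the next section):
import time

job = client.create_job(strategy_id=strategy_id, url=url)

# Poll until the job reaches a terminal status
while True:
    current = client.get_job(job['job_id'])
    print(f"Status: {current['status']}")  # pending -> running -> completed/failed
    if current['status'] in ('completed', 'failed'):
        break
    time.sleep(1)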

Creating jobs

Basic job creation

from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

# Create a job
job = client.create_job(
    strategy_id="your-strategy-uuid",
    url="https://example.com/page"
)

print(f"Job ID: {job['job_id']}")
print(f"Status: {job['status']}")  # Usually 'pending'

Waiting for completion

Jobs run asynchronously. Use wait_for_job() to poll until completion:
# Wait indefinitely (polls every 1 second by default)
completed_job = client.wait_for_job(job['job_id'])

print(f"Status: {completed_job['status']}")  # 'completed'
print(f"Items extracted: {completed_job['item_count']}")

# Access results
for item in completed_job['results']:
    print(item)

With timeout

Set a timeout to avoid waiting forever:
from meter_sdk import MeterError

try:
    completed_job = client.wait_for_job(
        job['job_id'],
        poll_interval=2.0,  # Check every 2 seconds
        timeout=300.0  # 5 minute timeout
    )
except MeterError as e:
    print(f"Job timed out or failed: {e}")

Checking job status

Poll job status manually:
job = client.get_job(job_id)

print(f"Status: {job['status']}")
print(f"Items: {job['item_count']}")

if job['status'] == 'completed':
    results = job['results']
    print(f"Extracted {len(results)} items")
elif job['status'] == 'failed':
    print(f"Error: {job['error']}")

Job results

Completed jobs contain extracted data in the results field:
job = client.get_job(job_id)

if job['status'] == 'completed':
    for item in job['results']:
        print(item)

# Output example:
# {'title': 'Product A', 'price': '$19.99', 'image': 'https://...'}
# {'title': 'Product B', 'price': '$29.99', 'image': 'https://...'}

Job metadata

Jobs also include metadata for change detection:
job = client.get_job(job_id)

print(f"Content hash: {job['content_hash']}")
print(f"Structural signature: {job['structural_signature']}")
print(f"Item count: {job['item_count']}")
print(f"Started: {job['started_at']}")
print(f"Completed: {job['completed_at']}")
  • content_hash: Hash of the extracted content for quick comparison
  • structural_signature: Structural fingerprint for detecting layout changes
  • item_count: Number of items extracted
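For example, a cheap change check can compare content_hash between two runs before doing anything heavier. A minimal sketch, where previous_job_id and latest_job_id are assumed to be two runs of the same strategy:
# previous_job_id / latest_job_id: IDs of two runs of the same strategy
prev = client.get_job(previous_job_id)
curr = client.get_job(latest_job_id)

if curr['content_hash'] == prev['content_hash']:
    print("No content change; nothing further to do")
elif curr['structural_signature'] != prev['structural_signature']:
    print("Page layout changed; the strategy may need regenerating")
else:
    print("Content changed within the same layout")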

Listing jobs

All jobs

# Get recent jobs (newest first)
jobs = client.list_jobs(limit=20, offset=0)

for job in jobs:
    print(f"Job {job['id']}: {job['status']}")

Filter by strategy

# Get jobs for a specific strategy
jobs = client.list_jobs(
    strategy_id="your-strategy-uuid",
    limit=50
)

Filter by status

# Get only completed jobs
completed = client.list_jobs(status="completed", limit=10)

# Get failed jobs to investigate errors
failed = client.list_jobs(status="failed", limit=10)
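For example, a quick triage pass over recent failures:
for job in client.list_jobs(status="failed", limit=10):
    detail = client.get_job(job['id'])
    print(f"Job {job['id']} failed: {detail['error']}")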

Comparing jobs

Compare two jobs to detect changes:
comparison = client.compare_jobs(job_id_1, job_id_2)

print(f"Content hash match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")
print(f"Semantic similarity: {comparison['semantic_similarity']}")

if not comparison['content_hash_match']:
    print("Content has changed!")
    if comparison['changes']:
        print(f"Detected changes:")
        for change in comparison['changes']:
            print(f"  - {change}")
Use job comparison to build custom change detection logic beyond what’s provided by schedules.
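For example, a minimal sketch that checks the two most recent runs of a strategy (list_jobs returns newest first, as noted above):
recent = client.list_jobs(strategy_id=strategy_id, limit=2)

if len(recent) == 2:
    comparison = client.compare_jobs(recent[0]['id'], recent[1]['id'])
    if not comparison['content_hash_match']:
        print("Change detected between the two most recent runs")
        for change in comparison.get('changes') or []:
            print(f"  - {change}")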

Strategy history

Get a timeline of all jobs for a strategy:
history = client.get_strategy_history(strategy_id)

for entry in history:
    print(f"Job {entry['job_id']} ({entry['created_at']}):")
    print(f"  Status: {entry['status']}")
    print(f"  Items: {entry['item_count']}")
    print(f"  Has changes: {entry['has_changes']}")
The has_changes field indicates if content changed compared to the previous job.
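For instance, to see how often a page actually changes across its history:
changed = [entry for entry in history if entry['has_changes']]
print(f"{len(changed)} of {len(history)} runs detected changes")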

Advanced features

The following features are feature-gated. Contact [email protected] to request access.

Antibot bypass

Meter can handle pages protected by common bot detection systems. When enabled for your account, jobs automatically attempt to bypass antibot measures when fetching pages. This is useful for scraping sites that use:
  • Cloudflare Bot Management
  • PerimeterX
  • DataDome
  • Other common antibot solutions
No code changes are required—antibot handling is applied automatically when enabled for your account.

LLM summary

Jobs can include an LLM-generated summary of the page content. This is useful for:
  • Quick content overviews without parsing full results
  • Change detection at a semantic level
  • Building RAG pipelines with scraped content
When enabled, completed jobs include a summary field with the AI-generated summary of the extracted content.
job = client.get_job(job_id)

if job['status'] == 'completed':
    print(f"Summary: {job.get('summary')}")
    print(f"Results: {job['results']}")

Best practices

Jobs can fail if websites are down, block requests, or change structure:
job = client.get_job(job_id)

if job['status'] == 'failed':
    error = job['error']
    print(f"Job failed: {error}")

    # Implement retry logic
    if "timeout" in error.lower():
        # Retry with same strategy
        retry_job = client.create_job(strategy_id, url)
    elif "selector" in error.lower():
        # Website structure changed, regenerate strategy
        new_strategy = client.generate_strategy(url, description, name)
Different sites have different response times:
# Fast sites
client.wait_for_job(job_id, timeout=60)

# Slow sites or large pages
client.wait_for_job(job_id, timeout=300)
Jobs are stored indefinitely during the beta. Job deletion is not yet implemented but is planned; once it ships, cleanup for large-scale monitoring could look like this:
from datetime import datetime, timedelta, timezone

cutoff = datetime.now(timezone.utc) - timedelta(days=30)

# Keep only recent jobs, delete old ones
old_jobs = client.list_jobs(
    strategy_id=strategy_id,
    limit=100,
    offset=50  # Skip the 50 most recent jobs
)

for job in old_jobs:
    # Delete jobs older than 30 days (assumes ISO 8601 timestamps)
    created = datetime.fromisoformat(job['created_at'].replace('Z', '+00:00'))
    if created < cutoff:
        client.delete_job(job['id'])
For one-off scrapes, wait_for_job() is convenient:
job = client.create_job(strategy_id, url)
results = client.wait_for_job(job['job_id'])['results']
For monitoring, use schedules instead of manually creating jobs.

Troubleshooting

Problem: Job stuck in pending
Possible causes:
  • High server load
  • Job queue backlog
Solutions:
  • Wait longer (jobs typically complete in 10-60 seconds)
  • Check status manually: client.get_job(job_id)
  • Contact support if stuck for >5 minutes
Problem: Job fails with a selector error
Cause: Website HTML structure changed
Solution: Generate a new strategy:
new_strategy = client.generate_strategy(
    url=url,
    description=description,
    name=f"{old_name} (Updated)"
)
Problem: Job completes but results is empty
Possible causes:
  • Strategy selectors don’t match the page
  • Page content is dynamically loaded (JavaScript)
Solutions:
  • Regenerate strategy for the current page structure
  • For JS-heavy sites, contact support (browser automation coming soon)

Need help?

Email me at [email protected]