Workflows

A workflow chains multiple scraping strategies into a directed acyclic graph (DAG), where the output of one scraper feeds into the next. This lets you build multi-step pipelines like “scrape an index page for links, then scrape each detail page.”

When to use workflows

Use workflows when:
  • You need to scrape pages discovered by a previous scrape
  • Data extraction requires multiple stages (index -> detail -> sub-detail)
  • You want to filter results between stages
  • Different pages need different extraction strategies
Use simple schedules instead when:
  • You’re scraping a single URL or static list of URLs
  • All pages use the same extraction strategy
  • There’s no dependency between scrapes

Key concepts

Nodes

Each node in a workflow represents a scraping operation using a specific strategy. Nodes have one of three input types:
Input Type      Description                                      Use Case
static_urls     Fixed list of URLs                               Starting nodes (index pages, sitemaps)
upstream_urls   Extract URLs from a field in upstream results    Following links to detail pages
upstream_data   Pass full upstream results as context            Parameter mapping between stages
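To make the three input types concrete, here is a minimal sketch (not SDK internals; the dict shapes and key names are assumptions) of how a node's URL list could be resolved from its input type:

```python
# Illustrative sketch only: how the three input types might resolve
# a node's URL list. The node/result dict shapes are assumptions.

def resolve_input_urls(node, upstream_results):
    """Return the URLs a node should scrape, based on its input type."""
    if node["input_type"] == "static_urls":
        # Root nodes: fixed URL list supplied at build time
        return node["urls"]
    if node["input_type"] == "upstream_urls":
        # Pull one URL field out of each upstream result item
        field = node["url_field"]
        return [item[field] for item in upstream_results if field in item]
    if node["input_type"] == "upstream_data":
        # Full upstream items are passed as context; URLs may be
        # derived later via parameter mapping
        return [item.get("url") for item in upstream_results]
    raise ValueError(f"unknown input type: {node['input_type']}")

index_node = {"input_type": "static_urls", "urls": ["https://jobs.com/listings"]}
detail_node = {"input_type": "upstream_urls", "url_field": "job_url"}
results = [{"job_url": "https://jobs.com/1"}, {"job_url": "https://jobs.com/2"}]

print(resolve_input_urls(index_node, []))       # ['https://jobs.com/listings']
print(resolve_input_urls(detail_node, results)) # ['https://jobs.com/1', 'https://jobs.com/2']
```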

Edges

Edges connect nodes and define data flow. Each edge goes from a source node to a target node. Edges can optionally include filters that control which upstream results are passed downstream.

Filters

Filters let you selectively pass data between nodes. For example, only follow links that contain “article” or skip items where the price is below a threshold.
from meter_sdk.workflow import Filter

# Single condition
Filter.contains("url", "/article/")

# Combine with AND
Filter.all(
    Filter.contains("category", "electronics"),
    Filter.gt("price", "100")
)

# Combine with OR
Filter.any(
    Filter.contains("url", "/sale/"),
    Filter.contains("url", "/clearance/")
)
Available filter operators:
Method                                 Description
Filter.contains(field, value)          Field contains substring
Filter.not_contains(field, value)      Field does not contain substring
Filter.equals(field, value)            Exact match
Filter.not_equals(field, value)        Not exact match
Filter.regex_match(field, pattern)     Regex match
Filter.exists(field)                   Field exists and is non-empty
Filter.not_exists(field)               Field is missing or empty
Filter.gt(field, value)                Greater than
Filter.lt(field, value)                Less than
Filter.all(*conditions)                AND — all conditions must match
Filter.any(*conditions)                OR — at least one condition must match
All string operators accept an optional case_sensitive parameter (default: False).
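The semantics above can be pictured as predicates over result dicts. The following is a minimal sketch of that idea (not SDK internals; helper names like `all_of`/`any_of` are ours, and the string-typed numeric values follow the `Filter.gt("price", "100")` example above):

```python
# Illustrative sketch only: filters as predicates over a result dict.
# Not the SDK implementation.

def contains(field, value, case_sensitive=False):
    # Substring match; case-insensitive by default, as documented
    def pred(item):
        text = str(item.get(field, ""))
        return value in text if case_sensitive else value.lower() in text.lower()
    return pred

def gt(field, value):
    # Numeric comparison; values arrive as strings in the examples above
    return lambda item: float(item.get(field, "nan")) > float(value)

def all_of(*preds):
    return lambda item: all(p(item) for p in preds)   # AND

def any_of(*preds):
    return lambda item: any(p(item) for p in preds)   # OR

# Only pass electronics priced above 100 downstream
keep = all_of(contains("category", "electronics"), gt("price", "100"))
items = [
    {"category": "Electronics", "price": "199"},
    {"category": "electronics", "price": "49"},
]
print([i for i in items if keep(i)])  # [{'category': 'Electronics', 'price': '199'}]
```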

How workflows execute

  1. Root nodes execute first using their static URLs
  2. Results flow through edges, optionally filtered
  3. Downstream nodes receive URLs or data from upstream
  4. This continues until all leaf nodes complete
  5. Final results are collected from leaf nodes, grouped by URL
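Steps 1-4 amount to executing the DAG in topological order. A minimal sketch of that ordering (Kahn's algorithm over node ids and edges; not SDK internals, and the node/edge representation is an assumption):

```python
from collections import defaultdict, deque

# Illustrative sketch only: compute an execution order in which every
# upstream node runs before its downstream nodes (Kahn's algorithm).

def execution_order(nodes, edges):
    """nodes: list of ids; edges: list of (source_id, target_id) pairs."""
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        indegree[dst] += 1
        children[src].append(dst)
    queue = deque(n for n, d in indegree.items() if d == 0)  # root nodes first
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for child in children[n]:
            indegree[child] -= 1
            if indegree[child] == 0:  # all upstreams of this child are done
                queue.append(child)
    if len(order) != len(nodes):
        raise ValueError("workflow graph contains a cycle")
    return order

print(execution_order(
    ["sitemap", "categories", "products"],
    [("sitemap", "categories"), ("categories", "products")],
))  # ['sitemap', 'categories', 'products']
```

The cycle check is what makes the graph a DAG: if any node never reaches indegree zero, the workflow cannot be scheduled.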

Building workflows

Basic chain (A -> B)

Scrape an index page, then follow each link to a detail page:
from meter_sdk import MeterClient
from meter_sdk.workflow import Workflow

client = MeterClient(api_key="sk_live_...")

# Build the workflow
workflow = Workflow("Job Scraper")

# Start node: scrape the index page
index = workflow.start("index", index_strategy_id, urls=["https://jobs.com/listings"])

# Chain: scrape each job's detail page
details = index.then("details", detail_strategy_id, url_field="job_url")

# Run it
run = client.run_workflow(workflow)

# Get results (grouped by URL, then by strategy)
output = client.get_workflow_output(run["workflow_id"])
for url, strategies in output["final_results_by_url_grouped"].items():
    for strategy, items in strategies.items():
        print(f"{strategy}: {len(items)} items")

Fan-out (A -> B, C, D)

One source feeding multiple downstream scrapers:
workflow = Workflow("Multi-Extractor")

index = workflow.start("index", index_strategy_id, urls=["https://shop.com"])

# Fan out to different detail strategies
prices = index.then("prices", price_strategy_id, url_field="product_url")
reviews = index.then("reviews", review_strategy_id, url_field="product_url")
images = index.then("images", image_strategy_id, url_field="product_url")

run = client.run_workflow(workflow)
output = client.get_workflow_output(run["workflow_id"])

Filtered pipeline

Only follow links that match a condition:
from meter_sdk.workflow import Workflow, Filter

workflow = Workflow("News Pipeline")

index = workflow.start("index", index_strategy_id, urls=["https://news.com"])

# Only scrape articles in the technology section
tech_articles = index.then(
    "tech_articles",
    article_strategy_id,
    url_field="link",
    filter=Filter.contains("category", "technology")
)

run = client.run_workflow(workflow)
output = client.get_workflow_output(run["workflow_id"])

Multi-stage chain (A -> B -> C)

workflow = Workflow("Deep Scraper")

sitemap = workflow.start("sitemap", sitemap_strategy_id, urls=["https://shop.com/sitemap"])
categories = sitemap.then("categories", category_strategy_id, url_field="category_url")
products = categories.then("products", product_strategy_id, url_field="product_url")

run = client.run_workflow(workflow)
output = client.get_workflow_output(run["workflow_id"])

Change detection

Workflows support change detection through two mechanisms:
  • trigger_on_change_only: When set on an edge, downstream nodes only execute if the upstream results have changed since the last run
  • force: When running a workflow with force=True, change detection is skipped and all nodes re-execute
# Normal run — uses change detection
result = client.run_workflow(workflow_id)

# Force re-run — skip change detection
result = client.run_workflow(workflow_id, force=True)
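One common way to implement this kind of change detection is to fingerprint upstream results and compare against the previous run. The sketch below illustrates the semantics only (it is not the SDK's mechanism; SHA-256 over canonical JSON is our assumption):

```python
import hashlib
import json

# Illustrative sketch only: trigger_on_change_only as a fingerprint
# comparison, with force=True bypassing the check entirely.

def fingerprint(results):
    # Canonical JSON so key order does not affect the hash
    blob = json.dumps(results, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()

def should_run_downstream(results, last_fingerprint, force=False):
    if force:
        return True  # force=True skips change detection
    return fingerprint(results) != last_fingerprint

old = fingerprint([{"url": "https://news.com/a"}])
print(should_run_downstream([{"url": "https://news.com/a"}], old))              # False
print(should_run_downstream([{"url": "https://news.com/b"}], old))              # True
print(should_run_downstream([{"url": "https://news.com/a"}], old, force=True))  # True
```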

Scheduling workflows

Workflows can be scheduled to run automatically, just like single-strategy schedules:
# Run every hour
client.schedule_workflow(
    workflow_id,
    interval_seconds=3600
)

# Run daily at 9 AM with webhook
client.schedule_workflow(
    workflow_id,
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook"
)
See Workflow SDK Reference for all scheduling methods.

Parameters

Nodes can pass parameters to their strategies:
# Static parameters on a start node
index = workflow.start(
    "index",
    api_strategy_id,
    urls=["https://api.example.com/items"],
    parameters={"page": 1, "limit": 100}
)

# Parameter config for downstream nodes (map upstream fields to parameters)
details = index.then(
    "details",
    detail_strategy_id,
    url_field="detail_url",
    parameter_config={"item_id": "$.id"}
)
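The `"$.id"` syntax maps a path in each upstream result item to a parameter name. A minimal sketch of how such paths could be resolved (not SDK internals; only simple dot paths are handled here, and the helper name is ours):

```python
# Illustrative sketch only: resolve "$.field.subfield" paths from a
# parameter_config against one upstream result item.

def resolve_parameters(parameter_config, item):
    """Return {param_name: value} by walking each path into the item."""
    params = {}
    for name, path in parameter_config.items():
        value = item
        for key in path.lstrip("$.").split("."):  # "$.meta.region" -> ["meta", "region"]
            value = value[key]
        params[name] = value
    return params

item = {"id": "job-42", "meta": {"region": "us"}}
config = {"item_id": "$.id", "region": "$.meta.region"}
print(resolve_parameters(config, item))  # {'item_id': 'job-42', 'region': 'us'}
```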

Need help?

Email me at mckinnon@meter.sh