
Workflow Methods

Workflow methods allow you to build multi-step scraping pipelines where the output of one scraper feeds into the next. See the Workflows concept page for an introduction.

Quick reference

Workflow classes

| Class | Description |
| --- | --- |
| `Workflow` | Define a DAG pipeline with nodes and edges |
| `WorkflowNode` | A single scraping step in the workflow |
| `Filter` | Build filter conditions for edges |

Client methods

| Method | Description |
| --- | --- |
| `create_workflow()` | Create a workflow from a `Workflow` object |
| `run_workflow()` | Run a workflow (create + run, or run existing) |
| `wait_for_workflow()` | Poll a workflow run until completion |
| `get_workflow()` | Get workflow details |
| `list_workflows()` | List all workflows |
| `delete_workflow()` | Delete a workflow |
| `get_workflow_run()` | Get details of a specific run |
| `list_workflow_runs()` | List run history |
| `get_workflow_output()` | Get the latest run's results |
| `schedule_workflow()` | Schedule a workflow on an interval or cron |
| `list_workflow_schedules()` | List schedules for a workflow |
| `update_workflow_schedule()` | Update a workflow schedule |
| `delete_workflow_schedule()` | Delete a workflow schedule |

Workflow class

from meter_sdk.workflow import Workflow

Constructor

Workflow(name: str, description: Optional[str] = None)
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | str | Yes | Workflow name |
| `description` | str | No | Description of what the workflow does |

start()

Create the starting node with static URLs.
workflow.start(
    name: str,
    strategy_id: str,
    urls: List[str],
    parameters: Optional[Dict[str, Any]] = None
) -> WorkflowNode
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | str | Yes | Unique node identifier within the workflow |
| `strategy_id` | str | Yes | Strategy UUID to use for scraping |
| `urls` | List[str] | Yes | List of URLs to scrape |
| `parameters` | Dict | No | Static API parameter overrides |

Returns: WorkflowNode — chain with .then() for downstream nodes

workflow = Workflow("Job Scraper")
index = workflow.start("index", strategy_id, urls=["https://jobs.com"])

to_dict()

Serialize the workflow for API requests. You typically don’t need to call this directly — create_workflow() and run_workflow() handle serialization.
workflow.to_dict() -> Dict[str, Any]

WorkflowNode class

then()

Chain a downstream node that receives data from this node.
node.then(
    name: str,
    strategy_id: str,
    url_field: Optional[str] = None,
    parameter_config: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    filter: Optional[Dict[str, Any]] = None
) -> WorkflowNode
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | str | Yes | Unique node identifier |
| `strategy_id` | str | Yes | Strategy UUID for the downstream scraper |
| `url_field` | str | No | Field name in upstream results containing URLs to scrape. If provided, sets the input type to upstream_urls |
| `parameter_config` | Dict | No | Map upstream result fields to strategy parameters |
| `parameters` | Dict | No | Static API parameter overrides |
| `filter` | Dict | No | Filter condition (use the Filter class to build) |

Returns: WorkflowNode — the new downstream node (for further chaining)

If url_field is provided, the node uses the upstream_urls input type (URLs are extracted from upstream results). If neither url_field nor parameter_config is provided, it uses the upstream_data input type (full upstream results are passed as context).

# Extract URLs from upstream results
details = index.then("details", detail_strategy_id, url_field="job_url")

# With filtering
tech_jobs = index.then(
    "tech_jobs",
    detail_strategy_id,
    url_field="job_url",
    filter=Filter.contains("category", "engineering")
)

# Chain further
sub_details = details.then("sub_details", sub_strategy_id, url_field="company_url")

Filter class

from meter_sdk.workflow import Filter

Build filter conditions for workflow edges. All string operators accept an optional case_sensitive parameter (default: False).

String operators

Filter.contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.regex_match(field: str, pattern: str, case_sensitive: bool = False) -> dict

Existence operators

Filter.exists(field: str) -> dict
Filter.not_exists(field: str) -> dict

Comparison operators

Filter.gt(field: str, value: str) -> dict
Filter.lt(field: str, value: str) -> dict

Logical combinators

Filter.all(*conditions) -> dict  # AND — all conditions must match
Filter.any(*conditions) -> dict  # OR — at least one must match

Examples

from meter_sdk.workflow import Filter

# Single condition
f = Filter.contains("url", "/products/")

# AND: all conditions must match
f = Filter.all(
    Filter.contains("category", "electronics"),
    Filter.gt("price", "50"),
    Filter.exists("in_stock")
)

# OR: at least one must match
f = Filter.any(
    Filter.equals("status", "sale"),
    Filter.equals("status", "clearance")
)

# Case-sensitive match
f = Filter.equals("sku", "ABC-123", case_sensitive=True)

# Pass to then()
details = index.then("details", strategy_id, url_field="link", filter=f)

Client methods

create_workflow()

Create a workflow from a Workflow object.
client.create_workflow(workflow: Workflow) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `workflow` | Workflow | Yes | Workflow object defining the DAG structure |

Returns: Created workflow details, including id

workflow = Workflow("My Workflow")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
details = index.then("details", detail_strategy_id, url_field="link")

created = client.create_workflow(workflow)
print(f"Workflow ID: {created['id']}")

run_workflow()

Run a workflow. Accepts either a Workflow object (creates then runs) or a workflow ID string.
client.run_workflow(
    workflow_or_id: Union[Workflow, str],
    force: bool = False,
    wait: bool = True,
    timeout: float = 3600
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `workflow_or_id` | Workflow or str | Yes | Workflow object (creates then runs) or existing workflow ID |
| `force` | bool | No | Force a re-run, skipping change detection (default: False) |
| `wait` | bool | No | Block until completion (default: True) |
| `timeout` | float | No | Maximum seconds to wait (default: 3600) |

Returns: Completed run details including status and node_executions (if wait=True), or run details with status (if wait=False). Use get_workflow_output() to fetch results.

Raises: MeterError if the run fails or times out

# Create and run in one step
workflow = Workflow("My Scraper")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
result = client.run_workflow(workflow)

# Run an existing workflow by ID
result = client.run_workflow("workflow-uuid")

# Fire and forget (don't wait)
run = client.run_workflow("workflow-uuid", wait=False)
print(f"Run started: {run['id']}")

# Force re-run (skip change detection)
result = client.run_workflow("workflow-uuid", force=True)

wait_for_workflow()

Poll a workflow run until it completes.
client.wait_for_workflow(
    workflow_id: str,
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `workflow_id` | str | Yes | Workflow UUID |
| `run_id` | str | Yes | Run UUID |
| `poll_interval` | float | No | Seconds between polls (default: 5.0) |
| `timeout` | float | No | Maximum seconds to wait (default: 3600) |

Returns: Completed run with results

Raises: MeterError if the run fails or times out

# Start a run without waiting
run = client.run_workflow("workflow-uuid", wait=False)

# Wait for it later
completed = client.wait_for_workflow("workflow-uuid", run["id"], timeout=600)

get_workflow()

Get workflow details including nodes and edges.
client.get_workflow(workflow_id: str) -> Dict
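
To inspect the DAG after fetching it, you can walk the returned nodes and edges. The sketch below processes an illustrative response; the nodes, edges, from, and to field names are assumptions, not confirmed by this reference:

```python
# workflow = client.get_workflow("workflow-uuid")
# Illustrative response shape (field names assumed):
workflow = {
    "id": "workflow-uuid",
    "name": "Job Scraper",
    "nodes": [{"name": "index"}, {"name": "details"}],
    "edges": [{"from": "index", "to": "details"}],
}

for node in workflow["nodes"]:
    print(f"node: {node['name']}")
for edge in workflow["edges"]:
    print(f"edge: {edge['from']} -> {edge['to']}")
```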

list_workflows()

List all workflows.
client.list_workflows(
    limit: int = 50,
    offset: int = 0
) -> List[Dict]
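
Because list_workflows pages with limit and offset, collecting every workflow means looping until a short page comes back. A minimal sketch; it assumes the last page contains fewer than limit items, which this reference does not state explicitly:

```python
def list_all_workflows(client, page_size=50):
    """Collect every workflow by paging until a short page is returned."""
    workflows, offset = [], 0
    while True:
        page = client.list_workflows(limit=page_size, offset=offset)
        workflows.extend(page)
        if len(page) < page_size:
            return workflows
        offset += page_size
```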

delete_workflow()

Delete a workflow and all associated runs and schedules.
client.delete_workflow(workflow_id: str) -> Dict
This deletes the workflow, all run history, and any associated schedules.

get_workflow_run()

Get details of a specific workflow run, including node execution results.
client.get_workflow_run(workflow_id: str, run_id: str) -> Dict
Returns: Run details including status and node_executions
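
A common use is checking which nodes failed in a run. The sketch below processes an illustrative response; status and node_executions come from the Returns note above, but the per-node field names are assumptions:

```python
# run = client.get_workflow_run("workflow-uuid", "run-uuid")
# Illustrative response (per-node field names assumed):
run = {
    "status": "completed",
    "node_executions": [
        {"node_name": "index", "status": "completed"},
        {"node_name": "details", "status": "failed"},
    ],
}

failed = [n["node_name"] for n in run["node_executions"] if n["status"] == "failed"]
if failed:
    print(f"Run {run['status']}, but these nodes failed: {failed}")
```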

list_workflow_runs()

List run history for a workflow.
client.list_workflow_runs(
    workflow_id: str,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
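
Run history is handy for spot-checking reliability. A sketch over an illustrative response; the id, status, and created_at field names are assumptions:

```python
# runs = client.list_workflow_runs("workflow-uuid", limit=20)
# Illustrative response (field names assumed):
runs = [
    {"id": "run-2", "status": "completed", "created_at": "2024-05-02T09:00:00Z"},
    {"id": "run-1", "status": "failed", "created_at": "2024-05-01T09:00:00Z"},
]

completed = [r for r in runs if r["status"] == "completed"]
print(f"{len(completed)}/{len(runs)} recent runs completed")
```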

get_workflow_output()

Get the latest completed run’s results.
client.get_workflow_output(
    workflow_id: str,
    flat: bool = False
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `workflow_id` | str | Yes | Workflow UUID |
| `flat` | bool | No | Return flat per-URL results instead of grouped by strategy (default: False) |

Returns: Latest run output. The default format uses final_results_by_url_grouped; with flat=True it uses final_results_by_url. The response also includes status, changed_since_previous, run_id, workflow_id, and completed_at.

Raises: MeterError if no completed runs exist

# Grouped by URL and strategy (default)
output = client.get_workflow_output("workflow-uuid")
for url, strategies in output["final_results_by_url_grouped"].items():
    for strategy, items in strategies.items():
        print(f"{strategy}: {len(items)} items")

# Flat per-URL results
output = client.get_workflow_output("workflow-uuid", flat=True)
for url, items in output["final_results_by_url"].items():
    print(f"{url}: {len(items)} items")

Scheduling methods

schedule_workflow()

Schedule a workflow to run on an interval or cron expression.
client.schedule_workflow(
    workflow_id: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `workflow_id` | str | Yes | Workflow UUID |
| `interval_seconds` | int | Conditional | Run every N seconds (or use cron_expression) |
| `cron_expression` | str | Conditional | Cron expression (e.g., "0 9 * * *") |
| `webhook_url` | str | No | Webhook URL to receive results |
| `webhook_metadata` | Dict | No | Custom JSON metadata included in every webhook payload |
| `webhook_secret` | str | No | Secret for the X-Webhook-Secret header |
| `webhook_type` | str | No | 'standard' or 'slack' (default: 'standard') |

# Run every hour
client.schedule_workflow(workflow_id, interval_seconds=3600)

# Daily at 9 AM with webhook
client.schedule_workflow(
    workflow_id,
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook",
    webhook_metadata={"project": "my-project"}
)

list_workflow_schedules()

client.list_workflow_schedules(workflow_id: str) -> List[Dict]
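
For example, to find which schedules are currently active. The response shape below is illustrative; the id, enabled, and cron_expression field names are assumptions:

```python
# schedules = client.list_workflow_schedules("workflow-uuid")
# Illustrative response (field names assumed):
schedules = [
    {"id": "sched-1", "enabled": True, "cron_expression": "0 9 * * *"},
    {"id": "sched-2", "enabled": False, "cron_expression": None},
]

active = [s["id"] for s in schedules if s["enabled"]]
print(f"Active schedules: {active}")
```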

update_workflow_schedule()

client.update_workflow_schedule(
    workflow_id: str,
    schedule_id: str,
    enabled: Optional[bool] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
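
A common pattern is pausing a schedule without deleting it, so its cadence and webhook settings survive. A sketch of a small toggle helper built on update_workflow_schedule:

```python
def set_schedule_enabled(client, workflow_id, schedule_id, enabled):
    """Pause or resume a schedule while keeping its other settings."""
    return client.update_workflow_schedule(workflow_id, schedule_id, enabled=enabled)

# Usage, assuming an existing client and IDs:
# set_schedule_enabled(client, "workflow-uuid", "sched-uuid", False)  # pause
# set_schedule_enabled(client, "workflow-uuid", "sched-uuid", True)   # resume
```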

delete_workflow_schedule()

client.delete_workflow_schedule(workflow_id: str, schedule_id: str) -> Dict

Complete example

from meter_sdk import MeterClient
from meter_sdk.workflow import Workflow, Filter

client = MeterClient(api_key="sk_live_...")

# Step 1: Build the workflow
workflow = Workflow("E-commerce Scraper", description="Scrape product index then detail pages")

# Root node: scrape the product listing
index = workflow.start(
    "product_index",
    index_strategy_id,
    urls=["https://shop.com/products"]
)

# Downstream: only follow electronics links
electronics = index.then(
    "electronics",
    detail_strategy_id,
    url_field="product_url",
    filter=Filter.contains("category", "electronics")
)

# Step 2: Run the workflow
run = client.run_workflow(workflow)

# Step 3: Get results
output = client.get_workflow_output(run["workflow_id"])
for url, strategies in output["final_results_by_url_grouped"].items():
    print(f"\n{url}:")
    for strategy, items in strategies.items():
        print(f"  {strategy} ({len(items)} items):")
        for item in items[:3]:
            print(f"    {item}")

# Step 4: Schedule for daily runs
client.schedule_workflow(
    run["workflow_id"],
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook"
)

See also

Workflows Concept

Understand workflow architecture and patterns

REST API

Workflow endpoints in the REST API

MeterClient Reference

Full client method documentation

Strategies

Learn about the strategies workflows use

Need help?

Email me at mckinnon@meter.sh