
Workflow Methods

Workflow methods allow you to build multi-step scraping pipelines where the output of one scraper feeds into the next. See the Workflows concept page for an introduction.

Quick reference

Workflow classes

| Class | Description |
| --- | --- |
| Workflow | Define a DAG pipeline with nodes and edges |
| WorkflowNode | A single scraping step in the workflow |
| Filter | Build filter conditions for edges |

Client methods

| Method | Description |
| --- | --- |
| create_workflow() | Create a workflow from a Workflow object |
| run_workflow() | Run a workflow (create + run, or run existing) |
| wait_for_workflow() | Poll a workflow run until completion |
| get_workflow() | Get workflow details |
| list_workflows() | List all workflows |
| delete_workflow() | Delete a workflow |
| get_workflow_run() | Get details of a specific run |
| list_workflow_runs() | List run history |
| get_workflow_output() | Get the latest run's results |
| schedule_workflow() | Schedule a workflow on an interval or cron |
| list_workflow_schedules() | List schedules for a workflow |
| update_workflow_schedule() | Update a workflow schedule |
| delete_workflow_schedule() | Delete a workflow schedule |

Workflow class

from meter_sdk.workflow import Workflow

Constructor

Workflow(name: str, description: Optional[str] = None)
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Workflow name |
| description | str | No | Description of what the workflow does |

start()

Create the starting node with static URLs.
workflow.start(
    name: str,
    strategy_id: str,
    urls: List[str],
    parameters: Optional[Dict[str, Any]] = None
) -> WorkflowNode
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Unique node identifier within the workflow |
| strategy_id | str | Yes | Strategy UUID to use for scraping |
| urls | List[str] | Yes | List of URLs to scrape |
| parameters | Dict | No | Static API parameter overrides |
Returns: WorkflowNode — chain with .then() for downstream nodes
workflow = Workflow("Job Scraper")
index = workflow.start("index", strategy_id, urls=["https://jobs.com"])

to_dict()

Serialize the workflow for API requests. You typically don’t need to call this directly — create_workflow() and run_workflow() handle serialization.
workflow.to_dict() -> Dict[str, Any]
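If you want to inspect the exact payload before submitting it, you can dump the serialized form; a debugging sketch (the exact keys in the resulting dict depend on the SDK version):

```python
import json

from meter_sdk.workflow import Workflow

workflow = Workflow("My Workflow")
workflow.start("index", "strategy-uuid", urls=["https://example.com"])

# Inspect the serialized DAG that create_workflow() would send
print(json.dumps(workflow.to_dict(), indent=2))
```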

WorkflowNode class

then()

Chain a downstream node that receives data from this node.
node.then(
    name: str,
    strategy_id: str,
    url_field: Optional[str] = None,
    parameter_config: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    filter: Optional[Dict[str, Any]] = None
) -> WorkflowNode
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Unique node identifier |
| strategy_id | str | Yes | Strategy UUID for the downstream scraper |
| url_field | str | No | Field name in upstream results containing URLs to scrape; if provided, sets the input type to upstream_urls |
| parameter_config | Dict | No | Map upstream result fields to strategy parameters |
| parameters | Dict | No | Static API parameter overrides |
| filter | Dict | No | Filter condition (use the Filter class to build) |
Returns: WorkflowNode — the new downstream node (for further chaining)
If url_field is provided, the node uses upstream_urls input type (extracts URLs from upstream results). If neither url_field nor parameter_config is provided, it uses upstream_data input type (passes full upstream results as context).
# Extract URLs from upstream results
details = index.then("details", detail_strategy_id, url_field="job_url")

# With filtering
tech_jobs = index.then(
    "tech_jobs",
    detail_strategy_id,
    url_field="job_url",
    filter=Filter.contains("category", "engineering")
)

# Chain further
sub_details = details.then("sub_details", sub_strategy_id, url_field="company_url")

Filter class

from meter_sdk.workflow import Filter
Build filter conditions for workflow edges. All string operators accept an optional case_sensitive parameter (default: False).

String operators

Filter.contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.regex_match(field: str, pattern: str, case_sensitive: bool = False) -> dict

Existence operators

Filter.exists(field: str) -> dict
Filter.not_exists(field: str) -> dict

Comparison operators

Filter.gt(field: str, value: str) -> dict
Filter.lt(field: str, value: str) -> dict

Logical combinators

Filter.all(*conditions) -> dict  # AND — all conditions must match
Filter.any(*conditions) -> dict  # OR — at least one must match

Examples

from meter_sdk.workflow import Filter

# Single condition
f = Filter.contains("url", "/products/")

# AND: all conditions must match
f = Filter.all(
    Filter.contains("category", "electronics"),
    Filter.gt("price", "50"),
    Filter.exists("in_stock")
)

# OR: at least one must match
f = Filter.any(
    Filter.equals("status", "sale"),
    Filter.equals("status", "clearance")
)

# Case-sensitive match
f = Filter.equals("sku", "ABC-123", case_sensitive=True)

# Pass to then()
details = index.then("details", strategy_id, url_field="link", filter=f)

Client methods

create_workflow()

Create a workflow from a Workflow object.
client.create_workflow(workflow: Workflow) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| workflow | Workflow | Yes | Workflow object defining the DAG structure |
Returns: Created workflow details including id
workflow = Workflow("My Workflow")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
details = index.then("details", detail_strategy_id, url_field="link")

created = client.create_workflow(workflow)
print(f"Workflow ID: {created['id']}")

run_workflow()

Run a workflow. Accepts either a Workflow object (creates then runs) or a workflow ID string.
client.run_workflow(
    workflow_or_id: Union[Workflow, str],
    force: bool = False,
    wait: bool = True,
    timeout: float = 3600
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| workflow_or_id | Workflow or str | Yes | Workflow object (creates then runs) or workflow ID |
| force | bool | No | Force re-run, skipping change detection (default: False) |
| wait | bool | No | Block until completion (default: True) |
| timeout | float | No | Maximum seconds to wait (default: 3600) |

Returns: Completed run details including status and node_executions (if wait=True), or run details with status (if wait=False). Use get_workflow_output() to fetch results.
Raises: MeterError if the run fails or times out
# Create and run in one step
workflow = Workflow("My Scraper")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
result = client.run_workflow(workflow)

# Run an existing workflow by ID
result = client.run_workflow("workflow-uuid")

# Fire and forget (don't wait)
run = client.run_workflow("workflow-uuid", wait=False)
print(f"Run started: {run['id']}")

# Force re-run (skip change detection)
result = client.run_workflow("workflow-uuid", force=True)

wait_for_workflow()

Poll a workflow run until it completes.
client.wait_for_workflow(
    workflow_id: str,
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| workflow_id | str | Yes | Workflow UUID |
| run_id | str | Yes | Run UUID |
| poll_interval | float | No | Seconds between polls (default: 5.0) |
| timeout | float | No | Maximum seconds to wait (default: 3600) |

Returns: Completed run with results
Raises: MeterError if the run fails or times out
# Start a run without waiting
run = client.run_workflow("workflow-uuid", wait=False)

# Wait for it later
completed = client.wait_for_workflow("workflow-uuid", run["id"], timeout=600)
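Under the hood this is a plain poll-and-check loop. A minimal, self-contained sketch of the same pattern, where the check_status callable stands in for get_workflow_run() and the "completed"/"failed" status names are assumptions about the run payload:

```python
import time

def poll_until_done(check_status, poll_interval=5.0, timeout=3600):
    """Call check_status() until it reports a terminal state or timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        run = check_status()
        if run["status"] in ("completed", "failed"):
            return run
        time.sleep(poll_interval)
    raise TimeoutError(f"run did not finish within {timeout}s")

# Simulate a run that completes on the third poll
states = iter(["pending", "running", "completed"])
result = poll_until_done(lambda: {"status": next(states)}, poll_interval=0)
print(result["status"])  # completed
```

Note that the real method raises MeterError on failure or timeout; TimeoutError here is only for the sketch.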

get_workflow()

Get workflow details including nodes and edges.
client.get_workflow(workflow_id: str) -> Dict
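For example, to inspect a workflow's DAG structure; a sketch in which the nodes and edges keys are assumptions about the response shape:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

# Fetch the stored definition and walk its nodes
wf = client.get_workflow("workflow-uuid")
print(wf["name"])
for node in wf.get("nodes", []):
    print(node)
```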

list_workflows()

List all workflows.
client.list_workflows(
    limit: int = 50,
    offset: int = 0
) -> List[Dict]
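Because results are paginated, fetching everything means stepping the offset until a short page comes back; a sketch that assumes each item carries id and name fields:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

offset = 0
while True:
    page = client.list_workflows(limit=50, offset=offset)
    for wf in page:
        print(wf["id"], wf["name"])
    if len(page) < 50:  # short page means we've reached the end
        break
    offset += 50
```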

delete_workflow()

Delete a workflow along with all of its run history and any associated schedules.
client.delete_workflow(workflow_id: str) -> Dict
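For example, cleaning up workflows by name; a sketch assuming list items expose id and name fields:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

# Delete every workflow whose name marks it as temporary.
# Deletion cascades to run history and schedules, so use with care.
for wf in client.list_workflows():
    if wf["name"].startswith("tmp-"):
        client.delete_workflow(wf["id"])
```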

get_workflow_run()

Get details of a specific workflow run, including node execution results.
client.get_workflow_run(workflow_id: str, run_id: str) -> Dict
Returns: Run details including status and node_executions
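For example, to inspect the per-node results of one run; the shape of node_executions entries is an assumption:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

run = client.get_workflow_run("workflow-uuid", "run-uuid")
print(run["status"])
for execution in run.get("node_executions", []):
    print(execution)
```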

list_workflow_runs()

List run history for a workflow.
client.list_workflow_runs(
    workflow_id: str,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
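For example, reviewing recent runs to spot failures; a sketch that assumes each run dict carries id and status fields:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

for run in client.list_workflow_runs("workflow-uuid", limit=20):
    marker = "FAILED" if run.get("status") == "failed" else "ok"
    print(run["id"], marker)
```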

get_workflow_output()

Get the latest completed run’s results.
client.get_workflow_output(
    workflow_id: str,
    flat: bool = False
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| workflow_id | str | Yes | Workflow UUID |
| flat | bool | No | Return flat per-URL results instead of grouped by strategy (default: False) |

Returns: Latest run output. The default format uses final_results_by_url_grouped; with flat=True it uses final_results_by_url. The response also includes status, changed_since_previous, run_id, workflow_id, and completed_at.
Raises: MeterError if no completed runs exist
# Grouped by URL and strategy (default)
output = client.get_workflow_output("workflow-uuid")
for url, strategies in output["final_results_by_url_grouped"].items():
    for strategy, items in strategies.items():
        print(f"{strategy}: {len(items)} items")

# Flat per-URL results
output = client.get_workflow_output("workflow-uuid", flat=True)
for url, items in output["final_results_by_url"].items():
    print(f"{url}: {len(items)} items")

Scheduling methods

schedule_workflow()

Schedule a workflow to run on an interval or cron expression.
client.schedule_workflow(
    workflow_id: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| workflow_id | str | Yes | Workflow UUID |
| interval_seconds | int | Conditional | Run every N seconds (or use cron_expression) |
| cron_expression | str | Conditional | Cron expression (e.g., "0 9 * * *") |
| webhook_url | str | No | Webhook URL to receive results |
| webhook_metadata | Dict | No | Custom JSON metadata included in every webhook payload |
| webhook_secret | str | No | Secret for the X-Webhook-Secret header |
| webhook_type | str | No | 'standard' or 'slack' (default: 'standard') |
# Run every hour
client.schedule_workflow(workflow_id, interval_seconds=3600)

# Daily at 9 AM with webhook
client.schedule_workflow(
    workflow_id,
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook",
    webhook_metadata={"project": "my-project"}
)

list_workflow_schedules()

client.list_workflow_schedules(workflow_id: str) -> List[Dict]
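For example, listing a workflow's schedules before updating one; the id and enabled fields are assumptions about the response shape:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

for schedule in client.list_workflow_schedules("workflow-uuid"):
    print(schedule["id"], schedule.get("enabled"))
```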

update_workflow_schedule()

client.update_workflow_schedule(
    workflow_id: str,
    schedule_id: str,
    enabled: Optional[bool] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
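For example, pausing a schedule without deleting it, then re-enabling it on a weekly cadence:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

# Pause without deleting
client.update_workflow_schedule("workflow-uuid", "schedule-uuid", enabled=False)

# Re-enable on a weekly cron (Mondays at 9 AM)
client.update_workflow_schedule(
    "workflow-uuid",
    "schedule-uuid",
    enabled=True,
    cron_expression="0 9 * * 1",
)
```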

delete_workflow_schedule()

client.delete_workflow_schedule(workflow_id: str, schedule_id: str) -> Dict
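For example, removing every schedule attached to a workflow; a sketch that assumes each schedule dict has an id field:

```python
from meter_sdk import MeterClient

client = MeterClient(api_key="sk_live_...")

for schedule in client.list_workflow_schedules("workflow-uuid"):
    client.delete_workflow_schedule("workflow-uuid", schedule["id"])
```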

Complete example

from meter_sdk import MeterClient
from meter_sdk.workflow import Workflow, Filter

client = MeterClient(api_key="sk_live_...")

# Step 1: Build the workflow
workflow = Workflow("E-commerce Scraper", description="Scrape product index then detail pages")

# Root node: scrape the product listing
index = workflow.start(
    "product_index",
    index_strategy_id,
    urls=["https://shop.com/products"]
)

# Downstream: only follow electronics links
electronics = index.then(
    "electronics",
    detail_strategy_id,
    url_field="product_url",
    filter=Filter.contains("category", "electronics")
)

# Step 2: Run the workflow
run = client.run_workflow(workflow)

# Step 3: Get results
output = client.get_workflow_output(run["workflow_id"])
for url, strategies in output["final_results_by_url_grouped"].items():
    print(f"\n{url}:")
    for strategy, items in strategies.items():
        print(f"  {strategy} ({len(items)} items):")
        for item in items[:3]:
            print(f"    {item}")

# Step 4: Schedule for daily runs
client.schedule_workflow(
    run["workflow_id"],
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook"
)

Need help?

Email me at mckinnon@meter.sh