# Workflow Methods
Workflow methods allow you to build multi-step scraping pipelines where the output of one scraper feeds into the next. See the Workflows concept page for an introduction.
## Quick reference

### Workflow classes

| Class | Description |
|---|---|
| `Workflow` | Define a DAG pipeline with nodes and edges |
| `WorkflowNode` | A single scraping step in the workflow |
| `Filter` | Build filter conditions for edges |
### Client methods

| Method | Description |
|---|---|
| `create_workflow()` | Create a workflow from a `Workflow` object |
| `run_workflow()` | Run a workflow (create + run, or run existing) |
| `wait_for_workflow()` | Poll a workflow run until completion |
| `get_workflow()` | Get workflow details |
| `list_workflows()` | List all workflows |
| `delete_workflow()` | Delete a workflow |
| `get_workflow_run()` | Get details of a specific run |
| `list_workflow_runs()` | List run history |
| `get_workflow_output()` | Get the latest run's results |
| `schedule_workflow()` | Schedule a workflow on an interval or cron expression |
| `list_workflow_schedules()` | List schedules for a workflow |
| `update_workflow_schedule()` | Update a workflow schedule |
| `delete_workflow_schedule()` | Delete a workflow schedule |
## Workflow class

```python
from meter_sdk.workflow import Workflow
```

### Constructor

```python
Workflow(name: str, description: Optional[str] = None)
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `str` | Yes | Workflow name |
| `description` | `str` | No | Description of what the workflow does |
### start()

Create the starting node with static URLs.

```python
workflow.start(
    name: str,
    strategy_id: str,
    urls: List[str],
    parameters: Optional[Dict[str, Any]] = None
) -> WorkflowNode
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `str` | Yes | Unique node identifier within the workflow |
| `strategy_id` | `str` | Yes | Strategy UUID to use for scraping |
| `urls` | `List[str]` | Yes | List of URLs to scrape |
| `parameters` | `Dict` | No | Static API parameter overrides |
Returns: `WorkflowNode`. Chain with `.then()` to add downstream nodes.

```python
workflow = Workflow("Job Scraper")
index = workflow.start("index", strategy_id, urls=["https://jobs.com"])
```
### to_dict()

Serialize the workflow for API requests. You typically don't need to call this directly: `create_workflow()` and `run_workflow()` handle serialization.

```python
workflow.to_dict() -> Dict[str, Any]
```
## WorkflowNode class

### then()

Chain a downstream node that receives data from this node.

```python
node.then(
    name: str,
    strategy_id: str,
    url_field: Optional[str] = None,
    parameter_config: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    filter: Optional[Dict[str, Any]] = None
) -> WorkflowNode
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `name` | `str` | Yes | Unique node identifier |
| `strategy_id` | `str` | Yes | Strategy UUID for the downstream scraper |
| `url_field` | `str` | No | Field name in upstream results containing URLs to scrape. If provided, sets the input type to `upstream_urls` |
| `parameter_config` | `Dict` | No | Map upstream result fields to strategy parameters |
| `parameters` | `Dict` | No | Static API parameter overrides |
| `filter` | `Dict` | No | Filter condition (use the `Filter` class to build) |
Returns: `WorkflowNode`, the new downstream node (for further chaining).

If `url_field` is provided, the node uses the `upstream_urls` input type (extracts URLs from upstream results). If neither `url_field` nor `parameter_config` is provided, it uses the `upstream_data` input type (passes full upstream results as context).
```python
# Extract URLs from upstream results
details = index.then("details", detail_strategy_id, url_field="job_url")

# With filtering
tech_jobs = index.then(
    "tech_jobs",
    detail_strategy_id,
    url_field="job_url",
    filter=Filter.contains("category", "engineering")
)

# Chain further
sub_details = details.then("sub_details", sub_strategy_id, url_field="company_url")
```
## Filter class

```python
from meter_sdk.workflow import Filter
```

Build filter conditions for workflow edges. All string operators accept an optional `case_sensitive` parameter (default: `False`).
### String operators

```python
Filter.contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_contains(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.not_equals(field: str, value: str, case_sensitive: bool = False) -> dict
Filter.regex_match(field: str, pattern: str, case_sensitive: bool = False) -> dict
```

### Existence operators

```python
Filter.exists(field: str) -> dict
Filter.not_exists(field: str) -> dict
```

### Comparison operators

```python
Filter.gt(field: str, value: str) -> dict
Filter.lt(field: str, value: str) -> dict
```

### Logical combinators

```python
Filter.all(*conditions) -> dict   # AND: all conditions must match
Filter.any(*conditions) -> dict   # OR: at least one must match
```
### Examples

```python
from meter_sdk.workflow import Filter

# Single condition
f = Filter.contains("url", "/products/")

# AND: all conditions must match
f = Filter.all(
    Filter.contains("category", "electronics"),
    Filter.gt("price", "50"),
    Filter.exists("in_stock")
)

# OR: at least one must match
f = Filter.any(
    Filter.equals("status", "sale"),
    Filter.equals("status", "clearance")
)

# Case-sensitive match
f = Filter.equals("sku", "ABC-123", case_sensitive=True)

# Pass to then()
details = index.then("details", strategy_id, url_field="link", filter=f)
```
## Client methods

### create_workflow()

Create a workflow from a `Workflow` object.

```python
client.create_workflow(workflow: Workflow) -> Dict
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workflow` | `Workflow` | Yes | Workflow object defining the DAG structure |
Returns: Created workflow details, including the workflow `id`

```python
workflow = Workflow("My Workflow")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
details = index.then("details", detail_strategy_id, url_field="link")

created = client.create_workflow(workflow)
print(f"Workflow ID: {created['id']}")
```
### run_workflow()

Run a workflow. Accepts either a `Workflow` object (creates then runs) or a workflow ID string.

```python
client.run_workflow(
    workflow_or_id: Union[Workflow, str],
    force: bool = False,
    wait: bool = True,
    timeout: float = 3600
) -> Dict
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workflow_or_id` | `Workflow` or `str` | Yes | Workflow object (creates then runs) or workflow ID |
| `force` | `bool` | No | Force re-run, skipping change detection (default: `False`) |
| `wait` | `bool` | No | Block until completion (default: `True`) |
| `timeout` | `float` | No | Maximum seconds to wait (default: `3600`) |
Returns: Completed run details including `status` and `node_executions` (if `wait=True`), or run details with `status` (if `wait=False`). Use `get_workflow_output()` to fetch results.

Raises: `MeterError` if the run fails or times out
```python
# Create and run in one step
workflow = Workflow("My Scraper")
index = workflow.start("index", strategy_id, urls=["https://example.com"])
result = client.run_workflow(workflow)

# Run an existing workflow by ID
result = client.run_workflow("workflow-uuid")

# Fire and forget (don't wait)
run = client.run_workflow("workflow-uuid", wait=False)
print(f"Run started: {run['id']}")

# Force re-run (skip change detection)
result = client.run_workflow("workflow-uuid", force=True)
```
### wait_for_workflow()

Poll a workflow run until it completes.

```python
client.wait_for_workflow(
    workflow_id: str,
    run_id: str,
    poll_interval: float = 5.0,
    timeout: float = 3600
) -> Dict
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workflow_id` | `str` | Yes | Workflow UUID |
| `run_id` | `str` | Yes | Run UUID |
| `poll_interval` | `float` | No | Seconds between polls (default: `5.0`) |
| `timeout` | `float` | No | Maximum seconds to wait (default: `3600`) |
Returns: Completed run with results

Raises: `MeterError` if the run fails or times out
```python
# Start a run without waiting
run = client.run_workflow("workflow-uuid", wait=False)

# Wait for it later
completed = client.wait_for_workflow("workflow-uuid", run["id"], timeout=600)
```
### get_workflow()

Get workflow details including nodes and edges.

```python
client.get_workflow(workflow_id: str) -> Dict
```
### list_workflows()

List all workflows.

```python
client.list_workflows(
    limit: int = 50,
    offset: int = 0
) -> List[Dict]
```
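Because results are paginated via `limit`/`offset`, fetching everything takes a loop. A minimal sketch that works with any of the paginated list methods here (e.g. `client.list_workflows` or `client.list_workflow_runs`); it assumes a page shorter than `limit` marks the end, which is a common convention but not documented behavior:

```python
from typing import Callable, Dict, List


def fetch_all(list_page: Callable[..., List[Dict]], limit: int = 50) -> List[Dict]:
    """Page through a limit/offset list endpoint until a short page is returned.

    `list_page` is any paginated client method, e.g. client.list_workflows.
    Assumption: the endpoint signals the end with a page shorter than `limit`.
    """
    items: List[Dict] = []
    offset = 0
    while True:
        page = list_page(limit=limit, offset=offset)
        items.extend(page)
        if len(page) < limit:
            return items
        offset += limit
```

Usage: `all_workflows = fetch_all(client.list_workflows)`.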
### delete_workflow()

Delete a workflow. This also deletes all run history and any associated schedules.

```python
client.delete_workflow(workflow_id: str) -> Dict
```
### get_workflow_run()

Get details of a specific workflow run, including node execution results.

```python
client.get_workflow_run(workflow_id: str, run_id: str) -> Dict
```

Returns: Run details including `status` and `node_executions`
### list_workflow_runs()

List run history for a workflow.

```python
client.list_workflow_runs(
    workflow_id: str,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
```
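Run history can be scanned for the most recent successful run. A sketch under two assumptions that you should verify against your own run payloads: runs come back newest-first, and successful runs carry the status value `"completed"`:

```python
from typing import Dict, List, Optional


def latest_completed_run(runs: List[Dict]) -> Optional[Dict]:
    """Return the most recent successful run from a run history list, or None.

    Assumptions: list_workflow_runs() returns runs newest-first, and a
    successful run has status == "completed".
    """
    for run in runs:
        if run.get("status") == "completed":
            return run
    return None
```

Usage: `run = latest_completed_run(client.list_workflow_runs(workflow_id))`.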
### get_workflow_output()

Get the latest completed run's results.

```python
client.get_workflow_output(
    workflow_id: str,
    flat: bool = False
) -> Dict
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workflow_id` | `str` | Yes | Workflow UUID |
| `flat` | `bool` | No | Return flat per-URL results instead of grouped by strategy (default: `False`) |
Returns: Latest run output. The default format uses `final_results_by_url_grouped`; with `flat=True` it uses `final_results_by_url`. Also includes `status`, `changed_since_previous`, `run_id`, `workflow_id`, and `completed_at`.

Raises: `MeterError` if no completed runs exist
```python
# Grouped by URL and strategy (default)
output = client.get_workflow_output("workflow-uuid")
for url, strategies in output["final_results_by_url_grouped"].items():
    for strategy, items in strategies.items():
        print(f"{strategy}: {len(items)} items")

# Flat per-URL results
output = client.get_workflow_output("workflow-uuid", flat=True)
for url, items in output["final_results_by_url"].items():
    print(f"{url}: {len(items)} items")
```
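The grouped format nests items two levels deep (URL, then strategy), so downstream code often wants a flat iterator instead. A small helper, assuming only the `final_results_by_url_grouped` shape shown above:

```python
from typing import Any, Dict, Iterator, Tuple


def iter_grouped_results(output: Dict[str, Any]) -> Iterator[Tuple[str, str, Any]]:
    """Yield (url, strategy, item) for every item in a grouped workflow output.

    Assumes output["final_results_by_url_grouped"] maps URL -> strategy -> items,
    matching the default get_workflow_output() format.
    """
    for url, strategies in output["final_results_by_url_grouped"].items():
        for strategy, items in strategies.items():
            for item in items:
                yield url, strategy, item
```

Usage: `for url, strategy, item in iter_grouped_results(output): ...`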
## Scheduling methods

### schedule_workflow()

Schedule a workflow to run on an interval or cron expression. Provide either `interval_seconds` or `cron_expression`.

```python
client.schedule_workflow(
    workflow_id: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
```
| Parameter | Type | Required | Description |
|---|---|---|---|
| `workflow_id` | `str` | Yes | Workflow UUID |
| `interval_seconds` | `int` | Conditional | Run every N seconds (or use `cron_expression`) |
| `cron_expression` | `str` | Conditional | Cron expression (e.g., `"0 9 * * *"`) |
| `webhook_url` | `str` | No | Webhook URL to receive results |
| `webhook_metadata` | `Dict` | No | Custom JSON metadata included in every webhook payload |
| `webhook_secret` | `str` | No | Secret sent in the `X-Webhook-Secret` header |
| `webhook_type` | `str` | No | `'standard'` or `'slack'` (default: `'standard'`) |
```python
# Run every hour
client.schedule_workflow(workflow_id, interval_seconds=3600)

# Daily at 9 AM with webhook
client.schedule_workflow(
    workflow_id,
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook",
    webhook_metadata={"project": "my-project"}
)
```
### list_workflow_schedules()

List schedules for a workflow.

```python
client.list_workflow_schedules(workflow_id: str) -> List[Dict]
```
### update_workflow_schedule()

Update a workflow schedule.

```python
client.update_workflow_schedule(
    workflow_id: str,
    schedule_id: str,
    enabled: Optional[bool] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None
) -> Dict
```
### delete_workflow_schedule()

Delete a workflow schedule.

```python
client.delete_workflow_schedule(workflow_id: str, schedule_id: str) -> Dict
```
## Complete example

```python
from meter_sdk import MeterClient
from meter_sdk.workflow import Workflow, Filter

client = MeterClient(api_key="sk_live_...")

# Step 1: Build the workflow
workflow = Workflow("E-commerce Scraper", description="Scrape product index then detail pages")

# Root node: scrape the product listing
index = workflow.start(
    "product_index",
    index_strategy_id,
    urls=["https://shop.com/products"]
)

# Downstream: only follow electronics links
electronics = index.then(
    "electronics",
    detail_strategy_id,
    url_field="product_url",
    filter=Filter.contains("category", "electronics")
)

# Step 2: Run the workflow
run = client.run_workflow(workflow)

# Step 3: Get results
output = client.get_workflow_output(run["workflow_id"])
for url, strategies in output["final_results_by_url_grouped"].items():
    print(f"\n{url}:")
    for strategy, items in strategies.items():
        print(f"  {strategy} ({len(items)} items):")
        for item in items[:3]:
            print(f"    {item}")

# Step 4: Schedule for daily runs
client.schedule_workflow(
    run["workflow_id"],
    cron_expression="0 9 * * *",
    webhook_url="https://your-app.com/webhook"
)
```
## Need help?

Email me at mckinnon@meter.sh