MeterClient
The MeterClient class is the main interface for all Meter API operations. It handles authentication and request management, and provides methods for working with strategies, jobs, and schedules.
Constructor
MeterClient(api_key: str, base_url: str = "https://api.meter.sh")
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| api_key | str | Yes | Your Meter API key (starts with sk_live_) |
| base_url | str | No | API base URL (default: https://api.meter.sh) |
Example
from meter_sdk import MeterClient
import os
# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))
# With custom base URL (for development)
client = MeterClient(
api_key=os.getenv("METER_API_KEY"),
base_url="http://localhost:8000"
)
Context Manager
The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
strategies = client.list_strategies()
# Client automatically closes HTTP connections on exit
Strategy Methods
generate_strategy()
Generate a new extraction strategy using AI.
generate_strategy(
url: str,
description: str,
name: str,
force_api: bool = False
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| url | str | Yes | Target webpage URL to analyze |
| description | str | Yes | Plain English description of what to extract |
| name | str | Yes | Human-readable name for this strategy |
| force_api | bool | No | Force API-based capture instead of CSS extraction (default: False) |
Returns: Dict with fields:
strategy_id (str): UUID of the created strategy
strategy (dict): The extraction strategy (CSS selectors, fields)
preview_data (list): Sample extracted data (first 5-10 items)
attempts (int): Number of generation attempts (usually 1)
scraper_type (str): Type of scraper used - 'css' or 'api'
api_parameters (dict, optional): Available URL parameters for API-based strategies
Example:
result = client.generate_strategy(
url="https://news.ycombinator.com",
description="Extract post titles and scores",
name="HN Front Page"
)
strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Example with API capture:
# Force API-based capture for sites with underlying APIs
result = client.generate_strategy(
url="https://api-heavy-site.com/products",
description="Extract product listings",
name="Product API Scraper",
force_api=True
)
print(f"Scraper type: {result['scraper_type']}") # 'api' or 'css'
# For API strategies, check available parameters
if result.get('api_parameters'):
print(f"Available parameters: {result['api_parameters']}")
# e.g., {'page': 1, 'limit': 20, 'sort': 'price'}
Raises: MeterError if generation fails
When force_api=True, Meter will attempt to identify and capture underlying API calls
instead of using CSS selectors. This is useful for sites that load data dynamically
via JavaScript APIs.
refine_strategy()
Refine an existing strategy with feedback.
refine_strategy(
strategy_id: str,
feedback: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | UUID of the strategy to refine |
| feedback | str | Yes | Description of what to improve or add |
Returns: Dict with same fields as generate_strategy()
Example:
refined = client.refine_strategy(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
feedback="Also extract the product images and SKU"
)
print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.
list_strategies()
List all strategies for the authenticated user.
list_strategies(
limit: int = 20,
offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| limit | int | No | Maximum number of strategies to return (default: 20) |
| offset | int | No | Number of strategies to skip (default: 0) |
Returns: List[Dict] where each dict contains:
id (str): Strategy UUID
name (str): Strategy name
description (str): Extraction description
url (str): Original URL used for generation
preview_data (list): Sample extracted data
created_at (str): ISO timestamp
updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()
for strategy in strategies:
    print(f"{strategy['name']}: {strategy['id']}")
# Pagination
page_2 = client.list_strategies(limit=20, offset=20)
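The limit/offset pattern above generalizes to a small paging helper. This is a sketch, not part of the SDK; `iter_pages` is a hypothetical name, and any callable that accepts `limit`/`offset` keyword arguments and returns a list (such as `client.list_strategies` or `client.list_jobs`) will work:

```python
def iter_pages(fetch, page_size=20):
    """Yield items from a limit/offset list endpoint until a short page
    signals the end of the collection."""
    offset = 0
    while True:
        page = fetch(limit=page_size, offset=offset)
        yield from page
        if len(page) < page_size:
            # A partial (or empty) page means there is nothing left.
            break
        offset += page_size

# Usage (assuming an authenticated client):
# for strategy in iter_pages(client.list_strategies):
#     print(strategy["name"])
```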
get_strategy()
Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | UUID of the strategy |
Returns: Dict with full strategy details (same fields as list_strategies() items)
Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")
print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found
delete_strategy()
Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | UUID of the strategy to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result) # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.
Job Methods
create_job()
Create a new scrape job using a strategy.
create_job(
strategy_id: str,
url: Optional[str] = None,
urls: Optional[List[str]] = None,
parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape as a batch |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with fields:
job_id (str): UUID of the created job (single URL)
batch_id (str): Batch UUID for tracking progress (multiple URLs)
status (str): Job status (usually “pending”)
strategy_id (str): Strategy UUID
url (str): Target URL
parameters (dict, optional): Parameters used for this job
created_at (str): ISO timestamp
Example:
job = client.create_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/page"
)
print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")
Example with API parameters:
# For API-based strategies, override parameters at runtime
job = client.create_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/api/products",
parameters={
"page": 2,
"limit": 50,
"category": "electronics"
}
)
Example with batch URLs:
# Scrape multiple URLs in a single batch
job = client.create_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
urls=[
"https://example.com/products/1",
"https://example.com/products/2",
"https://example.com/products/3"
]
)
print(f"Batch created: {job['batch_id']}")
You must provide either url or urls, but not both. The parameters option
only applies to API-based strategies (where scraper_type is 'api').
execute_job()
Create and execute a scrape job synchronously. Returns results directly without polling.
execute_job(
strategy_id: str,
url: str,
parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Yes | URL to scrape |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with completed job details including results
Example:
# Simple synchronous scrape
result = client.execute_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products"
)
print(f"Got {result['item_count']} items")
for item in result['results']:
print(item)
Example with API parameters:
result = client.execute_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/api/products",
parameters={"page": 2, "limit": 50}
)
Raises: MeterError if job fails or times out
This endpoint blocks until the job completes (up to 1 hour timeout). Use create_job() + wait_for_job() for more control over polling behavior, or create_job() alone for fire-and-forget jobs.
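The two-step alternative described above can be wrapped in a small helper. A sketch under the documented signatures; `run_job` is a hypothetical name, not an SDK method:

```python
def run_job(client, strategy_id, url, poll_interval=1.0, timeout=None):
    """Create a job, then block until it finishes.

    Equivalent in effect to execute_job(), but you keep control of the
    polling interval and timeout via wait_for_job().
    """
    job = client.create_job(strategy_id=strategy_id, url=url)
    return client.wait_for_job(
        job["job_id"], poll_interval=poll_interval, timeout=timeout
    )

# Usage (assuming an authenticated client):
# completed = run_job(client, strategy_id, "https://example.com/products",
#                     poll_interval=2.0, timeout=300.0)
```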
get_job()
Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| job_id | str | Yes | UUID of the job |
Returns: Dict with fields:
job_id (str): Job UUID
status (str): “pending”, “running”, “completed”, or “failed”
results (list): Extracted data (only if status is “completed”)
item_count (int): Number of items extracted
content_hash (str): Hash for change detection
structural_signature (dict): Structural fingerprint
error (str): Error message (only if status is “failed”)
started_at (str): ISO timestamp
completed_at (str): ISO timestamp
created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")
if job['status'] == 'completed':
print(f"Extracted {job['item_count']} items")
for item in job['results']:
print(item)
elif job['status'] == 'failed':
print(f"Job failed: {job['error']}")
else:
print(f"Job is {job['status']}")
wait_for_job()
Wait for a job to complete, polling automatically.
wait_for_job(
job_id: str,
poll_interval: float = 1.0,
timeout: Optional[float] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| job_id | str | Yes | UUID of the job to wait for |
| poll_interval | float | No | Seconds between status checks (default: 1.0) |
| timeout | float | No | Maximum seconds to wait (default: None = infinite) |
Returns: Dict with completed job details (same as get_job())
Example:
from meter_sdk import MeterError
# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")
# With timeout
try:
completed = client.wait_for_job(
"660e8400-e29b-41d4-a716-446655440000",
poll_interval=2.0,
timeout=300.0 # 5 minutes
)
except MeterError as e:
print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails
list_jobs()
List jobs with optional filtering.
list_jobs(
strategy_id: Optional[str] = None,
status: Optional[str] = None,
limit: int = 20,
offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | No | Filter by strategy UUID |
| status | str | No | Filter by status: "pending", "running", "completed", "failed" |
| limit | int | No | Maximum jobs to return (default: 20) |
| offset | int | No | Number of jobs to skip (default: 0) |
Returns: List[Dict] of job summaries
Example:
# All jobs
all_jobs = client.list_jobs(limit=50)
# Jobs for specific strategy
strategy_jobs = client.list_jobs(
strategy_id="550e8400-e29b-41d4-a716-446655440000"
)
# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)
# Combined filters
recent_completed = client.list_jobs(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
status="completed",
limit=5
)
compare_jobs()
Compare two jobs to detect changes.
compare_jobs(
job_id: str,
other_job_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| job_id | str | Yes | First job UUID |
| other_job_id | str | Yes | Second job UUID to compare with |
Returns: Dict with fields:
content_hash_match (bool): True if content hashes match
structural_match (bool): True if structure matches
semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
"660e8400-e29b-41d4-a716-446655440000",
"770e8400-e29b-41d4-a716-446655440000"
)
print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")
if not comparison['content_hash_match']:
print("Content has changed!")
for change in comparison.get('changes', []):
print(f" - {change}")
get_strategy_history()
Get a timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | Strategy UUID |
Returns: List[Dict] where each dict contains:
job_id (str): Job UUID
status (str): Job status
item_count (int): Items extracted
has_changes (bool): True if content changed vs. previous job
created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")
for entry in history:
status_icon = "✓" if entry['status'] == 'completed' else "✗"
change_icon = "📝" if entry['has_changes'] else "—"
print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")
Schedule Methods
create_schedule()
Create a new recurring schedule.
create_schedule(
strategy_id: str,
url: Optional[str] = None,
urls: Optional[List[str]] = None,
interval_seconds: Optional[int] = None,
cron_expression: Optional[str] = None,
webhook_url: Optional[str] = None,
webhook_metadata: Optional[Dict[str, Any]] = None,
webhook_secret: Optional[str] = None,
webhook_type: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| strategy_id | str | Yes | Strategy UUID to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape on each run |
| interval_seconds | int | Conditional | Interval in seconds (required if no cron) |
| cron_expression | str | Conditional | Cron expression (required if no interval) |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_metadata | Dict | No | Custom JSON metadata included in every webhook payload |
| webhook_secret | str | No | Secret for X-Webhook-Secret header. Auto-generated if not provided when webhook_url is set |
| webhook_type | str | No | 'standard' or 'slack'. Auto-detected from URL if not specified |
| parameters | Dict | No | Default API parameter overrides for all scheduled runs (API strategies only) |
Returns: Dict with schedule details
Example:
# Interval-based
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
interval_seconds=3600 # Every hour
)
# Cron-based
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
cron_expression="0 9 * * *" # Daily at 9 AM
)
# With webhook
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
interval_seconds=3600,
webhook_url="https://your-app.com/webhooks/meter"
)
print(f"Schedule created: {schedule['schedule_id']}")
print(f"Next run: {schedule['next_run_at']}")
Example with API parameters:
# For API-based strategies, set default parameters for all runs
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/api/jobs",
interval_seconds=3600,
parameters={
"category": "engineering",
"location": "remote",
"limit": 100
}
)
Example with multiple URLs:
# Monitor multiple pages on a schedule
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
urls=[
"https://example.com/products/electronics",
"https://example.com/products/clothing",
"https://example.com/products/home"
],
interval_seconds=3600
)
You must provide either url or urls, but not both. You must also provide
either interval_seconds or cron_expression, but not both.
list_schedules()
List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules
Example:
schedules = client.list_schedules()
for schedule in schedules:
print(f"Schedule {schedule['schedule_id']}:")
print(f" Type: {schedule['schedule_type']}")
print(f" Enabled: {schedule['enabled']}")
print(f" Next run: {schedule['next_run_at']}")
update_schedule()
Update an existing schedule.
update_schedule(
schedule_id: str,
enabled: Optional[bool] = None,
url: Optional[str] = None,
urls: Optional[List[str]] = None,
interval_seconds: Optional[int] = None,
cron_expression: Optional[str] = None,
webhook_url: Optional[str] = None,
webhook_metadata: Optional[Dict[str, Any]] = None,
webhook_secret: Optional[str] = None,
webhook_type: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| schedule_id | str | Yes | Schedule UUID |
| enabled | bool | No | Enable/disable schedule |
| url | str | No | Update to single URL |
| urls | List[str] | No | Update to multiple URLs |
| interval_seconds | int | No | New interval in seconds |
| cron_expression | str | No | New cron expression |
| webhook_url | str | No | New webhook URL (or None to remove) |
| webhook_metadata | Dict | No | Update custom JSON metadata for webhook payloads |
| webhook_secret | str | No | Update webhook secret |
| webhook_type | str | No | Update webhook type: 'standard' or 'slack' |
| parameters | Dict | No | Update API parameter defaults (API strategies only) |
Returns: Dict with updated schedule details
Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)
# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)
# Update webhook
client.update_schedule(
schedule_id,
webhook_url="https://new-domain.com/webhooks"
)
# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)
# Update API parameters
client.update_schedule(
schedule_id,
parameters={"category": "new-category", "limit": 200}
)
Setting url will clear urls, and vice versa.
delete_schedule()
Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| schedule_id | str | Yes | Schedule UUID to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result) # {'message': 'Schedule deleted successfully'}
get_schedule_changes()
Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
schedule_id: str,
mark_seen: bool = True,
filter: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| schedule_id | str | Yes | Schedule UUID |
| mark_seen | bool | No | Mark returned changes as seen (default: True) |
| filter | str | No | Lucene-style keyword filter for results |
Returns: Dict with fields:
schedule_id (str): Schedule UUID
changes (list): Jobs with changes (full job details)
count (int): Number of changed jobs
marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
"880e8400-e29b-41d4-a716-446655440000",
mark_seen=True
)
if changes['count'] > 0:
print(f"Found {changes['count']} jobs with changes")
for change in changes['changes']:
print(f"Job {change['job_id']}: {change['item_count']} items")
# Process change['results']
# Preview without marking as seen
preview = client.get_schedule_changes(
"880e8400-e29b-41d4-a716-446655440000",
mark_seen=False
)
Example with keyword filtering:
# Filter for items containing both keywords (AND)
changes = client.get_schedule_changes(
schedule_id,
filter="+python +remote"
)
# Filter for items containing either keyword (OR)
changes = client.get_schedule_changes(
schedule_id,
filter="python javascript"
)
# Exclude items with a keyword
changes = client.get_schedule_changes(
schedule_id,
filter="+engineer -manager"
)
# Exact phrase matching
changes = client.get_schedule_changes(
schedule_id,
filter='"machine learning"'
)
Use mark_seen=False to preview changes without affecting state.
The filter parameter filters individual items within job results.
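The pull-based pattern above amounts to a periodic poll loop. A minimal sketch; `watch_changes` is a hypothetical helper, not part of the SDK, and the polling cadence should normally match your schedule's interval:

```python
import time

def watch_changes(client, schedule_id, handler, poll_interval=60.0, max_polls=None):
    """Poll get_schedule_changes() and pass each changed job to `handler`.

    `max_polls` bounds the loop (None = run forever). Changes are marked
    seen on each fetch, so each job is handled at most once.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)
        for job in changes.get("changes", []):
            handler(job)
        polls += 1
        time.sleep(poll_interval)

# Usage (assuming an authenticated client):
# watch_changes(client, schedule_id,
#               handler=lambda job: print(job["job_id"], job["item_count"]))
```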
regenerate_webhook_secret()
Regenerate the webhook secret for a schedule. The old secret is immediately invalidated.
regenerate_webhook_secret(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| schedule_id | str | Yes | Schedule UUID |
Returns: Dict with schedule_id and the new webhook_secret
Example:
result = client.regenerate_webhook_secret("880e8400-e29b-41d4-a716-446655440000")
new_secret = result["webhook_secret"]
print(f"New secret: {new_secret}")
# Update your webhook handler with the new secret
Raises: MeterError if schedule has no webhook URL configured
The new secret is returned only once. Store it securely and update your webhook handler before the next delivery.
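On the receiving side, your handler should compare the incoming X-Webhook-Secret header against the stored secret in constant time. A stdlib-only sketch (assuming, per the parameter description above, that the secret is sent verbatim in that header); `verify_webhook` is a hypothetical name:

```python
import hmac

def verify_webhook(headers, expected_secret):
    """Constant-time check of the X-Webhook-Secret header.

    `headers` is any dict-like mapping of header names to values, e.g.
    request.headers in most web frameworks. Returns True only when the
    header is present and matches the stored secret.
    """
    received = headers.get("X-Webhook-Secret", "")
    # hmac.compare_digest avoids leaking the match position via timing.
    return hmac.compare_digest(received, expected_secret)

# Usage in a handler (framework-agnostic):
# if not verify_webhook(request.headers, stored_secret):
#     return 401  # reject the delivery
```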
Workflow Methods
For workflow methods (create_workflow, run_workflow, wait_for_workflow, etc.), see the dedicated Workflow Methods reference.
Error Handling
All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError
client = MeterClient(api_key="sk_live_...")
try:
strategy = client.generate_strategy(url, description, name)
except MeterError as e:
print(f"Error: {e}")
# Handle error appropriately
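For transient failures (e.g. rate limits or timeouts), a retry with exponential backoff is a common pattern around any client call. A generic sketch; `with_retries` is a hypothetical helper, and whether a given MeterError is worth retrying (rate limit vs. validation error) depends on your use case:

```python
import time

def with_retries(call, attempts=3, base_delay=1.0, retry_on=Exception):
    """Invoke `call` up to `attempts` times, sleeping base_delay * 2**n
    between failures. Re-raises the last exception if all attempts fail."""
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage (pass MeterError as retry_on in real code):
# strategy = with_retries(
#     lambda: client.generate_strategy(url, description, name),
#     retry_on=MeterError,
# )
```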
Need help?
Email me at mckinnon@meter.sh