
MeterClient

The MeterClient class is the main interface for all Meter API operations. It handles authentication, request management, and provides methods for strategies, jobs, and schedules.

Constructor

MeterClient(api_key: str, base_url: str = "https://api.meter.sh")

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| api_key | str | Yes | Your Meter API key (starts with sk_live_) |
| base_url | str | No | API base URL (default: https://api.meter.sh) |

Example

from meter_sdk import MeterClient
import os

# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# With custom base URL (for development)
client = MeterClient(
    api_key=os.getenv("METER_API_KEY"),
    base_url="http://localhost:8000"
)

Context Manager

The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
    strategies = client.list_strategies()
    # Client automatically closes HTTP connections on exit

Strategy Methods

generate_strategy()

Generate a new extraction strategy using AI.
generate_strategy(
    url: str,
    description: str,
    name: str,
    force_api: bool = False
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| url | str | Yes | Target webpage URL to analyze |
| description | str | Yes | Plain English description of what to extract |
| name | str | Yes | Human-readable name for this strategy |
| force_api | bool | No | Force API-based capture instead of CSS extraction (default: False) |
Returns: Dict with fields:
  • strategy_id (str): UUID of the created strategy
  • strategy (dict): The extraction strategy (CSS selectors, fields)
  • preview_data (list): Sample extracted data (first 5-10 items)
  • attempts (int): Number of generation attempts (usually 1)
  • scraper_type (str): Type of scraper used - 'css' or 'api'
  • api_parameters (dict, optional): Available URL parameters for API-based strategies
Example:
result = client.generate_strategy(
    url="https://news.ycombinator.com",
    description="Extract post titles and scores",
    name="HN Front Page"
)

strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Example with API capture:
# Force API-based capture for sites with underlying APIs
result = client.generate_strategy(
    url="https://api-heavy-site.com/products",
    description="Extract product listings",
    name="Product API Scraper",
    force_api=True
)

print(f"Scraper type: {result['scraper_type']}")  # 'api' or 'css'

# For API strategies, check available parameters
if result.get('api_parameters'):
    print(f"Available parameters: {result['api_parameters']}")
    # e.g., {'page': 1, 'limit': 20, 'sort': 'price'}
Raises: MeterError if generation fails
When force_api=True, Meter will attempt to identify and capture underlying API calls instead of using CSS selectors. This is useful for sites that load data dynamically via JavaScript APIs.

refine_strategy()

Refine an existing strategy with feedback.
refine_strategy(
    strategy_id: str,
    feedback: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to refine |
| feedback | str | Yes | Description of what to improve or add |

Returns: Dict with same fields as generate_strategy()
Example:
refined = client.refine_strategy(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    feedback="Also extract the product images and SKU"
)

print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.

list_strategies()

List all strategies for the authenticated user.
list_strategies(
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Maximum number of strategies to return (default: 20) |
| offset | int | No | Number of strategies to skip (default: 0) |
Returns: List[Dict] where each dict contains:
  • id (str): Strategy UUID
  • name (str): Strategy name
  • description (str): Extraction description
  • url (str): Original URL used for generation
  • preview_data (list): Sample extracted data
  • created_at (str): ISO timestamp
  • updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()

for strategy in strategies:
    print(f"{strategy['name']}: {strategy['id']}")

# Pagination
page_2 = client.list_strategies(limit=20, offset=20)
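The pagination pattern above generalizes to a loop. A minimal sketch, assuming a page shorter than limit means there is nothing left to fetch (the list_all_strategies helper is an illustration, not an SDK method):

```python
def list_all_strategies(client, page_size=20):
    """Fetch every strategy by paging list_strategies() until a short page is returned."""
    strategies = []
    offset = 0
    while True:
        page = client.list_strategies(limit=page_size, offset=offset)
        strategies.extend(page)
        if len(page) < page_size:  # short page: no more results
            return strategies
        offset += page_size
```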

get_strategy()

Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy |

Returns: Dict with full strategy details (same fields as list_strategies() items)
Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")

print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found

delete_strategy()

Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to delete |

Returns: Dict with confirmation message
Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.

Job Methods

create_job()

Create a new scrape job using a strategy.
create_job(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape as a batch |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with fields:
  • job_id (str): UUID of the created job (single URL)
  • batch_id (str): Batch UUID for tracking progress (multiple URLs)
  • status (str): Job status (usually “pending”)
  • strategy_id (str): Strategy UUID
  • url (str): Target URL
  • parameters (dict, optional): Parameters used for this job
  • created_at (str): ISO timestamp
Example:
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/page"
)

print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")
Example with API parameters:
# For API-based strategies, override parameters at runtime
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={
        "page": 2,
        "limit": 50,
        "category": "electronics"
    }
)
Example with batch URLs:
# Scrape multiple URLs in a single batch
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/1",
        "https://example.com/products/2",
        "https://example.com/products/3"
    ]
)

print(f"Batch created: {job['batch_id']}")
You must provide either url or urls, but not both. The parameters option only applies to API-based strategies (where scraper_type is 'api').

execute_job()

Create and execute a scrape job synchronously. Returns results directly without polling.
execute_job(
    strategy_id: str,
    url: str,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Yes | URL to scrape |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |

Returns: Dict with completed job details including results
Example:
# Simple synchronous scrape
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products"
)

print(f"Got {result['item_count']} items")
for item in result['results']:
    print(item)
Example with API parameters:
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={"page": 2, "limit": 50}
)
Raises: MeterError if job fails or times out
This endpoint blocks until the job completes (up to 1 hour timeout). Use create_job() + wait_for_job() for more control over polling behavior, or create_job() alone for fire-and-forget jobs.
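The create_job() + wait_for_job() combination mentioned above can be wrapped in a small helper. A sketch under stated assumptions (run_job and its defaults are invented here, not part of the SDK):

```python
def run_job(client, strategy_id, url, poll_interval=2.0, timeout=300.0):
    """Create a job, then poll until it completes.

    wait_for_job raises MeterError if the job fails or the timeout is exceeded.
    """
    job = client.create_job(strategy_id=strategy_id, url=url)
    return client.wait_for_job(
        job["job_id"],
        poll_interval=poll_interval,
        timeout=timeout,
    )
```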

get_job()

Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job |
Returns: Dict with fields:
  • job_id (str): Job UUID
  • status (str): “pending”, “running”, “completed”, or “failed”
  • results (list): Extracted data (only if status is “completed”)
  • item_count (int): Number of items extracted
  • content_hash (str): Hash for change detection
  • structural_signature (dict): Structural fingerprint
  • error (str): Error message (only if status is “failed”)
  • started_at (str): ISO timestamp
  • completed_at (str): ISO timestamp
  • created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")

if job['status'] == 'completed':
    print(f"Extracted {job['item_count']} items")
    for item in job['results']:
        print(item)
elif job['status'] == 'failed':
    print(f"Job failed: {job['error']}")
else:
    print(f"Job is {job['status']}")

wait_for_job()

Wait for a job to complete, polling automatically.
wait_for_job(
    job_id: str,
    poll_interval: float = 1.0,
    timeout: Optional[float] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job to wait for |
| poll_interval | float | No | Seconds between status checks (default: 1.0) |
| timeout | float | No | Maximum seconds to wait (default: None = infinite) |

Returns: Dict with completed job details (same as get_job())
Example:
from meter_sdk import MeterError

# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")

# With timeout
try:
    completed = client.wait_for_job(
        "660e8400-e29b-41d4-a716-446655440000",
        poll_interval=2.0,
        timeout=300.0  # 5 minutes
    )
except MeterError as e:
    print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails

list_jobs()

List jobs with optional filtering.
list_jobs(
    strategy_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | No | Filter by strategy UUID |
| status | str | No | Filter by status: "pending", "running", "completed", "failed" |
| limit | int | No | Maximum jobs to return (default: 20) |
| offset | int | No | Number of jobs to skip (default: 0) |

Returns: List[Dict] of job summaries
Example:
# All jobs
all_jobs = client.list_jobs(limit=50)

# Jobs for specific strategy
strategy_jobs = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000"
)

# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)

# Combined filters
recent_completed = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    status="completed",
    limit=5
)

compare_jobs()

Compare two jobs to detect changes.
compare_jobs(
    job_id: str,
    other_job_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | First job UUID |
| other_job_id | str | Yes | Second job UUID to compare with |
Returns: Dict with fields:
  • content_hash_match (bool): True if content hashes match
  • structural_match (bool): True if structure matches
  • semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
  • changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
    "660e8400-e29b-41d4-a716-446655440000",
    "770e8400-e29b-41d4-a716-446655440000"
)

print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")

if not comparison['content_hash_match']:
    print("Content has changed!")
    for change in comparison.get('changes', []):
        print(f"  - {change}")

get_strategy_history()

Get timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID |
Returns: List[Dict] where each dict contains:
  • job_id (str): Job UUID
  • status (str): Job status
  • item_count (int): Items extracted
  • has_changes (bool): True if content changed vs. previous job
  • created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")

for entry in history:
    status_icon = "✓" if entry['status'] == 'completed' else "✗"
    change_icon = "📝" if entry['has_changes'] else "—"
    print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")

Schedule Methods

create_schedule()

Create a new recurring schedule.
create_schedule(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape on each run |
| interval_seconds | int | Conditional | Interval in seconds (required if no cron) |
| cron_expression | str | Conditional | Cron expression (required if no interval) |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_metadata | Dict | No | Custom JSON metadata included in every webhook payload |
| webhook_secret | str | No | Secret for X-Webhook-Secret header; auto-generated if not provided when webhook_url is set |
| webhook_type | str | No | 'standard' or 'slack'; auto-detected from URL if not specified |
| parameters | Dict | No | Default API parameter overrides for all scheduled runs (API strategies only) |

Returns: Dict with schedule details
Example:
# Interval-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600  # Every hour
)

# Cron-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    cron_expression="0 9 * * *"  # Daily at 9 AM
)

# With webhook
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)

print(f"Schedule created: {schedule['schedule_id']}")
print(f"Next run: {schedule['next_run_at']}")
Example with API parameters:
# For API-based strategies, set default parameters for all runs
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/jobs",
    interval_seconds=3600,
    parameters={
        "category": "engineering",
        "location": "remote",
        "limit": 100
    }
)
Example with multiple URLs:
# Monitor multiple pages on a schedule
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/electronics",
        "https://example.com/products/clothing",
        "https://example.com/products/home"
    ],
    interval_seconds=3600
)
You must provide either url or urls, but not both. You must also provide either interval_seconds or cron_expression, but not both.

list_schedules()

List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules
Example:
schedules = client.list_schedules()

for schedule in schedules:
    print(f"Schedule {schedule['schedule_id']}:")
    print(f"  Type: {schedule['schedule_type']}")
    print(f"  Enabled: {schedule['enabled']}")
    print(f"  Next run: {schedule['next_run_at']}")

update_schedule()

Update an existing schedule.
update_schedule(
    schedule_id: str,
    enabled: Optional[bool] = None,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| enabled | bool | No | Enable/disable schedule |
| url | str | No | Update to single URL |
| urls | List[str] | No | Update to multiple URLs |
| interval_seconds | int | No | New interval in seconds |
| cron_expression | str | No | New cron expression |
| webhook_url | str | No | New webhook URL (or None to remove) |
| webhook_metadata | Dict | No | Update custom JSON metadata for webhook payloads |
| webhook_secret | str | No | Update webhook secret |
| webhook_type | str | No | Update webhook type: 'standard' or 'slack' |
| parameters | Dict | No | Update API parameter defaults (API strategies only) |

Returns: Dict with updated schedule details
Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)

# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)

# Update webhook
client.update_schedule(
    schedule_id,
    webhook_url="https://new-domain.com/webhooks"
)

# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)

# Update API parameters
client.update_schedule(
    schedule_id,
    parameters={"category": "new-category", "limit": 200}
)
Setting url will clear urls, and vice versa.

delete_schedule()

Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID to delete |

Returns: Dict with confirmation message
Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Schedule deleted successfully'}

get_schedule_changes()

Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
    schedule_id: str,
    mark_seen: bool = True,
    filter: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| mark_seen | bool | No | Mark returned changes as seen (default: True) |
| filter | str | No | Lucene-style keyword filter for results |
Returns: Dict with fields:
  • schedule_id (str): Schedule UUID
  • changes (list): Jobs with changes (full job details)
  • count (int): Number of changed jobs
  • marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=True
)

if changes['count'] > 0:
    print(f"Found {changes['count']} jobs with changes")
    for change in changes['changes']:
        print(f"Job {change['job_id']}: {change['item_count']} items")
        # Process change['results']

# Preview without marking as seen
preview = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=False
)
Example with keyword filtering:
# Filter for items containing both keywords (AND)
changes = client.get_schedule_changes(
    schedule_id,
    filter="+python +remote"
)

# Filter for items containing either keyword (OR)
changes = client.get_schedule_changes(
    schedule_id,
    filter="python javascript"
)

# Exclude items with a keyword
changes = client.get_schedule_changes(
    schedule_id,
    filter="+engineer -manager"
)

# Exact phrase matching
changes = client.get_schedule_changes(
    schedule_id,
    filter='"machine learning"'
)
Use mark_seen=False to preview changes without affecting state. The filter parameter filters individual items within job results.
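Pull-based change detection usually runs in a loop. A minimal polling sketch (poll_schedule, its handler callback, and the fixed-interval stop condition are illustrative assumptions, not SDK features):

```python
import time

def poll_schedule(client, schedule_id, handler, interval=60.0, max_polls=None):
    """Fetch unseen changes on a fixed interval; pass each changed job to handler."""
    polls = 0
    while max_polls is None or polls < max_polls:
        changes = client.get_schedule_changes(schedule_id, mark_seen=True)
        for job in changes["changes"]:
            handler(job)  # e.g., process job["results"]
        polls += 1
        time.sleep(interval)
```

In production you would likely add error handling around the API call and a way to stop the loop cleanly.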

regenerate_webhook_secret()

Regenerate the webhook secret for a schedule. The old secret is immediately invalidated.
regenerate_webhook_secret(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |

Returns: Dict with schedule_id and the new webhook_secret
Example:
result = client.regenerate_webhook_secret("880e8400-e29b-41d4-a716-446655440000")
new_secret = result["webhook_secret"]
print(f"New secret: {new_secret}")
# Update your webhook handler with the new secret
Raises: MeterError if schedule has no webhook URL configured
The new secret is returned only once. Store it securely and update your webhook handler before the next delivery.
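On the receiving side, the X-Webhook-Secret header should be checked with a constant-time comparison to avoid timing attacks. A minimal sketch (the verify_webhook helper is an assumption; only the header name comes from the schedule docs above):

```python
import hmac

def verify_webhook(headers, expected_secret):
    """Constant-time check of the X-Webhook-Secret header on an incoming request."""
    received = headers.get("X-Webhook-Secret", "")
    return hmac.compare_digest(received, expected_secret)
```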

Workflow Methods

For workflow methods (create_workflow, run_workflow, wait_for_workflow, etc.), see the dedicated Workflow Methods reference.

Error Handling

All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError

client = MeterClient(api_key="sk_live_...")

try:
    strategy = client.generate_strategy(url, description, name)
except MeterError as e:
    print(f"Error: {e}")
    # Handle error appropriately

Need help?

Email me at mckinnon@meter.sh