MeterClient

The MeterClient class is the main interface for all Meter API operations. It handles authentication and request management, and provides methods for working with strategies, jobs, schedules, and strategy groups.

Constructor

MeterClient(api_key: str, base_url: str = "https://api.meter.sh")

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| api_key | str | Yes | Your Meter API key (starts with sk_live_) |
| base_url | str | No | API base URL (default: https://api.meter.sh) |

Example

from meter_sdk import MeterClient
import os

# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# With custom base URL (for development)
client = MeterClient(
    api_key=os.getenv("METER_API_KEY"),
    base_url="http://localhost:8000"
)

Context Manager

The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
    strategies = client.list_strategies()
    # Client automatically closes HTTP connections on exit

Strategy Methods

generate_strategy()

Generate a new extraction strategy using AI.
generate_strategy(
    url: str,
    description: str,
    name: str,
    force_api: bool = False,
    output_schema: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| url | str | Yes | Target webpage URL to analyze |
| description | str | Yes | Plain English description of what to extract |
| name | str | Yes | Human-readable name for this strategy |
| force_api | bool | No | Force API-based capture instead of CSS extraction (default: False) |
| output_schema | Dict | No | Desired output JSON structure. See Output Schemas |
Returns: Dict with fields:
  • strategy_id (str): UUID of the created strategy
  • strategy (dict): The extraction strategy (CSS selectors, fields)
  • preview_data (list): Sample extracted data (first 5-10 items)
  • attempts (int): Number of generation attempts (usually 1)
  • scraper_type (str): Type of scraper used - 'css' or 'api'
  • api_parameters (dict, optional): Available URL parameters for API-based strategies
Example:
result = client.generate_strategy(
    url="https://news.ycombinator.com",
    description="Extract post titles and scores",
    name="HN Front Page"
)

strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Example with API capture:
# Force API-based capture for sites with underlying APIs
result = client.generate_strategy(
    url="https://api-heavy-site.com/products",
    description="Extract product listings",
    name="Product API Scraper",
    force_api=True
)

print(f"Scraper type: {result['scraper_type']}")  # 'api' or 'css'

# For API strategies, check available parameters
if result.get('api_parameters'):
    print(f"Available parameters: {result['api_parameters']}")
    # e.g., {'page': 1, 'limit': 20, 'sort': 'price'}
Raises: MeterError if generation fails
When force_api=True, Meter will attempt to identify and capture underlying API calls instead of using CSS selectors. This is useful for sites that load data dynamically via JavaScript APIs.

refine_strategy()

Refine an existing strategy with feedback.
refine_strategy(
    strategy_id: str,
    feedback: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to refine |
| feedback | str | Yes | Description of what to improve or add |
Returns: Dict with the same fields as generate_strategy()
Example:
refined = client.refine_strategy(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    feedback="Also extract the product images and SKU"
)

print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.

list_strategies()

List all strategies for the authenticated user.
list_strategies(
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Maximum number of strategies to return (default: 20) |
| offset | int | No | Number of strategies to skip (default: 0) |
Returns: List[Dict] where each dict contains:
  • id (str): Strategy UUID
  • name (str): Strategy name
  • description (str): Extraction description
  • url (str): Original URL used for generation
  • preview_data (list): Sample extracted data
  • created_at (str): ISO timestamp
  • updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()

for strategy in strategies:
    print(f"{strategy['name']}: {strategy['id']}")

# Pagination
page_2 = client.list_strategies(limit=20, offset=20)
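When an account holds more strategies than fit in one page, the limit/offset pattern above can be wrapped in a generator. The following is a minimal sketch, not part of the SDK: iter_strategies is a hypothetical helper name, and it assumes that a page shorter than page_size marks the end of the list.

```python
from typing import Any, Dict, Iterator


def iter_strategies(client: Any, page_size: int = 20) -> Iterator[Dict[str, Any]]:
    """Yield every strategy by walking limit/offset pages (hypothetical helper)."""
    offset = 0
    while True:
        page = client.list_strategies(limit=page_size, offset=offset)
        yield from page
        # Assumption: a page shorter than page_size is the last one.
        if len(page) < page_size:
            break
        offset += page_size
```

Usage would look like `for strategy in iter_strategies(client): ...`. Because it is a generator, later pages are fetched only if the caller keeps iterating.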

get_strategy()

Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy |
Returns: Dict with full strategy details (same fields as list_strategies() items)
Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")

print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found

delete_strategy()

Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.

Job Methods

create_job()

Create a new scrape job using a strategy.
create_job(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape as a batch |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with fields:
  • job_id (str): UUID of the created job (single URL)
  • batch_id (str): Batch UUID for tracking progress (multiple URLs)
  • status (str): Job status (usually “pending”)
  • strategy_id (str): Strategy UUID
  • url (str): Target URL
  • parameters (dict, optional): Parameters used for this job
  • created_at (str): ISO timestamp
Example:
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/page"
)

print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")
Example with API parameters:
# For API-based strategies, override parameters at runtime
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={
        "page": 2,
        "limit": 50,
        "category": "electronics"
    }
)
Example with batch URLs:
# Scrape multiple URLs in a single batch
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/1",
        "https://example.com/products/2",
        "https://example.com/products/3"
    ]
)

print(f"Batch created: {job['batch_id']}")
You must provide either url or urls, but not both. The parameters option only applies to API-based strategies (where scraper_type is 'api').
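The url/urls rule can be checked locally before any request is sent, and the differing return fields (job_id for a single URL, batch_id for a batch) can be normalized in one place. The sketch below is an illustration rather than SDK code: submit_job is a hypothetical wrapper, and it relies only on the create_job() signature and return fields documented above.

```python
from typing import Any, Dict, List, Optional, Tuple


def submit_job(
    client: Any,
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> Tuple[str, str]:
    """Create a job and return ("job", job_id) or ("batch", batch_id)."""
    # Fail fast locally when both or neither of url/urls is given.
    if (url is None) == (urls is None):
        raise ValueError("Provide exactly one of url or urls")

    kwargs: Dict[str, Any] = {"strategy_id": strategy_id}
    if url is not None:
        kwargs["url"] = url
    else:
        kwargs["urls"] = list(urls)
    if parameters is not None:
        kwargs["parameters"] = parameters

    job = client.create_job(**kwargs)
    # Single-URL jobs are tracked by job_id, batches by batch_id.
    kind = "job" if url is not None else "batch"
    return kind, job[f"{kind}_id"]
```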

execute_job()

Create and execute a scrape job synchronously. Returns results directly without polling.
execute_job(
    strategy_id: str,
    url: str,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Yes | URL to scrape |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with completed job details including results
Example:
# Simple synchronous scrape
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products"
)

print(f"Got {result['item_count']} items")
for item in result['results']:
    print(item)
Example with API parameters:
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={"page": 2, "limit": 50}
)
Raises: MeterError if job fails or times out
This endpoint blocks until the job completes (up to 1 hour timeout). Use create_job() + wait_for_job() for more control over polling behavior, or create_job() alone for fire-and-forget jobs.

get_job()

Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job |
Returns: Dict with fields:
  • job_id (str): Job UUID
  • status (str): “pending”, “running”, “completed”, or “failed”
  • results (list): Extracted data (only if status is “completed”)
  • item_count (int): Number of items extracted
  • content_hash (str): Hash for change detection
  • structural_signature (dict): Structural fingerprint
  • error (str): Error message (only if status is “failed”)
  • started_at (str): ISO timestamp
  • completed_at (str): ISO timestamp
  • created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")

if job['status'] == 'completed':
    print(f"Extracted {job['item_count']} items")
    for item in job['results']:
        print(item)
elif job['status'] == 'failed':
    print(f"Job failed: {job['error']}")
else:
    print(f"Job is {job['status']}")

wait_for_job()

Wait for a job to complete, polling automatically.
wait_for_job(
    job_id: str,
    poll_interval: float = 1.0,
    timeout: Optional[float] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job to wait for |
| poll_interval | float | No | Seconds between status checks (default: 1.0) |
| timeout | float | No | Maximum seconds to wait (default: None = infinite) |
Returns: Dict with completed job details (same as get_job())
Example:
from meter_sdk import MeterError

# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")

# With timeout
try:
    completed = client.wait_for_job(
        "660e8400-e29b-41d4-a716-446655440000",
        poll_interval=2.0,
        timeout=300.0  # 5 minutes
    )
except MeterError as e:
    print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails
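wait_for_job() polls at a fixed interval. If a different policy is wanted, such as exponential backoff for long-running jobs, a loop over get_job() is one option. This is a sketch under stated assumptions and not how the SDK implements wait_for_job(): poll_job and its parameters are hypothetical, it returns failed jobs instead of raising, and it raises the built-in TimeoutError rather than MeterError.

```python
import time
from typing import Any, Callable, Dict, Optional


def poll_job(
    client: Any,
    job_id: str,
    initial_interval: float = 1.0,
    max_interval: float = 30.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll get_job() with exponential backoff until the job finishes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    interval = initial_interval
    while True:
        job = client.get_job(job_id)
        # "completed" and "failed" are the terminal statuses listed for get_job().
        if job["status"] in ("completed", "failed"):
            return job
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job['status']} after {timeout}s")
        sleep(interval)
        # Double the wait after each check, capped at max_interval.
        interval = min(interval * 2, max_interval)
```

The sleep argument is injectable so the loop can be exercised in tests without real delays.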

list_jobs()

List jobs with optional filtering.
list_jobs(
    strategy_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | No | Filter by strategy UUID |
| status | str | No | Filter by status: “pending”, “running”, “completed”, “failed” |
| limit | int | No | Maximum jobs to return (default: 20) |
| offset | int | No | Number of jobs to skip (default: 0) |
Returns: List[Dict] of job summaries
Example:
# All jobs
all_jobs = client.list_jobs(limit=50)

# Jobs for specific strategy
strategy_jobs = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000"
)

# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)

# Combined filters
recent_completed = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    status="completed",
    limit=5
)

compare_jobs()

Compare two jobs to detect changes.
compare_jobs(
    job_id: str,
    other_job_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | First job UUID |
| other_job_id | str | Yes | Second job UUID to compare with |
Returns: Dict with fields:
  • content_hash_match (bool): True if content hashes match
  • structural_match (bool): True if structure matches
  • semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
  • changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
    "660e8400-e29b-41d4-a716-446655440000",
    "770e8400-e29b-41d4-a716-446655440000"
)

print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")

if not comparison['content_hash_match']:
    print("Content has changed!")
    for change in comparison.get('changes', []):
        print(f"  - {change}")
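For alerting or logging, the two boolean fields can be folded into a single label. The helper below is a hypothetical convenience, not an SDK function; treating a structural mismatch as more significant than a content mismatch is an assumption made for this sketch.

```python
from typing import Any, Dict


def summarize_comparison(comparison: Dict[str, Any]) -> str:
    """Reduce a compare_jobs() result to one of three labels."""
    # Assumption: a structural change outranks a plain content change.
    if not comparison["structural_match"]:
        return "structure_changed"
    if not comparison["content_hash_match"]:
        return "content_changed"
    return "unchanged"
```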

get_strategy_history()

Get timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID |
Returns: List[Dict] where each dict contains:
  • job_id (str): Job UUID
  • status (str): Job status
  • item_count (int): Items extracted
  • has_changes (bool): True if content changed vs. previous job
  • created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")

for entry in history:
    status_icon = "✓" if entry['status'] == 'completed' else "✗"
    change_icon = "📝" if entry['has_changes'] else "—"
    print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")
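The history entries also lend themselves to a quick aggregate, for example to judge how often a page actually changes before choosing a schedule interval. This is a sketch: change_summary is a hypothetical helper that uses only the status and has_changes fields listed above, and it counts changes among completed runs only.

```python
from typing import Any, Dict, List


def change_summary(history: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Count completed runs and how many of them detected changes."""
    completed = [entry for entry in history if entry["status"] == "completed"]
    changed = sum(1 for entry in completed if entry["has_changes"])
    return {
        "runs": len(history),
        "completed": len(completed),
        "changed": changed,
        # Share of completed runs with changes; 0.0 when nothing has completed.
        "change_rate": changed / len(completed) if completed else 0.0,
    }
```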

Schedule Methods

create_schedule()

Create a new recurring schedule.
create_schedule(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape on each run |
| interval_seconds | int | Conditional | Interval in seconds (required if no cron) |
| cron_expression | str | Conditional | Cron expression (required if no interval) |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_metadata | Dict | No | Custom JSON metadata included in every webhook payload |
| webhook_secret | str | No | Secret for X-Webhook-Secret header. Auto-generated if not provided when webhook_url is set |
| webhook_type | str | No | 'standard' or 'slack'. Auto-detected from URL if not specified |
| parameters | Dict | No | Default API parameter overrides for all scheduled runs (API strategies only) |
Returns: Dict with schedule details
Example:
# Interval-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600  # Every hour
)

# Cron-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    cron_expression="0 9 * * *"  # Daily at 9 AM
)

# With webhook
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)

print(f"Schedule created: {schedule['schedule_id']}")
print(f"Next run: {schedule['next_run_at']}")
Example with API parameters:
# For API-based strategies, set default parameters for all runs
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/jobs",
    interval_seconds=3600,
    parameters={
        "category": "engineering",
        "location": "remote",
        "limit": 100
    }
)
Example with multiple URLs:
# Monitor multiple pages on a schedule
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/electronics",
        "https://example.com/products/clothing",
        "https://example.com/products/home"
    ],
    interval_seconds=3600
)
You must provide either url or urls, but not both. You must also provide either interval_seconds or cron_expression, but not both.
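Raw second counts such as 3600 are easy to mistype. One way to keep the timing arguments readable, and to enforce the interval-or-cron rule before the request is made, is a small builder based on datetime.timedelta. The name timing_kwargs and its parameters are hypothetical; the function only produces the interval_seconds or cron_expression keyword documented above.

```python
from datetime import timedelta
from typing import Any, Dict, Optional


def timing_kwargs(
    every: Optional[timedelta] = None,
    cron: Optional[str] = None,
) -> Dict[str, Any]:
    """Return the timing keyword argument for create_schedule()."""
    # Exactly one of the two timing styles may be used.
    if (every is None) == (cron is None):
        raise ValueError("Provide exactly one of every or cron")
    if every is not None:
        seconds = int(every.total_seconds())
        if seconds <= 0:
            raise ValueError("Interval must be positive")
        return {"interval_seconds": seconds}
    return {"cron_expression": cron}
```

A call might then read `client.create_schedule(strategy_id=..., url=..., **timing_kwargs(every=timedelta(hours=6)))`.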

list_schedules()

List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules
Example:
schedules = client.list_schedules()

for schedule in schedules:
    print(f"Schedule {schedule['schedule_id']}:")
    print(f"  Type: {schedule['schedule_type']}")
    print(f"  Enabled: {schedule['enabled']}")
    print(f"  Next run: {schedule['next_run_at']}")

update_schedule()

Update an existing schedule.
update_schedule(
    schedule_id: str,
    enabled: Optional[bool] = None,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| enabled | bool | No | Enable/disable schedule |
| url | str | No | Update to single URL |
| urls | List[str] | No | Update to multiple URLs |
| interval_seconds | int | No | New interval in seconds |
| cron_expression | str | No | New cron expression |
| webhook_url | str | No | New webhook URL (or None to remove) |
| webhook_metadata | Dict | No | Update custom JSON metadata for webhook payloads |
| webhook_secret | str | No | Update webhook secret |
| webhook_type | str | No | Update webhook type: 'standard' or 'slack' |
| parameters | Dict | No | Update API parameter defaults (API strategies only) |
Returns: Dict with updated schedule details
Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)

# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)

# Update webhook
client.update_schedule(
    schedule_id,
    webhook_url="https://new-domain.com/webhooks"
)

# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)

# Update API parameters
client.update_schedule(
    schedule_id,
    parameters={"category": "new-category", "limit": 200}
)
Setting url will clear urls, and vice versa.

delete_schedule()

Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Schedule deleted successfully'}

get_schedule_changes()

Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
    schedule_id: str,
    mark_seen: bool = True,
    filter: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| mark_seen | bool | No | Mark returned changes as seen (default: True) |
| filter | str | No | Lucene-style keyword filter for results |
Returns: Dict with fields:
  • schedule_id (str): Schedule UUID
  • changes (list): Jobs with changes (full job details)
  • count (int): Number of changed jobs
  • marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=True
)

if changes['count'] > 0:
    print(f"Found {changes['count']} jobs with changes")
    for change in changes['changes']:
        print(f"Job {change['job_id']}: {change['item_count']} items")
        # Process change['results']

# Preview without marking as seen
preview = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=False
)
Example with keyword filtering:
# Filter for items containing both keywords (AND)
changes = client.get_schedule_changes(
    schedule_id,
    filter="+python +remote"
)

# Filter for items containing either keyword (OR)
changes = client.get_schedule_changes(
    schedule_id,
    filter="python javascript"
)

# Exclude items with a keyword
changes = client.get_schedule_changes(
    schedule_id,
    filter="+engineer -manager"
)

# Exact phrase matching
changes = client.get_schedule_changes(
    schedule_id,
    filter='"machine learning"'
)
Use mark_seen=False to preview changes without affecting state. The filter parameter filters individual items within job results.
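When keywords come from user input or configuration, the filter string can be assembled programmatically instead of by hand. The sketch below covers only the four forms shown in the examples above (+term, bare term, -term, quoted phrase). build_filter is a hypothetical helper, and mixing quoted phrases with the other forms in one filter string is an assumption that this page does not confirm.

```python
from typing import Iterable


def build_filter(
    required: Iterable[str] = (),
    optional: Iterable[str] = (),
    excluded: Iterable[str] = (),
    phrases: Iterable[str] = (),
) -> str:
    """Assemble a filter string for get_schedule_changes()."""
    parts = [f"+{term}" for term in required]      # must be present (AND)
    parts += list(optional)                        # any may be present (OR)
    parts += [f"-{term}" for term in excluded]     # must be absent
    parts += [f'"{phrase}"' for phrase in phrases]  # exact phrase
    return " ".join(parts)
```

For instance, `build_filter(required=["engineer"], excluded=["manager"])` produces the same string as the exclusion example above.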

regenerate_webhook_secret()

Regenerate the webhook secret for a schedule. The old secret is immediately invalidated.
regenerate_webhook_secret(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
Returns: Dict with schedule_id and the new webhook_secret
Example:
result = client.regenerate_webhook_secret("880e8400-e29b-41d4-a716-446655440000")
new_secret = result["webhook_secret"]
print(f"New secret: {new_secret}")
# Update your webhook handler with the new secret
Raises: MeterError if schedule has no webhook URL configured
The new secret is returned only once. Store it securely and update your webhook handler before the next delivery.
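On the receiving side, a webhook handler would compare the X-Webhook-Secret header against the stored secret. The following sketch assumes the header carries the secret verbatim (not a signature of the payload), which is how the webhook_secret parameter is described on this page. is_authentic is a hypothetical helper; hmac.compare_digest from the standard library is used so the comparison runs in constant time.

```python
import hmac
from typing import Mapping


def is_authentic(headers: Mapping[str, str], expected_secret: str) -> bool:
    """Check the X-Webhook-Secret header of an incoming webhook request."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    received = normalized.get("x-webhook-secret", "")
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(received.encode(), expected_secret.encode())
```

After calling regenerate_webhook_secret(), the value passed as expected_secret needs to be replaced with the new secret, since the old one stops being sent.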

Workflow Methods

For workflow methods (create_workflow, run_workflow, wait_for_workflow, etc.), see the dedicated Workflow Methods reference.

Strategy Group Methods

Manage collections of strategies with shared schedules and output schemas. See Strategy Groups for concepts.

create_strategy_group()

Create a new strategy group.
create_strategy_group(
    name: str,
    description: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Name for the group |
| description | str | No | Description of the group |
Returns: Dict with id, name, description, strategy_count, created_at, updated_at
Example:
group = client.create_strategy_group(
    name="E-commerce Monitors",
    description="Product price tracking across 50 stores"
)
print(f"Group ID: {group['id']}")

list_strategy_groups()

List all strategy groups.
list_strategy_groups(
    limit: int = 50,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Max results (default: 50) |
| offset | int | No | Results to skip (default: 0) |
Returns: List[Dict] of strategy groups with strategy counts
Example:
groups = client.list_strategy_groups()
for g in groups:
    print(f"{g['name']}: {g['strategy_count']} strategies")

get_strategy_group()

Get group details including member strategies.
get_strategy_group(group_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
Returns: Dict with id, name, description, strategies (list), created_at, updated_at
Example:
detail = client.get_strategy_group("aa0e8400-e29b-41d4-a716-446655440000")
for s in detail["strategies"]:
    print(f"  - {s['name']}")

update_strategy_group()

Update a group’s name or description.
update_strategy_group(
    group_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| name | str | No | New name |
| description | str | No | New description |
Returns: Dict with updated group details

delete_strategy_group()

Delete a group. Strategies become ungrouped (not deleted).
delete_strategy_group(group_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
Returns: Dict with confirmation message

add_strategies_to_group()

Add existing strategies to a group.
add_strategies_to_group(
    group_id: str,
    strategy_ids: List[str]
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| strategy_ids | List[str] | Yes | Strategy UUIDs to add |
Returns: Dict with confirmation message
Example:
client.add_strategies_to_group(
    group_id=group["id"],
    strategy_ids=["550e8400-...", "660e8400-..."]
)

remove_strategy_from_group()

Remove a strategy from a group without deleting it.
remove_strategy_from_group(
    group_id: str,
    strategy_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| strategy_id | str | Yes | Strategy UUID to remove |
Returns: Dict with confirmation message

apply_group_schedule()

Apply a schedule to all strategies in a group.
apply_group_schedule(
    group_id: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| interval_seconds | int | Conditional | Run every N seconds (min: 60) |
| cron_expression | str | Conditional | Cron expression |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_secret | str | No | Webhook secret (auto-generated if not provided) |
| webhook_type | str | No | 'standard', 'slack', 'slack_workflow', or 'discord' |
| webhook_metadata | Dict | No | Custom JSON metadata for webhook payloads |
Returns: Dict with message, created count, updated count
Example:
client.apply_group_schedule(
    group_id=group["id"],
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)
Provide either interval_seconds or cron_expression, not both.

delete_group_schedules()

Delete all schedules for strategies in a group.
delete_group_schedules(group_id: str) -> Dict

toggle_group_schedules()

Enable or disable all schedules in a group.
toggle_group_schedules(
    group_id: str,
    enabled: bool
) -> Dict
Example:
# Pause all group schedules
client.toggle_group_schedules(group["id"], enabled=False)

# Resume
client.toggle_group_schedules(group["id"], enabled=True)

apply_group_schema()

Apply an output schema to all strategies in a group. Triggers async regeneration.
apply_group_schema(
    group_id: str,
    output_schema: Dict[str, Any]
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| output_schema | Dict | Yes | JSON schema defining output structure |
Returns: Dict with message and strategy_count
Example:
client.apply_group_schema(
    group_id=group["id"],
    output_schema={
        "title": "string",
        "price": "number",
        "in_stock": "boolean"
    }
)

get_schema_progress()

Poll progress of a group schema regeneration.
get_schema_progress(group_id: str) -> Dict

test_group_webhook()

Send a test webhook using a group’s webhook configuration.
test_group_webhook(
    group_id: str,
    webhook_url: str,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None
) -> Dict
Returns: Dict with success, status_code, message

Error Handling

All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError

client = MeterClient(api_key="sk_live_...")

try:
    strategy = client.generate_strategy(
        url="https://news.ycombinator.com",
        description="Extract post titles and scores",
        name="HN Front Page"
    )
except MeterError as e:
    print(f"Error: {e}")
    # Handle error appropriately
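One way to "handle error appropriately" for failures that may be transient is to retry with exponential backoff. The sketch below is generic and does not inspect the error. Whether a given MeterError is safe to retry is an assumption the caller has to make, and call_with_retries is a hypothetical helper, not part of meter_sdk.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[Exception], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on the given exception types with backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            # Out of attempts: re-raise the last error to the caller.
            if attempt == attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

With the SDK installed, usage might look like `call_with_retries(lambda: client.list_strategies(), retry_on=(MeterError,))`.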

Next steps

Error Handling

Learn about error handling and exceptions

Quick Start

See the SDK in action with examples

Strategies Guide

Deep dive into strategy concepts

Jobs Guide

Understanding job lifecycle

Need help?

Email me at mckinnon@meter.sh