MeterClient
The MeterClient class is the main interface for all Meter API operations. It handles authentication and request management, and provides methods for strategies, jobs, schedules, and strategy groups.
Constructor
MeterClient(api_key: str, base_url: str = "https://api.meter.sh")
Parameters
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| api_key | str | Yes | Your Meter API key (starts with sk_live_) |
| base_url | str | No | API base URL (default: https://api.meter.sh) |
Example
from meter_sdk import MeterClient
import os
# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))
# With custom base URL (for development)
client = MeterClient(
    api_key=os.getenv("METER_API_KEY"),
    base_url="http://localhost:8000"
)
Context Manager
The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
    strategies = client.list_strategies()
# Client automatically closes HTTP connections on exit
Strategy Methods
generate_strategy()
Generate a new extraction strategy using AI.
generate_strategy(
    url: str,
    description: str,
    name: str,
    force_api: bool = False,
    output_schema: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| url | str | Yes | Target webpage URL to analyze |
| description | str | Yes | Plain English description of what to extract |
| name | str | Yes | Human-readable name for this strategy |
| force_api | bool | No | Force API-based capture instead of CSS extraction (default: False) |
| output_schema | Dict | No | Desired output JSON structure. See Output Schemas |
Returns: Dict with fields:
strategy_id (str): UUID of the created strategy
strategy (dict): The extraction strategy (CSS selectors, fields)
preview_data (list): Sample extracted data (first 5-10 items)
attempts (int): Number of generation attempts (usually 1)
scraper_type (str): Type of scraper used - 'css' or 'api'
api_parameters (dict, optional): Available URL parameters for API-based strategies
Example:
result = client.generate_strategy(
    url="https://news.ycombinator.com",
    description="Extract post titles and scores",
    name="HN Front Page"
)
strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Example with API capture:
# Force API-based capture for sites with underlying APIs
result = client.generate_strategy(
    url="https://api-heavy-site.com/products",
    description="Extract product listings",
    name="Product API Scraper",
    force_api=True
)
print(f"Scraper type: {result['scraper_type']}")  # 'api' or 'css'
# For API strategies, check available parameters
if result.get('api_parameters'):
    print(f"Available parameters: {result['api_parameters']}")
    # e.g., {'page': 1, 'limit': 20, 'sort': 'price'}
Raises: MeterError if generation fails
When force_api=True, Meter will attempt to identify and capture underlying API calls
instead of using CSS selectors. This is useful for sites that load data dynamically
via JavaScript APIs.
refine_strategy()
Refine an existing strategy with feedback.
refine_strategy(
    strategy_id: str,
    feedback: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to refine |
| feedback | str | Yes | Description of what to improve or add |
Returns: Dict with same fields as generate_strategy()
Example:
refined = client.refine_strategy(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    feedback="Also extract the product images and SKU"
)
print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.
list_strategies()
List all strategies for the authenticated user.
list_strategies(
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Maximum number of strategies to return (default: 20) |
| offset | int | No | Number of strategies to skip (default: 0) |
Returns: List[Dict] where each dict contains:
id (str): Strategy UUID
name (str): Strategy name
description (str): Extraction description
url (str): Original URL used for generation
preview_data (list): Sample extracted data
created_at (str): ISO timestamp
updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()
for strategy in strategies:
    # Each item exposes its UUID under 'id' (see the field list above)
    print(f"{strategy['name']}: {strategy['id']}")
# Pagination
page_2 = client.list_strategies(limit=20, offset=20)
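To walk every strategy rather than a single page, the limit and offset parameters can be combined in a loop. The following is a minimal sketch, not part of the SDK: iter_all_strategies is a hypothetical helper, and it assumes that a page shorter than the requested limit marks the end of the list, which the reference above does not state explicitly.

```python
from typing import Any, Dict, Iterator


def iter_all_strategies(client: Any, page_size: int = 20) -> Iterator[Dict[str, Any]]:
    """Yield every strategy by walking limit/offset pages until one comes back short.

    Hypothetical helper: `client` is anything with a list_strategies(limit, offset) method.
    """
    offset = 0
    while True:
        page = client.list_strategies(limit=page_size, offset=offset)
        yield from page
        if len(page) < page_size:
            break  # assumption: a short (or empty) page means nothing is left
        offset += page_size
```

With a real client this would be used as: for strategy in iter_all_strategies(client): print(strategy['name']).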
get_strategy()
Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy |
Returns: Dict with full strategy details (same fields as list_strategies() items)
Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")
print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found
delete_strategy()
Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.
Job Methods
create_job()
Create a new scrape job using a strategy.
create_job(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape as a batch |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with fields:
job_id (str): UUID of the created job (single URL)
batch_id (str): Batch UUID for tracking progress (multiple URLs)
status (str): Job status (usually “pending”)
strategy_id (str): Strategy UUID
url (str): Target URL
parameters (dict, optional): Parameters used for this job
created_at (str): ISO timestamp
Example:
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/page"
)
print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")
Example with API parameters:
# For API-based strategies, override parameters at runtime
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={
        "page": 2,
        "limit": 50,
        "category": "electronics"
    }
)
Example with batch URLs:
# Scrape multiple URLs in a single batch
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/1",
        "https://example.com/products/2",
        "https://example.com/products/3"
    ]
)
print(f"Batch created: {job['batch_id']}")
You must provide either url or urls, but not both. The parameters option
only applies to API-based strategies (where scraper_type is 'api').
execute_job()
Create and execute a scrape job synchronously. Returns results directly without polling.
execute_job(
    strategy_id: str,
    url: str,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | UUID of the strategy to use |
| url | str | Yes | URL to scrape |
| parameters | Dict | No | Override API parameters for this job (API strategies only) |
Returns: Dict with completed job details including results
Example:
# Simple synchronous scrape
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products"
)
print(f"Got {result['item_count']} items")
for item in result['results']:
    print(item)
Example with API parameters:
result = client.execute_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/products",
    parameters={"page": 2, "limit": 50}
)
Raises: MeterError if job fails or times out
This endpoint blocks until the job completes (up to 1 hour timeout). Use create_job() + wait_for_job() for more control over polling behavior, or create_job() alone for fire-and-forget jobs.
get_job()
Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job |
Returns: Dict with fields:
job_id (str): Job UUID
status (str): “pending”, “running”, “completed”, or “failed”
results (list): Extracted data (only if status is “completed”)
item_count (int): Number of items extracted
content_hash (str): Hash for change detection
structural_signature (dict): Structural fingerprint
error (str): Error message (only if status is “failed”)
started_at (str): ISO timestamp
completed_at (str): ISO timestamp
created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")
if job['status'] == 'completed':
    print(f"Extracted {job['item_count']} items")
    for item in job['results']:
        print(item)
elif job['status'] == 'failed':
    print(f"Job failed: {job['error']}")
else:
    print(f"Job is {job['status']}")
wait_for_job()
Wait for a job to complete, polling automatically.
wait_for_job(
    job_id: str,
    poll_interval: float = 1.0,
    timeout: Optional[float] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | UUID of the job to wait for |
| poll_interval | float | No | Seconds between status checks (default: 1.0) |
| timeout | float | No | Maximum seconds to wait (default: None = infinite) |
Returns: Dict with completed job details (same as get_job())
Example:
from meter_sdk import MeterError
# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")
# With timeout
try:
    completed = client.wait_for_job(
        "660e8400-e29b-41d4-a716-446655440000",
        poll_interval=2.0,
        timeout=300.0  # 5 minutes
    )
except MeterError as e:
    print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails
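For callers who want custom polling behavior, the same pattern can be approximated on top of get_job(). This is a rough sketch under stated assumptions, not the SDK's actual implementation: poll_job is a hypothetical helper, and it raises the built-in RuntimeError and TimeoutError instead of MeterError so that it runs without the SDK installed.

```python
import time
from typing import Any, Dict, Optional


def poll_job(client: Any, job_id: str, poll_interval: float = 1.0,
             timeout: Optional[float] = None) -> Dict[str, Any]:
    """Poll get_job() until the job is 'completed' or 'failed', or the timeout passes.

    Hypothetical helper: `client` is anything with a get_job(job_id) method
    returning a dict with a 'status' key, as described for get_job() above.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        job = client.get_job(job_id)
        if job["status"] == "completed":
            return job
        if job["status"] == "failed":
            raise RuntimeError(f"Job {job_id} failed: {job.get('error')}")
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} not finished after {timeout} seconds")
        time.sleep(poll_interval)
```

The deadline uses time.monotonic() rather than wall-clock time, so system clock adjustments do not shorten or extend the wait.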
list_jobs()
List jobs with optional filtering.
list_jobs(
    strategy_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | No | Filter by strategy UUID |
| status | str | No | Filter by status: “pending”, “running”, “completed”, “failed” |
| limit | int | No | Maximum jobs to return (default: 20) |
| offset | int | No | Number of jobs to skip (default: 0) |
Returns: List[Dict] of job summaries
Example:
# All jobs
all_jobs = client.list_jobs(limit=50)
# Jobs for specific strategy
strategy_jobs = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000"
)
# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)
# Combined filters
recent_completed = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    status="completed",
    limit=5
)
compare_jobs()
Compare two jobs to detect changes.
compare_jobs(
    job_id: str,
    other_job_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | str | Yes | First job UUID |
| other_job_id | str | Yes | Second job UUID to compare with |
Returns: Dict with fields:
content_hash_match (bool): True if content hashes match
structural_match (bool): True if structure matches
semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
    "660e8400-e29b-41d4-a716-446655440000",
    "770e8400-e29b-41d4-a716-446655440000"
)
print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")
if not comparison['content_hash_match']:
    print("Content has changed!")
    for change in comparison.get('changes', []):
        print(f"  - {change}")
get_strategy_history()
Get timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID |
Returns: List[Dict] where each dict contains:
job_id (str): Job UUID
status (str): Job status
item_count (int): Items extracted
has_changes (bool): True if content changed vs. previous job
created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")
for entry in history:
    status_icon = "✓" if entry['status'] == 'completed' else "✗"
    change_icon = "📝" if entry['has_changes'] else "—"
    print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")
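When only the runs that actually changed matter, the history list can be filtered on the status and has_changes fields. A small illustrative sketch follows; changed_runs is a hypothetical helper, not an SDK method, and it relies only on the fields listed for get_strategy_history() above.

```python
from typing import Any, Dict, List


def changed_runs(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only completed history entries whose content differed from the previous job."""
    return [
        entry for entry in history
        if entry["status"] == "completed" and entry["has_changes"]
    ]
```

With a real client this would be called as changed_runs(client.get_strategy_history(strategy_id)).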
Schedule Methods
create_schedule()
Create a new recurring schedule.
create_schedule(
    strategy_id: str,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| strategy_id | str | Yes | Strategy UUID to use |
| url | str | Conditional | Single URL to scrape (use url OR urls, not both) |
| urls | List[str] | Conditional | List of URLs to scrape on each run |
| interval_seconds | int | Conditional | Interval in seconds (required if no cron) |
| cron_expression | str | Conditional | Cron expression (required if no interval) |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_metadata | Dict | No | Custom JSON metadata included in every webhook payload |
| webhook_secret | str | No | Secret for X-Webhook-Secret header. Auto-generated if not provided when webhook_url is set |
| webhook_type | str | No | 'standard' or 'slack'. Auto-detected from URL if not specified |
| parameters | Dict | No | Default API parameter overrides for all scheduled runs (API strategies only) |
Returns: Dict with schedule details
Example:
# Interval-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600  # Every hour
)
# Cron-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    cron_expression="0 9 * * *"  # Daily at 9 AM
)
# With webhook
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)
print(f"Schedule created: {schedule['schedule_id']}")
print(f"Next run: {schedule['next_run_at']}")
Example with API parameters:
# For API-based strategies, set default parameters for all runs
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/api/jobs",
    interval_seconds=3600,
    parameters={
        "category": "engineering",
        "location": "remote",
        "limit": 100
    }
)
Example with multiple URLs:
# Monitor multiple pages on a schedule
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    urls=[
        "https://example.com/products/electronics",
        "https://example.com/products/clothing",
        "https://example.com/products/home"
    ],
    interval_seconds=3600
)
You must provide either url or urls, but not both. You must also provide
either interval_seconds or cron_expression, but not both.
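The "exactly one of interval_seconds or cron_expression" rule can also be checked on the caller's side before a request is sent. The sketch below is illustrative only: schedule_timing is a hypothetical helper, not part of the SDK, and it converts a timedelta into whole seconds for the interval case.

```python
from datetime import timedelta
from typing import Any, Dict, Optional


def schedule_timing(every: Optional[timedelta] = None,
                    cron: Optional[str] = None) -> Dict[str, Any]:
    """Return exactly one timing keyword argument for create_schedule().

    Hypothetical helper: raises ValueError when both or neither option is given.
    """
    if (every is None) == (cron is None):
        raise ValueError("Provide exactly one of 'every' or 'cron'")
    if cron is not None:
        return {"cron_expression": cron}
    return {"interval_seconds": int(every.total_seconds())}
```

The result can be unpacked into the call, for example client.create_schedule(strategy_id=..., url=..., **schedule_timing(every=timedelta(hours=1))).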
list_schedules()
List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules
Example:
schedules = client.list_schedules()
for schedule in schedules:
    print(f"Schedule {schedule['schedule_id']}:")
    print(f"  Type: {schedule['schedule_type']}")
    print(f"  Enabled: {schedule['enabled']}")
    print(f"  Next run: {schedule['next_run_at']}")
update_schedule()
Update an existing schedule.
update_schedule(
    schedule_id: str,
    enabled: Optional[bool] = None,
    url: Optional[str] = None,
    urls: Optional[List[str]] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| enabled | bool | No | Enable/disable schedule |
| url | str | No | Update to single URL |
| urls | List[str] | No | Update to multiple URLs |
| interval_seconds | int | No | New interval in seconds |
| cron_expression | str | No | New cron expression |
| webhook_url | str | No | New webhook URL (or None to remove) |
| webhook_metadata | Dict | No | Update custom JSON metadata for webhook payloads |
| webhook_secret | str | No | Update webhook secret |
| webhook_type | str | No | Update webhook type: 'standard' or 'slack' |
| parameters | Dict | No | Update API parameter defaults (API strategies only) |
Returns: Dict with updated schedule details
Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)
# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)
# Update webhook
client.update_schedule(
    schedule_id,
    webhook_url="https://new-domain.com/webhooks"
)
# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)
# Update API parameters
client.update_schedule(
    schedule_id,
    parameters={"category": "new-category", "limit": 200}
)
Setting url will clear urls, and vice versa.
delete_schedule()
Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Schedule deleted successfully'}
get_schedule_changes()
Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
    schedule_id: str,
    mark_seen: bool = True,
    filter: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
| mark_seen | bool | No | Mark returned changes as seen (default: True) |
| filter | str | No | Lucene-style keyword filter for results |
Returns: Dict with fields:
schedule_id (str): Schedule UUID
changes (list): Jobs with changes (full job details)
count (int): Number of changed jobs
marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=True
)
if changes['count'] > 0:
    print(f"Found {changes['count']} jobs with changes")
    for change in changes['changes']:
        print(f"Job {change['job_id']}: {change['item_count']} items")
        # Process change['results']
# Preview without marking as seen
preview = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=False
)
Example with keyword filtering:
# Filter for items containing both keywords (AND)
changes = client.get_schedule_changes(
    schedule_id,
    filter="+python +remote"
)
# Filter for items containing either keyword (OR)
changes = client.get_schedule_changes(
    schedule_id,
    filter="python javascript"
)
# Exclude items with a keyword
changes = client.get_schedule_changes(
    schedule_id,
    filter="+engineer -manager"
)
# Exact phrase matching
changes = client.get_schedule_changes(
    schedule_id,
    filter='"machine learning"'
)
Use mark_seen=False to preview changes without affecting state.
The filter parameter filters individual items within job results.
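Filter strings like the ones in the examples can be assembled programmatically. The sketch below is a hypothetical helper, not part of the SDK: it follows only the four patterns shown above (a + prefix for required words, bare words for alternatives, a - prefix for exclusions, double quotes for phrases) and does no escaping. Whether all four groups can be mixed in a single filter is an assumption.

```python
from typing import Iterable


def build_filter(required: Iterable[str] = (), optional: Iterable[str] = (),
                 excluded: Iterable[str] = (), phrases: Iterable[str] = ()) -> str:
    """Assemble a filter string from keyword groups, following the documented examples."""
    parts = [f"+{word}" for word in required]          # must be present (AND)
    parts += list(optional)                            # any may be present (OR)
    parts += [f"-{word}" for word in excluded]         # must be absent
    parts += [f'"{phrase}"' for phrase in phrases]     # exact phrase match
    return " ".join(parts)
```

For example, build_filter(required=["engineer"], excluded=["manager"]) produces the string +engineer -manager, which can then be passed as the filter argument.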
regenerate_webhook_secret()
Regenerate the webhook secret for a schedule. The old secret is immediately invalidated.
regenerate_webhook_secret(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| schedule_id | str | Yes | Schedule UUID |
Returns: Dict with schedule_id and the new webhook_secret
Example:
result = client.regenerate_webhook_secret("880e8400-e29b-41d4-a716-446655440000")
new_secret = result["webhook_secret"]
print(f"New secret: {new_secret}")
# Update your webhook handler with the new secret
Raises: MeterError if schedule has no webhook URL configured
The new secret is returned only once. Store it securely and update your webhook handler before the next delivery.
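On the receiving side, a webhook handler can compare the incoming X-Webhook-Secret header with the stored secret. This is a minimal sketch under one loud assumption: that the header carries the secret value verbatim rather than a signature derived from it, which this reference implies but does not spell out. is_authentic is a hypothetical helper, and a real web framework may normalize header names differently.

```python
import hmac
from typing import Mapping


def is_authentic(headers: Mapping[str, str], expected_secret: str) -> bool:
    """Check the X-Webhook-Secret header against the stored secret in constant time.

    Assumption: the header contains the secret itself, not an HMAC signature.
    """
    received = headers.get("X-Webhook-Secret", "")
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(received.encode(), expected_secret.encode())
```

A handler would reject the request (for example with a 401 response) whenever is_authentic returns False.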
Workflow Methods
For workflow methods (create_workflow, run_workflow, wait_for_workflow, etc.), see the dedicated Workflow Methods reference.
Strategy Group Methods
Manage collections of strategies with shared schedules and output schemas. See Strategy Groups for concepts.
create_strategy_group()
Create a new strategy group.
create_strategy_group(
    name: str,
    description: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Yes | Name for the group |
| description | str | No | Description of the group |
Returns: Dict with id, name, description, strategy_count, created_at, updated_at
Example:
group = client.create_strategy_group(
    name="E-commerce Monitors",
    description="Product price tracking across 50 stores"
)
print(f"Group ID: {group['id']}")
list_strategy_groups()
List all strategy groups.
list_strategy_groups(
    limit: int = 50,
    offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| limit | int | No | Max results (default: 50) |
| offset | int | No | Results to skip (default: 0) |
Returns: List[Dict] of strategy groups with strategy counts
Example:
groups = client.list_strategy_groups()
for g in groups:
    print(f"{g['name']}: {g['strategy_count']} strategies")
get_strategy_group()
Get group details including member strategies.
get_strategy_group(group_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
Returns: Dict with id, name, description, strategies (list), created_at, updated_at
Example:
detail = client.get_strategy_group("aa0e8400-e29b-41d4-a716-446655440000")
for s in detail["strategies"]:
    print(f"  - {s['name']}")
update_strategy_group()
Update a group’s name or description.
update_strategy_group(
    group_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| name | str | No | New name |
| description | str | No | New description |
Returns: Dict with updated group details
delete_strategy_group()
Delete a group. Strategies become ungrouped (not deleted).
delete_strategy_group(group_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
Returns: Dict with confirmation message
add_strategies_to_group()
Add existing strategies to a group.
add_strategies_to_group(
    group_id: str,
    strategy_ids: List[str]
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| strategy_ids | List[str] | Yes | Strategy UUIDs to add |
Returns: Dict with confirmation message
Example:
client.add_strategies_to_group(
    group_id=group["id"],
    strategy_ids=["550e8400-...", "660e8400-..."]
)
remove_strategy_from_group()
Remove a strategy from a group without deleting it.
remove_strategy_from_group(
    group_id: str,
    strategy_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| strategy_id | str | Yes | Strategy UUID to remove |
Returns: Dict with confirmation message
apply_group_schedule()
Apply a schedule to all strategies in a group.
apply_group_schedule(
    group_id: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| interval_seconds | int | Conditional | Run every N seconds (min: 60) |
| cron_expression | str | Conditional | Cron expression |
| webhook_url | str | No | Webhook URL for notifications |
| webhook_secret | str | No | Webhook secret (auto-generated if not provided) |
| webhook_type | str | No | 'standard', 'slack', 'slack_workflow', or 'discord' |
| webhook_metadata | Dict | No | Custom JSON metadata for webhook payloads |
Returns: Dict with message, created count, updated count
Example:
client.apply_group_schedule(
    group_id=group["id"],
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)
Provide either interval_seconds or cron_expression, not both.
delete_group_schedules()
Delete all schedules for strategies in a group.
delete_group_schedules(group_id: str) -> Dict
toggle_group_schedules()
Enable or disable all schedules in a group.
toggle_group_schedules(
    group_id: str,
    enabled: bool
) -> Dict
Example:
# Pause all group schedules
client.toggle_group_schedules(group["id"], enabled=False)
# Resume
client.toggle_group_schedules(group["id"], enabled=True)
apply_group_schema()
Apply an output schema to all strategies in a group. Triggers async regeneration.
apply_group_schema(
    group_id: str,
    output_schema: Dict[str, Any]
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| group_id | str | Yes | Strategy group UUID |
| output_schema | Dict | Yes | JSON schema defining output structure |
Returns: Dict with message and strategy_count
Example:
client.apply_group_schema(
    group_id=group["id"],
    output_schema={
        "title": "string",
        "price": "number",
        "in_stock": "boolean"
    }
)
get_schema_progress()
Poll progress of a group schema regeneration.
get_schema_progress(group_id: str) -> Dict
test_group_webhook()
Send a test webhook using a group’s webhook configuration.
test_group_webhook(
    group_id: str,
    webhook_url: str,
    webhook_secret: Optional[str] = None,
    webhook_type: Optional[str] = None,
    webhook_metadata: Optional[Dict[str, Any]] = None
) -> Dict
Returns: Dict with success, status_code, message
Error Handling
All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError
client = MeterClient(api_key="sk_live_...")
try:
    strategy = client.generate_strategy(url, description, name)
except MeterError as e:
    print(f"Error: {e}")
    # Handle error appropriately
Next steps
Error Handling: Learn about error handling and exceptions
Quick Start: See the SDK in action with examples
Strategies Guide: Deep dive into strategy concepts
Jobs Guide: Understanding job lifecycle
Need help?
Email me at mckinnon@meter.sh