MeterClient
The MeterClient class is the main interface for all Meter API operations. It handles authentication, request management, and provides methods for strategies, jobs, and schedules.
Constructor
MeterClient(api_key: str, base_url: str = "https://api.meter.sh")
Parameters
| Parameter | Type | Required | Description |
|---|
api_key | str | Yes | Your Meter API key (starts with sk_live_) |
base_url | str | No | API base URL (default: https://api.meter.sh) |
Example
from meter_sdk import MeterClient
import os
# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))
# With custom base URL (for development)
client = MeterClient(
api_key=os.getenv("METER_API_KEY"),
base_url="http://localhost:8000"
)
Context Manager
The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
strategies = client.list_strategies()
# Client automatically closes HTTP connections on exit
Strategy Methods
generate_strategy()
Generate a new extraction strategy using AI.
generate_strategy(
url: str,
description: str,
name: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
url | str | Yes | Target webpage URL to analyze |
description | str | Yes | Plain English description of what to extract |
name | str | Yes | Human-readable name for this strategy |
Returns: Dict with fields:
strategy_id (str): UUID of the created strategy
strategy (dict): The extraction strategy (CSS selectors, fields)
preview_data (list): Sample extracted data (first 5-10 items)
attempts (int): Number of generation attempts (usually 1)
Example:
result = client.generate_strategy(
url="https://news.ycombinator.com",
description="Extract post titles and scores",
name="HN Front Page"
)
strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Raises: MeterError if generation fails
refine_strategy()
Refine an existing strategy with feedback.
refine_strategy(
strategy_id: str,
feedback: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | UUID of the strategy to refine |
feedback | str | Yes | Description of what to improve or add |
Returns: Dict with same fields as generate_strategy()
Example:
refined = client.refine_strategy(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
feedback="Also extract the product images and SKU"
)
print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.
list_strategies()
List all strategies for the authenticated user.
list_strategies(
limit: int = 20,
offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|
limit | int | No | Maximum number of strategies to return (default: 20) |
offset | int | No | Number of strategies to skip (default: 0) |
Returns: List[Dict] where each dict contains:
id (str): Strategy UUID
name (str): Strategy name
description (str): Extraction description
url (str): Original URL used for generation
preview_data (list): Sample extracted data
created_at (str): ISO timestamp
updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()
for strategy in strategies:
print(f"{strategy['name']}: {strategy['id']}")
# Pagination
page_2 = client.list_strategies(limit=20, offset=20)
get_strategy()
Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | UUID of the strategy |
Returns: Dict with full strategy details (same fields as list_strategies() items)
Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")
print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found
delete_strategy()
Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | UUID of the strategy to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result) # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.
Job Methods
create_job()
Create a new scrape job using a strategy.
create_job(
strategy_id: str,
url: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | UUID of the strategy to use |
url | str | Yes | URL to scrape |
Returns: Dict with fields:
job_id (str): UUID of the created job
status (str): Job status (usually “pending”)
strategy_id (str): Strategy UUID
url (str): Target URL
created_at (str): ISO timestamp
Example:
job = client.create_job(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/page"
)
print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")
get_job()
Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
job_id | str | Yes | UUID of the job |
Returns: Dict with fields:
job_id (str): Job UUID
status (str): “pending”, “running”, “completed”, or “failed”
results (list): Extracted data (only if status is “completed”)
item_count (int): Number of items extracted
content_hash (str): Hash for change detection
structural_signature (dict): Structural fingerprint
error (str): Error message (only if status is “failed”)
started_at (str): ISO timestamp
completed_at (str): ISO timestamp
created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")
if job['status'] == 'completed':
print(f"Extracted {job['item_count']} items")
for item in job['results']:
print(item)
elif job['status'] == 'failed':
print(f"Job failed: {job['error']}")
else:
print(f"Job is {job['status']}")
wait_for_job()
Wait for a job to complete, polling automatically.
wait_for_job(
job_id: str,
poll_interval: float = 1.0,
timeout: Optional[float] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
job_id | str | Yes | UUID of the job to wait for |
poll_interval | float | No | Seconds between status checks (default: 1.0) |
timeout | float | No | Maximum seconds to wait (default: None = infinite) |
Returns: Dict with completed job details (same as get_job())
Example:
from meter_sdk import MeterError
# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")
# With timeout
try:
completed = client.wait_for_job(
"660e8400-e29b-41d4-a716-446655440000",
poll_interval=2.0,
timeout=300.0 # 5 minutes
)
except MeterError as e:
print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails
list_jobs()
List jobs with optional filtering.
list_jobs(
strategy_id: Optional[str] = None,
status: Optional[str] = None,
limit: int = 20,
offset: int = 0
) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | No | Filter by strategy UUID |
status | str | No | Filter by status: “pending”, “running”, “completed”, “failed” |
limit | int | No | Maximum jobs to return (default: 20) |
offset | int | No | Number of jobs to skip (default: 0) |
Returns: List[Dict] of job summaries
Example:
# All jobs
all_jobs = client.list_jobs(limit=50)
# Jobs for specific strategy
strategy_jobs = client.list_jobs(
strategy_id="550e8400-e29b-41d4-a716-446655440000"
)
# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)
# Combined filters
recent_completed = client.list_jobs(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
status="completed",
limit=5
)
compare_jobs()
Compare two jobs to detect changes.
compare_jobs(
job_id: str,
other_job_id: str
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
job_id | str | Yes | First job UUID |
other_job_id | str | Yes | Second job UUID to compare with |
Returns: Dict with fields:
content_hash_match (bool): True if content hashes match
structural_match (bool): True if structure matches
semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
"660e8400-e29b-41d4-a716-446655440000",
"770e8400-e29b-41d4-a716-446655440000"
)
print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")
if not comparison['content_hash_match']:
print("Content has changed!")
for change in comparison.get('changes', []):
print(f" - {change}")
get_strategy_history()
Get timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | Strategy UUID |
Returns: List[Dict] where each dict contains:
job_id (str): Job UUID
status (str): Job status
item_count (int): Items extracted
has_changes (bool): True if content changed vs. previous job
created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")
for entry in history:
status_icon = "✓" if entry['status'] == 'completed' else "✗"
change_icon = "📝" if entry['has_changes'] else "—"
print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")
Schedule Methods
create_schedule()
Create a new recurring schedule.
create_schedule(
strategy_id: str,
url: str,
interval_seconds: Optional[int] = None,
cron_expression: Optional[str] = None,
webhook_url: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
strategy_id | str | Yes | Strategy UUID to use |
url | str | Yes | URL to scrape |
interval_seconds | int | Conditional | Interval in seconds (required if no cron) |
cron_expression | str | Conditional | Cron expression (required if no interval) |
webhook_url | str | No | Webhook URL for notifications |
Returns: Dict with schedule details
Example:
# Interval-based
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
interval_seconds=3600 # Every hour
)
# Cron-based
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
cron_expression="0 9 * * *" # Daily at 9 AM
)
# With webhook
schedule = client.create_schedule(
strategy_id="550e8400-e29b-41d4-a716-446655440000",
url="https://example.com/products",
interval_seconds=3600,
webhook_url="https://your-app.com/webhooks/meter"
)
print(f"Schedule created: {schedule['id']}")
print(f"Next run: {schedule['next_run_at']}")
You must provide either interval_seconds or cron_expression, but not both.
list_schedules()
List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules
Example:
schedules = client.list_schedules()
for schedule in schedules:
print(f"Schedule {schedule['id']}:")
print(f" Type: {schedule['schedule_type']}")
print(f" Enabled: {schedule['enabled']}")
print(f" Next run: {schedule['next_run_at']}")
update_schedule()
Update an existing schedule.
update_schedule(
schedule_id: str,
enabled: Optional[bool] = None,
interval_seconds: Optional[int] = None,
cron_expression: Optional[str] = None,
webhook_url: Optional[str] = None
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
schedule_id | str | Yes | Schedule UUID |
enabled | bool | No | Enable/disable schedule |
interval_seconds | int | No | New interval in seconds |
cron_expression | str | No | New cron expression |
webhook_url | str | No | New webhook URL (or None to remove) |
Returns: Dict with updated schedule details
Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)
# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)
# Update webhook
client.update_schedule(
schedule_id,
webhook_url="https://new-domain.com/webhooks"
)
# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)
delete_schedule()
Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
schedule_id | str | Yes | Schedule UUID to delete |
Returns: Dict with confirmation message
Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result) # {'message': 'Schedule deleted successfully'}
get_schedule_changes()
Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
schedule_id: str,
mark_seen: bool = True
) -> Dict
Parameters:
| Parameter | Type | Required | Description |
|---|
schedule_id | str | Yes | Schedule UUID |
mark_seen | bool | No | Mark returned changes as seen (default: True) |
Returns: Dict with fields:
schedule_id (str): Schedule UUID
changes (list): Jobs with changes (full job details)
count (int): Number of changed jobs
marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
"880e8400-e29b-41d4-a716-446655440000",
mark_seen=True
)
if changes['count'] > 0:
print(f"Found {changes['count']} jobs with changes")
for change in changes['changes']:
print(f"Job {change['job_id']}: {change['item_count']} items")
# Process change['results']
# Preview without marking as seen
preview = client.get_schedule_changes(
"880e8400-e29b-41d4-a716-446655440000",
mark_seen=False
)
Use mark_seen=False to preview changes without affecting state.
Error Handling
All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError
client = MeterClient(api_key="sk_live_...")
try:
strategy = client.generate_strategy(url, description, name)
except MeterError as e:
print(f"Error: {e}")
# Handle error appropriately
Next steps
Need help?
Email me at [email protected]