Skip to main content

MeterClient

The MeterClient class is the main interface for all Meter API operations. It handles authentication, request management, and provides methods for strategies, jobs, and schedules.

Constructor

MeterClient(api_key: str, base_url: str = "https://api.meter.sh")

Parameters

ParameterTypeRequiredDescription
api_keystrYesYour Meter API key (starts with sk_live_)
base_urlstrNoAPI base URL (default: https://api.meter.sh)

Example

from meter_sdk import MeterClient
import os

# Recommended: Load from environment
client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# With custom base URL (for development)
client = MeterClient(
    api_key=os.getenv("METER_API_KEY"),
    base_url="http://localhost:8000"
)

Context Manager

The client can be used as a context manager for automatic resource cleanup:
with MeterClient(api_key="sk_live_...") as client:
    strategies = client.list_strategies()
    # Client automatically closes HTTP connections on exit

Strategy Methods

generate_strategy()

Generate a new extraction strategy using AI.
generate_strategy(
    url: str,
    description: str,
    name: str
) -> Dict
Parameters:
ParameterTypeRequiredDescription
urlstrYesTarget webpage URL to analyze
descriptionstrYesPlain English description of what to extract
namestrYesHuman-readable name for this strategy
Returns: Dict with fields:
  • strategy_id (str): UUID of the created strategy
  • strategy (dict): The extraction strategy (CSS selectors, fields)
  • preview_data (list): Sample extracted data (first 5-10 items)
  • attempts (int): Number of generation attempts (usually 1)
Example:
result = client.generate_strategy(
    url="https://news.ycombinator.com",
    description="Extract post titles and scores",
    name="HN Front Page"
)

strategy_id = result["strategy_id"]
print(f"Created strategy: {strategy_id}")
print(f"Preview: {result['preview_data'][:3]}")
Raises: MeterError if generation fails

refine_strategy()

Refine an existing strategy with feedback.
refine_strategy(
    strategy_id: str,
    feedback: str
) -> Dict
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesUUID of the strategy to refine
feedbackstrYesDescription of what to improve or add
Returns: Dict with same fields as generate_strategy() Example:
refined = client.refine_strategy(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    feedback="Also extract the product images and SKU"
)

print(f"Refined preview: {refined['preview_data']}")
Refinement uses cached HTML from initial generation, so it’s fast and doesn’t re-fetch the page.

list_strategies()

List all strategies for the authenticated user.
list_strategies(
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
ParameterTypeRequiredDescription
limitintNoMaximum number of strategies to return (default: 20)
offsetintNoNumber of strategies to skip (default: 0)
Returns: List[Dict] where each dict contains:
  • id (str): Strategy UUID
  • name (str): Strategy name
  • description (str): Extraction description
  • url (str): Original URL used for generation
  • preview_data (list): Sample extracted data
  • created_at (str): ISO timestamp
  • updated_at (str): ISO timestamp
Example:
# Get first 20 strategies
strategies = client.list_strategies()

for strategy in strategies:
    print(f"{strategy['name']}: {strategy['id']}")

# Pagination
page_2 = client.list_strategies(limit=20, offset=20)

get_strategy()

Get details for a specific strategy.
get_strategy(strategy_id: str) -> Dict
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesUUID of the strategy
Returns: Dict with full strategy details (same fields as list_strategies() items) Example:
strategy = client.get_strategy("550e8400-e29b-41d4-a716-446655440000")

print(f"Name: {strategy['name']}")
print(f"Created: {strategy['created_at']}")
print(f"Preview: {strategy['preview_data']}")
Raises: MeterError with 404 if strategy not found

delete_strategy()

Delete a strategy and all associated jobs and schedules.
delete_strategy(strategy_id: str) -> Dict
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesUUID of the strategy to delete
Returns: Dict with confirmation message Example:
result = client.delete_strategy("550e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Strategy deleted successfully'}
This action is irreversible. All associated jobs and schedules will also be deleted.

Job Methods

create_job()

Create a new scrape job using a strategy.
create_job(
    strategy_id: str,
    url: str
) -> Dict
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesUUID of the strategy to use
urlstrYesURL to scrape
Returns: Dict with fields:
  • job_id (str): UUID of the created job
  • status (str): Job status (usually “pending”)
  • strategy_id (str): Strategy UUID
  • url (str): Target URL
  • created_at (str): ISO timestamp
Example:
job = client.create_job(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/page"
)

print(f"Job created: {job['job_id']}")
print(f"Status: {job['status']}")

get_job()

Get status and results for a job.
get_job(job_id: str) -> Dict
Parameters:
ParameterTypeRequiredDescription
job_idstrYesUUID of the job
Returns: Dict with fields:
  • job_id (str): Job UUID
  • status (str): “pending”, “running”, “completed”, or “failed”
  • results (list): Extracted data (only if status is “completed”)
  • item_count (int): Number of items extracted
  • content_hash (str): Hash for change detection
  • structural_signature (dict): Structural fingerprint
  • error (str): Error message (only if status is “failed”)
  • started_at (str): ISO timestamp
  • completed_at (str): ISO timestamp
  • created_at (str): ISO timestamp
Example:
job = client.get_job("660e8400-e29b-41d4-a716-446655440000")

if job['status'] == 'completed':
    print(f"Extracted {job['item_count']} items")
    for item in job['results']:
        print(item)
elif job['status'] == 'failed':
    print(f"Job failed: {job['error']}")
else:
    print(f"Job is {job['status']}")

wait_for_job()

Wait for a job to complete, polling automatically.
wait_for_job(
    job_id: str,
    poll_interval: float = 1.0,
    timeout: Optional[float] = None
) -> Dict
Parameters:
ParameterTypeRequiredDescription
job_idstrYesUUID of the job to wait for
poll_intervalfloatNoSeconds between status checks (default: 1.0)
timeoutfloatNoMaximum seconds to wait (default: None = infinite)
Returns: Dict with completed job details (same as get_job()) Example:
from meter_sdk import MeterError

# Wait indefinitely
completed = client.wait_for_job("660e8400-e29b-41d4-a716-446655440000")
print(f"Done! {completed['item_count']} items")

# With timeout
try:
    completed = client.wait_for_job(
        "660e8400-e29b-41d4-a716-446655440000",
        poll_interval=2.0,
        timeout=300.0  # 5 minutes
    )
except MeterError as e:
    print(f"Timeout or error: {e}")
Raises: MeterError if timeout exceeded or job fails

list_jobs()

List jobs with optional filtering.
list_jobs(
    strategy_id: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = 20,
    offset: int = 0
) -> List[Dict]
Parameters:
ParameterTypeRequiredDescription
strategy_idstrNoFilter by strategy UUID
statusstrNoFilter by status: “pending”, “running”, “completed”, “failed”
limitintNoMaximum jobs to return (default: 20)
offsetintNoNumber of jobs to skip (default: 0)
Returns: List[Dict] of job summaries Example:
# All jobs
all_jobs = client.list_jobs(limit=50)

# Jobs for specific strategy
strategy_jobs = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000"
)

# Only failed jobs
failed = client.list_jobs(status="failed", limit=10)

# Combined filters
recent_completed = client.list_jobs(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    status="completed",
    limit=5
)

compare_jobs()

Compare two jobs to detect changes.
compare_jobs(
    job_id: str,
    other_job_id: str
) -> Dict
Parameters:
ParameterTypeRequiredDescription
job_idstrYesFirst job UUID
other_job_idstrYesSecond job UUID to compare with
Returns: Dict with fields:
  • content_hash_match (bool): True if content hashes match
  • structural_match (bool): True if structure matches
  • semantic_similarity (float): Similarity score 0.0-1.0 (planned feature)
  • changes (list): Detected structural changes
Example:
comparison = client.compare_jobs(
    "660e8400-e29b-41d4-a716-446655440000",
    "770e8400-e29b-41d4-a716-446655440000"
)

print(f"Content match: {comparison['content_hash_match']}")
print(f"Structural match: {comparison['structural_match']}")

if not comparison['content_hash_match']:
    print("Content has changed!")
    for change in comparison.get('changes', []):
        print(f"  - {change}")

get_strategy_history()

Get timeline of all jobs for a strategy.
get_strategy_history(strategy_id: str) -> List[Dict]
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesStrategy UUID
Returns: List[Dict] where each dict contains:
  • job_id (str): Job UUID
  • status (str): Job status
  • item_count (int): Items extracted
  • has_changes (bool): True if content changed vs. previous job
  • created_at (str): ISO timestamp
Example:
history = client.get_strategy_history("550e8400-e29b-41d4-a716-446655440000")

for entry in history:
    status_icon = "✓" if entry['status'] == 'completed' else "✗"
    change_icon = "📝" if entry['has_changes'] else "—"
    print(f"{status_icon} {entry['created_at']}: {entry['item_count']} items {change_icon}")

Schedule Methods

create_schedule()

Create a new recurring schedule.
create_schedule(
    strategy_id: str,
    url: str,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None
) -> Dict
Parameters:
ParameterTypeRequiredDescription
strategy_idstrYesStrategy UUID to use
urlstrYesURL to scrape
interval_secondsintConditionalInterval in seconds (required if no cron)
cron_expressionstrConditionalCron expression (required if no interval)
webhook_urlstrNoWebhook URL for notifications
Returns: Dict with schedule details Example:
# Interval-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600  # Every hour
)

# Cron-based
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    cron_expression="0 9 * * *"  # Daily at 9 AM
)

# With webhook
schedule = client.create_schedule(
    strategy_id="550e8400-e29b-41d4-a716-446655440000",
    url="https://example.com/products",
    interval_seconds=3600,
    webhook_url="https://your-app.com/webhooks/meter"
)

print(f"Schedule created: {schedule['id']}")
print(f"Next run: {schedule['next_run_at']}")
You must provide either interval_seconds or cron_expression, but not both.

list_schedules()

List all schedules for the authenticated user.
list_schedules() -> List[Dict]
Returns: List[Dict] of schedules Example:
schedules = client.list_schedules()

for schedule in schedules:
    print(f"Schedule {schedule['id']}:")
    print(f"  Type: {schedule['schedule_type']}")
    print(f"  Enabled: {schedule['enabled']}")
    print(f"  Next run: {schedule['next_run_at']}")

update_schedule()

Update an existing schedule.
update_schedule(
    schedule_id: str,
    enabled: Optional[bool] = None,
    interval_seconds: Optional[int] = None,
    cron_expression: Optional[str] = None,
    webhook_url: Optional[str] = None
) -> Dict
Parameters:
ParameterTypeRequiredDescription
schedule_idstrYesSchedule UUID
enabledboolNoEnable/disable schedule
interval_secondsintNoNew interval in seconds
cron_expressionstrNoNew cron expression
webhook_urlstrNoNew webhook URL (or None to remove)
Returns: Dict with updated schedule details Example:
# Disable schedule
client.update_schedule(schedule_id, enabled=False)

# Change interval
client.update_schedule(schedule_id, interval_seconds=7200)

# Update webhook
client.update_schedule(
    schedule_id,
    webhook_url="https://new-domain.com/webhooks"
)

# Remove webhook
client.update_schedule(schedule_id, webhook_url=None)

delete_schedule()

Delete a schedule (stops future jobs).
delete_schedule(schedule_id: str) -> Dict
Parameters:
ParameterTypeRequiredDescription
schedule_idstrYesSchedule UUID to delete
Returns: Dict with confirmation message Example:
result = client.delete_schedule("880e8400-e29b-41d4-a716-446655440000")
print(result)  # {'message': 'Schedule deleted successfully'}

get_schedule_changes()

Get unseen changes for a schedule (pull-based change detection).
get_schedule_changes(
    schedule_id: str,
    mark_seen: bool = True
) -> Dict
Parameters:
ParameterTypeRequiredDescription
schedule_idstrYesSchedule UUID
mark_seenboolNoMark returned changes as seen (default: True)
Returns: Dict with fields:
  • schedule_id (str): Schedule UUID
  • changes (list): Jobs with changes (full job details)
  • count (int): Number of changed jobs
  • marked_seen (bool): Whether changes were marked as seen
Example:
# Get and mark changes as seen
changes = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=True
)

if changes['count'] > 0:
    print(f"Found {changes['count']} jobs with changes")
    for change in changes['changes']:
        print(f"Job {change['job_id']}: {change['item_count']} items")
        # Process change['results']

# Preview without marking as seen
preview = client.get_schedule_changes(
    "880e8400-e29b-41d4-a716-446655440000",
    mark_seen=False
)
Use mark_seen=False to preview changes without affecting state.

Error Handling

All methods raise MeterError on API errors. See Error Handling for details.
from meter_sdk import MeterClient, MeterError

client = MeterClient(api_key="sk_live_...")

try:
    strategy = client.generate_strategy(url, description, name)
except MeterError as e:
    print(f"Error: {e}")
    # Handle error appropriately

Next steps

Need help?

Email me at [email protected]