RAG Integration Guide

Learn how to integrate Meter with your RAG (Retrieval-Augmented Generation) system to keep your vector database fresh without wasting embedding costs.

Overview

This guide shows you how to:
  • Set up Meter to monitor content sources
  • Detect meaningful changes automatically
  • Update only changed content in your vector database
  • Reduce embedding costs by up to 95%

Architecture

Meter sits between your content sources and your vector database: a schedule polls each source on an interval, change detection filters out content that has not changed, and only the changed items are re-embedded and upserted into the index.

Prerequisites

  • Meter API key
  • Vector database (Pinecone, Weaviate, Qdrant, etc.)
  • Embedding service (OpenAI, Cohere, etc.)

Implementation

Step 1: Generate strategy for content source

from meter_sdk import MeterClient
import os

client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# TODO: Replace with your content source
strategy = client.generate_strategy(
    url="https://your-docs-site.com/page",
    description="Extract article title, content, and metadata",
    name="Documentation Monitor"
)

strategy_id = strategy["strategy_id"]
print(f"Strategy created: {strategy_id}")

Step 2: Set up monitoring schedule

# TODO: Adjust interval based on your needs
schedule = client.create_schedule(
    strategy_id=strategy_id,
    url="https://your-docs-site.com/page",
    interval_seconds=3600  # Check every hour
)

print(f"Schedule created: {schedule['id']}")

Step 3: Process changes and update vector DB

TODO: Add your vector DB integration code here. Example structure (adapt for your vector database):
# Example with Pinecone
import pinecone
from openai import OpenAI

# TODO: Initialize your vector DB client
# pinecone.init(api_key=os.getenv("PINECONE_API_KEY"))
# index = pinecone.Index("your-index")

# TODO: Initialize your embedding service
# openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def process_changes():
    """Check for changes and update vector database"""
    changes = client.get_schedule_changes(
        schedule_id=schedule['id'],
        mark_seen=True
    )

    if changes['count'] == 0:
        print("No changes detected")
        return

    print(f"Processing {changes['count']} changed jobs")

    for change in changes['changes']:
        # TODO: Implement your update logic
        # 1. Delete old vectors for this URL
        # 2. Generate new embeddings for changed content
        # 3. Upsert new vectors

        url = change['url']
        results = change['results']

        print(f"Processing change for {url}: {len(results)} items")

        # Example: Delete old vectors
        # index.delete(filter={"url": url})

        # Example: Generate embeddings and upsert
        # for item in results:
        #     embedding = generate_embedding(item['content'])
        #     index.upsert([(item['id'], embedding, {"url": url, ...})])

# Run periodically
import time

while True:
    process_changes()
    time.sleep(3600)  # Check every hour

Best practices

1. Batch vector operations

TODO: Add batching logic for your vector DB
# Example: Batch upserts for better performance
def batch_upsert(vectors, batch_size=100):
    """Upsert vectors in batches"""
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        # TODO: Implement batch upsert for your vector DB
        # index.upsert(batch)
        pass
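
As a rough illustration, the TODO above might be filled in like this for a Pinecone index (assuming an `index` handle like the one sketched in Step 3; other vector databases expose similar bulk upserts):

def batch_upsert(vectors, batch_size=100):
    """Upsert (id, embedding, metadata) tuples in fixed-size batches."""
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        # One network round-trip per batch instead of one per vector
        index.upsert(vectors=batch)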

2. Handle embedding failures gracefully

TODO: Add error handling for your embedding service
def generate_embedding_with_retry(text, max_retries=3):
    """Generate embedding with retry logic"""
    for attempt in range(max_retries):
        try:
            # TODO: Call your embedding service
            # response = openai_client.embeddings.create(
            #     model="text-embedding-3-small",
            #     input=text
            # )
            # return response.data[0].embedding
            pass
        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise
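
Filled in for OpenAI's current Python SDK, the helper could look like the sketch below (the model name is only an example; swap in your embedding service):

import os
import time
from openai import OpenAI

openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def generate_embedding_with_retry(text, max_retries=3):
    """Generate an embedding, backing off exponentially on transient failures."""
    for attempt in range(max_retries):
        try:
            response = openai_client.embeddings.create(
                model="text-embedding-3-small",
                input=text,
            )
            return response.data[0].embedding
        except Exception:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # wait 1s, 2s, 4s, ...
            else:
                raise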

3. Track which URLs are indexed

TODO: Implement URL tracking for your use case
# Example: Store metadata to track indexed content
def track_indexed_content(url, content_hash, vector_ids):
    """Track which content has been indexed"""
    # TODO: Implement tracking (database, file, etc.)
    pass
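
One minimal way to do this is a small SQLite table (standard library only; the table and column names are illustrative):

import sqlite3

conn = sqlite3.connect("indexed_content.db")
conn.execute(
    "CREATE TABLE IF NOT EXISTS indexed_content ("
    "url TEXT PRIMARY KEY, content_hash TEXT, vector_ids TEXT, updated_at TEXT)"
)

def track_indexed_content(url, content_hash, vector_ids):
    """Record which vectors currently back a URL so stale ones can be deleted later."""
    conn.execute(
        "INSERT INTO indexed_content (url, content_hash, vector_ids, updated_at) "
        "VALUES (?, ?, ?, datetime('now')) "
        "ON CONFLICT(url) DO UPDATE SET content_hash = excluded.content_hash, "
        "vector_ids = excluded.vector_ids, updated_at = excluded.updated_at",
        (url, content_hash, ",".join(vector_ids)),
    )
    conn.commit()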

Example integrations

Pinecone

TODO: Add Pinecone-specific code
# import pinecone
# pinecone.init(api_key=os.getenv("PINECONE_API_KEY"))
# index = pinecone.Index("docs")

# Delete and upsert
# index.delete(filter={"source": url})
# index.upsert(vectors)
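
A fuller sketch with the same pre-3.0 pinecone client shown in the stub, assuming a `source` metadata field and the `generate_embedding_with_retry` helper from the best-practices section (whether delete-by-metadata-filter is available depends on your index type):

import os
import uuid
import pinecone

pinecone.init(api_key=os.getenv("PINECONE_API_KEY"), environment=os.getenv("PINECONE_ENV"))
index = pinecone.Index("docs")

def update_pinecone(url, items):
    """Replace all vectors for a URL with freshly embedded content."""
    # Drop stale vectors whose 'source' metadata matches this URL
    index.delete(filter={"source": {"$eq": url}})
    # Upsert the changed items as (id, values, metadata) tuples
    index.upsert(vectors=[
        (str(uuid.uuid4()),
         generate_embedding_with_retry(item["content"]),
         {"source": url, "content": item["content"]})
        for item in items
    ])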

Weaviate

TODO: Add Weaviate-specific code
# import weaviate
# client = weaviate.Client(url=os.getenv("WEAVIATE_URL"))

# Delete and create
# client.batch.delete_objects(...)
# client.batch.add_data_object(...)
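
A fuller sketch against the same v3 weaviate-client API (the Weaviate client is named `wv` here to avoid clashing with the Meter client; the `Document` class, `sourceUrl` property, and externally computed vectors are assumptions to adapt to your schema):

import os
import weaviate

wv = weaviate.Client(url=os.getenv("WEAVIATE_URL"))

def update_weaviate(url, items):
    """Replace all objects for a URL with freshly embedded content."""
    # Drop stale objects whose 'sourceUrl' property matches this URL
    # (use valueString instead of valueText if the property is a string type)
    wv.batch.delete_objects(
        class_name="Document",
        where={"path": ["sourceUrl"], "operator": "Equal", "valueText": url},
    )
    # Re-insert the changed items with externally computed vectors
    with wv.batch as batch:
        for item in items:
            batch.add_data_object(
                data_object={"content": item["content"], "sourceUrl": url},
                class_name="Document",
                vector=generate_embedding_with_retry(item["content"]),
            )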

Qdrant

TODO: Add Qdrant-specific code
# from qdrant_client import QdrantClient
# qdrant = QdrantClient(url=os.getenv("QDRANT_URL"))

# Delete and upsert
# qdrant.delete(collection_name="docs", points_selector=...)
# qdrant.upsert(collection_name="docs", points=...)
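
A fuller sketch with qdrant-client, assuming a `docs` collection already exists with a matching vector size and a `url` payload field, and reusing the `generate_embedding_with_retry` helper:

import os
import uuid
from qdrant_client import QdrantClient, models

qdrant = QdrantClient(url=os.getenv("QDRANT_URL"))

def update_qdrant(url, items):
    """Replace all points for a URL with freshly embedded content."""
    # Drop stale points whose 'url' payload field matches this URL
    qdrant.delete(
        collection_name="docs",
        points_selector=models.FilterSelector(
            filter=models.Filter(
                must=[models.FieldCondition(key="url", match=models.MatchValue(value=url))]
            )
        ),
    )
    # Upsert the changed items with fresh embeddings
    qdrant.upsert(
        collection_name="docs",
        points=[
            models.PointStruct(
                id=str(uuid.uuid4()),
                vector=generate_embedding_with_retry(item["content"]),
                payload={"url": url, "content": item["content"]},
            )
            for item in items
        ],
    )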

Monitoring and logging

TODO: Add monitoring for your setup
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_changes_with_logging():
    """Process changes with detailed logging"""
    changes = client.get_schedule_changes(schedule_id=schedule['id'], mark_seen=True)

    logger.info(f"Checked schedule: {changes['count']} changes")

    for change in changes['changes']:
        logger.info(f"Processing {change['url']}: {change['item_count']} items")
        # Process...
        logger.info(f"Completed {change['url']}")

Cost optimization

Meter helps you reduce costs by:
  1. Avoiding re-embeddings: Only embed changed content
  2. Efficient change detection: Content hashing catches changes instantly
  3. Batching updates: Process multiple changes together

Before Meter:
  • Scrape daily: 30 scrapes/month
  • No change detection: Embed all content every time
  • Cost: 30 full re-embeddings × $X per run, so embedding costs stay high

After Meter:
  • Scrape hourly: 720 scrapes/month
  • Change detection: Embed only what changed
  • Cost: ~5% of the previous cost (you only pay for embeddings when content changes)
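
As a back-of-the-envelope illustration (the dollar amount and 5% change rate are assumed, not measured; substitute your own corpus size and embedding pricing):

cost_per_full_embed = 5.00            # assumed cost ($) to embed the entire corpus once

before = 30 * cost_per_full_embed     # daily full re-embeds: $150.00/month
after = 0.05 * before                 # only changed content re-embedded: $7.50/month

print(f"Before: ${before:.2f}/month  After: ${after:.2f}/month")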

Next steps

Need help?

Email me at [email protected]