
News Aggregation

Build a news aggregation system that monitors multiple sources and detects new articles.

Use case

Aggregate news from multiple sources, detect new articles, and feed them into your knowledge base or RAG system.

What you’ll build

  • Strategies for multiple news sources
  • Hourly monitoring of all sources
  • New article detection
  • RAG/vector database integration

Prerequisites

  • Meter API key
  • TODO: Vector database (Pinecone, Weaviate, etc.)
  • TODO: Embedding service (OpenAI, Cohere, etc.)

Step 1: Generate strategies for news sources

TODO: Add your news sources
from meter_sdk import MeterClient
import os

client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# Define news sources
NEWS_SOURCES = [
    {
        "name": "Tech News Site",
        "url": "https://technews.example.com",  # TODO: Replace
        "description": "Extract article title, author, publish date, and excerpt"
    },
    {
        "name": "Industry Blog",
        "url": "https://blog.example.com",  # TODO: Replace
        "description": "Extract post title, author, date, and summary"
    },
    # TODO: Add more sources
]

# Generate strategies
strategies = []
for source in NEWS_SOURCES:
    strategy = client.generate_strategy(
        url=source['url'],
        description=source['description'],
        name=source['name']
    )
    strategies.append({
        "source": source['name'],
        "strategy_id": strategy['strategy_id'],
        "url": source['url']
    })
    print(f"Created strategy for {source['name']}: {strategy['strategy_id']}")

# TODO: Save strategy IDs to config file

Step 2: Set up monitoring for all sources

# Create schedules for each source
schedules = []

for strategy in strategies:
    schedule = client.create_schedule(
        strategy_id=strategy['strategy_id'],
        url=strategy['url'],
        interval_seconds=3600  # Check hourly
    )
    schedules.append({
        "source": strategy['source'],
        "schedule_id": schedule['id']
    })
    print(f"Monitoring {strategy['source']}: {schedule['id']}")

# TODO: Save schedule IDs to config file
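
Steps 1 and 2 both leave a TODO to persist IDs. A minimal sketch, assuming you store them in a local news_config.json file (the same file the complete example below reads):

import json

# Merge each source's strategy and schedule IDs into one record and write to disk.
# Relies on `strategies` and `schedules` being built in the same order above.
config = {"sources": []}
for strategy, schedule in zip(strategies, schedules):
    config["sources"].append({
        "name": strategy["source"],
        "url": strategy["url"],
        "strategy_id": strategy["strategy_id"],
        "schedule_id": schedule["schedule_id"],
    })

with open("news_config.json", "w") as f:
    json.dump(config, f, indent=2)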

Step 3: Aggregate new articles

def aggregate_news(schedules):
    """Collect new articles from all sources"""
    all_articles = []

    for schedule_info in schedules:
        changes = client.get_schedule_changes(
            schedule_info['schedule_id'],
            mark_seen=True
        )

        if changes['count'] > 0:
            for change in changes['changes']:
                for article in change['results']:
                    # Add source metadata
                    article['source'] = schedule_info['source']
                    article['scraped_at'] = change['completed_at']
                    all_articles.append(article)

    return all_articles

# Check all sources
new_articles = aggregate_news(schedules)
print(f"Found {len(new_articles)} new articles")

Step 4: Keyword filtering for specific topics

Use keyword filters to retrieve only the articles that match specific topics. This is useful when you're monitoring news for keywords relevant to your business.

Filter syntax

Syntax     | Meaning        | Example
+keyword   | Required (AND) | +jfk +tariff - must have both
keyword    | Optional (OR)  | jfk elon - either matches
-keyword   | Excluded (NOT) | -bitcoin - exclude these
"phrase"   | Exact phrase   | "elon musk" - exact match

Example: Monitor news for specific keywords

def get_filtered_news(schedule_id, keywords):
    """Get only articles matching specific keywords"""
    changes = client.get_schedule_changes(
        schedule_id=schedule_id,
        filter=keywords,
        mark_seen=True
    )

    articles = []
    for change in changes['changes']:
        for article in change['results']:
            articles.append(article)

    return articles

# Get articles about JFK AND tariffs
tariff_news = get_filtered_news(schedule_id, "+jfk +tariff")
print(f"Found {len(tariff_news)} articles about jfk and tariffs")

# Get articles mentioning either Tesla OR SpaceX
tech_news = get_filtered_news(schedule_id, "tesla spacex")
print(f"Found {len(tech_news)} articles about Tesla or SpaceX")

# Get crypto news but exclude Bitcoin
altcoin_news = get_filtered_news(schedule_id, "+crypto -bitcoin")
print(f"Found {len(altcoin_news)} altcoin articles")

Complete example: Multi-topic news monitor

from meter_sdk import MeterClient
import os

client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# Define topics to monitor with their keyword filters
TOPICS = {
    "tariffs": "+jfk +tariff",
    "tech_earnings": "+earnings tesla apple nvidia",
    "crypto_regulation": "+crypto +regulation -bitcoin",
    "ai_news": '+ai +"artificial intelligence" +openai anthropic',
}

def monitor_topics(schedule_id):
    """Monitor multiple topics from a single news source"""
    results = {}

    for topic_name, filter_query in TOPICS.items():
        # Get articles matching this topic
        # Note: Use mark_seen=False to allow the same articles
        # to match multiple topics
        changes = client.get_schedule_changes(
            schedule_id=schedule_id,
            filter=filter_query,
            mark_seen=False  # Don't mark as seen yet
        )

        articles = []
        for change in changes['changes']:
            articles.extend(change['results'])

        results[topic_name] = articles
        print(f"{topic_name}: {len(articles)} articles")

    # Now mark all as seen
    client.get_schedule_changes(schedule_id=schedule_id, mark_seen=True)

    return results

# Monitor all topics
topic_results = monitor_topics(schedule_id)

# Process results by topic
for topic, articles in topic_results.items():
    if articles:
        print(f"\n=== {topic.upper()} ===")
        for article in articles[:3]:  # Show top 3
            print(f"  - {article.get('title', 'Untitled')}")

The filter applies to individual result items. If a job returns 50 articles but only 5 match your filter, you’ll only receive those 5 matching articles. Jobs with zero matching items are excluded entirely.
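
For illustration (the schedule and the numbers here are hypothetical), a filtered read only ever returns the matching items:

# Suppose the latest job scraped 50 articles but only 5 mention "tesla"
changes = client.get_schedule_changes(
    schedule_id=schedule_id,
    filter="tesla",
    mark_seen=False
)

for change in changes['changes']:
    # Each change contains only the matching items; jobs with zero
    # matching items don't appear in changes['changes'] at all.
    print(len(change['results']))  # e.g. 5, not 50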

Step 5: Feed into RAG system

TODO: Integrate with your vector database
def update_vector_db(articles):
    """Add new articles to vector database"""
    for article in articles:
        # TODO: Generate embedding
        # embedding = openai.embeddings.create(
        #     model="text-embedding-3-small",
        #     input=f"{article['title']} {article.get('excerpt', '')}"
        # ).data[0].embedding

        # TODO: Upsert to vector database
        # pinecone_index.upsert([
        #     (
        #         article['url'],  # ID
        #         embedding,
        #         {
        #             "title": article['title'],
        #             "source": article['source'],
        #             "date": article.get('publish_date'),
        #             "url": article['url']
        #         }
        #     )
        # ])

        pass

# Process new articles
update_vector_db(new_articles)
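
One way to fill in those TODOs, as a sketch assuming OpenAI embeddings and a Pinecone index (the "news-articles" index name and the metadata fields are placeholders to adapt to your own setup):

import os
from openai import OpenAI
from pinecone import Pinecone

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index = pc.Index("news-articles")  # assumed index name

def update_vector_db(articles):
    """Embed each article and upsert it into the vector index."""
    for article in articles:
        text = f"{article['title']} {article.get('excerpt', '')}"
        embedding = openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        ).data[0].embedding

        index.upsert(vectors=[{
            "id": article['url'],
            "values": embedding,
            "metadata": {
                "title": article['title'],
                "source": article['source'],
                "date": article.get('publish_date') or "",
                "url": article['url']
            }
        }])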

Complete example

TODO: Full implementation
# news_aggregator.py

from meter_sdk import MeterClient
import os
import time
import json
from datetime import datetime

client = MeterClient(api_key=os.getenv("METER_API_KEY"))

# Load configuration
CONFIG_FILE = "news_config.json"

def load_config():
    """Load news sources and schedule IDs"""
    try:
        with open(CONFIG_FILE, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return {"sources": []}

def save_config(config):
    """Save configuration"""
    with open(CONFIG_FILE, 'w') as f:
        json.dump(config, f, indent=2)

def setup_monitoring():
    """Setup strategies and schedules for all sources"""
    config = load_config()

    # TODO: Define your news sources
    NEWS_SOURCES = [
        # Add your sources here
    ]

    for source in NEWS_SOURCES:
        # Generate strategy
        strategy = client.generate_strategy(
            url=source['url'],
            description=source['description'],
            name=source['name']
        )

        # Create schedule
        schedule = client.create_schedule(
            strategy_id=strategy['strategy_id'],
            url=source['url'],
            interval_seconds=3600
        )

        config['sources'].append({
            "name": source['name'],
            "url": source['url'],
            "strategy_id": strategy['strategy_id'],
            "schedule_id": schedule['id']
        })

    save_config(config)
    return config

def monitor_all_sources():
    """Monitor all news sources and aggregate articles"""
    config = load_config()

    all_new_articles = []

    for source in config['sources']:
        print(f"Checking {source['name']}...")

        changes = client.get_schedule_changes(
            source['schedule_id'],
            mark_seen=True
        )

        if changes['count'] > 0:
            new_count = 0
            for change in changes['changes']:
                for article in change['results']:
                    article['source'] = source['name']
                    article['source_url'] = source['url']
                    all_new_articles.append(article)
                    new_count += 1

            print(f"  Found {new_count} new articles")
        else:
            print(f"  No new articles")

    return all_new_articles

def main():
    """Main aggregation loop"""
    # Setup (run once)
    # config = setup_monitoring()

    # Load existing config
    config = load_config()

    if len(config['sources']) == 0:
        print("No sources configured. Run setup_monitoring() first.")
        return

    print(f"Monitoring {len(config['sources'])} sources")

    while True:
        print(f"\n[{datetime.now()}] Checking all sources...")

        new_articles = monitor_all_sources()

        if len(new_articles) > 0:
            print(f"\nProcessing {len(new_articles)} new articles:")
            for article in new_articles:
                print(f"  - [{article['source']}] {article.get('title', 'Untitled')}")

            # TODO: Update vector database
            # update_vector_db(new_articles)

        else:
            print("No new articles across all sources")

        print("\nWaiting 1 hour...")
        time.sleep(3600)

if __name__ == "__main__":
    main()

Deployment

TODO: Deploy your aggregator
  • Run as background service
  • Use cron for periodic checks (a single-run entry point is sketched below)
  • Deploy to cloud (AWS, GCP, etc.)
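
If you drive the checks from cron instead of the long-running loop in the complete example, a single-pass entry point is simpler. A minimal sketch, assuming the complete example is saved as news_aggregator.py (the run_once.py filename is illustrative):

# run_once.py - schedule with cron, e.g. hourly: 0 * * * * python /path/to/run_once.py
from news_aggregator import load_config, monitor_all_sources

def main():
    config = load_config()
    if not config['sources']:
        print("No sources configured. Run setup_monitoring() first.")
        return

    new_articles = monitor_all_sources()
    if new_articles:
        # update_vector_db(new_articles)  # from Step 5
        print(f"Processed {len(new_articles)} new articles")
    else:
        print("No new articles")

if __name__ == "__main__":
    main()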

Advanced features

TODO: Extend functionality
  • Article deduplication across sources (a simple sketch follows below)
  • Trend detection
  • Topic clustering
  • Sentiment analysis integration
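
For deduplication across sources, a minimal sketch that keys articles on URL and falls back to a hash of the normalized title (both are heuristics, not part of the Meter API):

import hashlib

def deduplicate(articles):
    """Drop repeat articles, keyed on URL or a hash of the normalized title."""
    seen = set()
    unique = []
    for article in articles:
        key = article.get('url') or hashlib.sha256(
            article.get('title', '').strip().lower().encode()
        ).hexdigest()
        if key not in seen:
            seen.add(key)
            unique.append(article)
    return unique

# Usage: new_articles = deduplicate(new_articles)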

Need help?

Email me at [email protected]