Skip to main content

Thread and file storage components for conversational AI - the companion to Tyler AI framework

Project description

The Narrator

Thread and file storage components for conversational AI - the companion to Tyler AI framework.

Overview

The Narrator provides robust, production-ready storage solutions for conversational AI applications. It includes:

  • ThreadStore: Persistent storage for conversation threads with support for both in-memory and SQL backends
  • FileStore: Secure file storage with automatic processing for various file types
  • Models: Pydantic models for threads, messages, and attachments
  • Registry: Component registry for managing shared storage instances

Features

ThreadStore

  • Multiple Backends: In-memory (development), SQLite (local), PostgreSQL (production)
  • Async/Await Support: Built for modern Python async applications
  • Message Filtering: Automatic handling of system vs. user messages
  • Platform Integration: Support for external platform references (Slack, etc.)
  • Connection Pooling: Production-ready database connection management

FileStore

  • Secure Storage: Automatic file validation and type checking
  • Multiple Formats: Support for documents, images, audio, and more
  • Content Processing: Automatic text extraction from PDFs, image analysis
  • Storage Limits: Configurable file size and total storage limits
  • Sharded Storage: Efficient file organization to prevent directory bloat

Installation

pip install the-narrator

Quick Start

Basic Thread Storage

import asyncio
from narrator import ThreadStore, Thread, Message

async def main():
    # Create an in-memory store for development
    store = await ThreadStore.create()
    
    # Create a thread
    thread = Thread(title="My Conversation")
    
    # Add messages
    thread.add_message(Message(role="user", content="Hello!"))
    thread.add_message(Message(role="assistant", content="Hi there!"))
    
    # Save the thread
    await store.save(thread)
    
    # Retrieve the thread
    retrieved = await store.get(thread.id)
    print(f"Thread: {retrieved.title}")
    print(f"Messages: {len(retrieved.messages)}")

asyncio.run(main())

File Storage

import asyncio
from narrator import FileStore

async def main():
    # Create a file store
    store = await FileStore.create()
    
    # Save a file
    content = b"Hello, world!"
    metadata = await store.save(content, "hello.txt", "text/plain")
    
    print(f"File ID: {metadata['id']}")
    print(f"Storage path: {metadata['storage_path']}")
    
    # Retrieve the file
    retrieved_content = await store.get(metadata['id'])
    print(f"Content: {retrieved_content.decode()}")

asyncio.run(main())

Database Storage

import asyncio
from narrator import ThreadStore

async def main():
    # Use SQLite for persistent storage
    store = await ThreadStore.create("sqlite+aiosqlite:///conversations.db")
    
    # Use PostgreSQL for production
    # store = await ThreadStore.create("postgresql+asyncpg://user:pass@localhost/dbname")
    
    # The API is the same regardless of backend
    thread = Thread(title="Persistent Conversation")
    await store.save(thread)

asyncio.run(main())

Configuration

Environment Variables

The Narrator can be configured using environment variables:

Database Configuration

# Database settings
NARRATOR_DB_POOL_SIZE=5              # Connection pool size
NARRATOR_DB_MAX_OVERFLOW=10          # Max additional connections
NARRATOR_DB_POOL_TIMEOUT=30          # Connection timeout (seconds)
NARRATOR_DB_POOL_RECYCLE=300         # Connection recycle time (seconds)
NARRATOR_DB_ECHO=false               # Enable SQL logging

# Logging
NARRATOR_LOG_LEVEL=INFO              # Log level

File Storage Configuration

# File storage settings
NARRATOR_FILE_STORAGE_PATH=/path/to/files  # Storage directory
NARRATOR_MAX_FILE_SIZE=52428800            # 50MB max file size
NARRATOR_MAX_STORAGE_SIZE=5368709120       # 5GB max total storage
NARRATOR_ALLOWED_MIME_TYPES=image/jpeg,application/pdf  # Allowed file types

Advanced Usage

Component Registry

The registry allows you to manage shared storage instances across your application:

import asyncio
from narrator import ThreadStore, FileStore
from narrator.utils.registry import register_thread_store, register_file_store, get_thread_store

async def main():
    # Create and register stores
    thread_store = await ThreadStore.create("sqlite+aiosqlite:///main.db")
    file_store = await FileStore.create("/path/to/files")
    
    register_thread_store("main", thread_store)
    register_file_store("main", file_store)
    
    # Retrieve stores from anywhere in your application
    store = get_thread_store("main")
    if store:
        threads = await store.list_recent(limit=10)
        print(f"Found {len(threads)} recent threads")

asyncio.run(main())

Message Attachments

Messages can include file attachments that are automatically processed:

import asyncio
from narrator import Thread, Message, Attachment, FileStore

async def main():
    file_store = await FileStore.create()
    
    # Create a message with an attachment
    message = Message(role="user", content="Here's a document")
    
    # Add file content
    pdf_content = b"..."  # Your PDF content
    attachment = Attachment(filename="document.pdf", content=pdf_content)
    message.add_attachment(attachment)
    
    # Process and store the attachment
    await attachment.process_and_store(file_store)
    
    # The attachment now has extracted text and metadata
    print(f"Status: {attachment.status}")
    print(f"File ID: {attachment.file_id}")
    if attachment.attributes:
        print(f"Extracted text: {attachment.attributes.get('text', 'N/A')[:100]}...")

asyncio.run(main())

Platform Integration

Threads can be linked to external platforms:

import asyncio
from narrator import Thread, ThreadStore

async def main():
    store = await ThreadStore.create()
    
    # Create a thread linked to Slack
    thread = Thread(
        title="Support Ticket #123",
        platforms={
            "slack": {
                "channel": "C1234567",
                "thread_ts": "1234567890.123"
            }
        }
    )
    
    await store.save(thread)
    
    # Find threads by platform
    slack_threads = await store.find_by_platform("slack", {"channel": "C1234567"})
    print(f"Found {len(slack_threads)} Slack threads in channel")

asyncio.run(main())

Database CLI

The Narrator includes a CLI tool for database management:

# Initialize database tables
narrator-db init --database-url "postgresql://user:pass@localhost/dbname"

# Check database status
narrator-db status --database-url "postgresql://user:pass@localhost/dbname"

Migration from Tyler

If you're migrating from the original Tyler package:

  1. Update imports:
    </code></pre>
    </li>
    </ol>
    <h1>Before</h1>
    <p>from tyler import ThreadStore, FileStore, Thread, Message</p>
    <h1>After</h1>
    <p>from narrator import ThreadStore, FileStore, Thread, Message</p>
    <pre><code>
    2. **Update environment variables**:
       ```bash
    # Before
    TYLER_DB_POOL_SIZE=5
    TYLER_FILE_STORAGE_PATH=/path/to/files
    
    # After
    NARRATOR_DB_POOL_SIZE=5
    NARRATOR_FILE_STORAGE_PATH=/path/to/files
    
    1. Database compatibility: The database schema is fully compatible, so existing data will work without changes.

    API Reference

    ThreadStore

    Methods

    • await ThreadStore.create(database_url=None): Factory method to create and initialize a store
    • await store.save(thread): Save a thread to storage
    • await store.get(thread_id): Retrieve a thread by ID
    • await store.delete(thread_id): Delete a thread
    • await store.list(limit=100, offset=0): List threads with pagination
    • await store.find_by_attributes(attributes): Find threads by custom attributes
    • await store.find_by_platform(platform_name, properties): Find threads by platform
    • await store.list_recent(limit=None): List recent threads

    FileStore

    Methods

    • await FileStore.create(base_path=None, ...): Factory method to create and validate a store
    • await store.save(content, filename, mime_type=None): Save file content
    • await store.get(file_id, storage_path=None): Retrieve file content
    • await store.delete(file_id, storage_path=None): Delete a file
    • await store.get_storage_size(): Get total storage size
    • await store.check_health(): Check storage health

    Models

    Thread

    • id: Unique thread identifier
    • title: Thread title
    • messages: List of messages
    • created_at: Creation timestamp
    • updated_at: Last update timestamp
    • attributes: Custom attributes dictionary
    • platforms: Platform-specific metadata

    Message

    • id: Unique message identifier
    • role: Message role (user, assistant, system, tool)
    • content: Message content
    • attachments: List of file attachments
    • timestamp: Message timestamp
    • metrics: Performance metrics

    Attachment

    • filename: Original filename
    • mime_type: File MIME type
    • file_id: Storage file ID
    • storage_path: Path in storage
    • status: Processing status (pending, stored, failed)
    • attributes: Processed content and metadata

    Development

    Running Tests

    To run the test suite locally:

    # Install development dependencies
    uv sync --extra dev
    
    # Run tests with coverage
    uv run pytest tests/ --cov=narrator --cov-report=term-missing --cov-branch --cov-report=term --no-cov-on-fail -v
    
    # Run tests without coverage (faster)
    uv run pytest tests/ -v
    

    Test Requirements

    The test suite requires:

    • Python 3.12+
    • pytest with async support
    • Test coverage reporting
    • System dependencies (libmagic for file type detection)

    Contributing

    1. Fork the repository
    2. Create a feature branch
    3. Make your changes
    4. Add tests
    5. Run the test suite to ensure everything works
    6. Submit a pull request

    License

    MIT License - see LICENSE file for details.

    Support

    For issues and questions:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

the_narrator-0.1.1.tar.gz (29.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

the_narrator-0.1.1-py3-none-any.whl (36.4 kB view details)

Uploaded Python 3

File details

Details for the file the_narrator-0.1.1.tar.gz.

File metadata

  • Download URL: the_narrator-0.1.1.tar.gz
  • Upload date:
  • Size: 29.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.28.1

File hashes

Hashes for the_narrator-0.1.1.tar.gz
Algorithm Hash digest
SHA256 f3c8320d4d535507ae8e1f5fa4fff064276f10d5662ca1df0d0a234726f32880
MD5 7fb87b7ef68c5b45fcf9d314c0c9a046
BLAKE2b-256 62da21d83c2f17a123dc4617bf23026ed313be097e2efb3c61793f88f6d370c9

See more details on using hashes here.

File details

Details for the file the_narrator-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: the_narrator-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 36.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.28.1

File hashes

Hashes for the_narrator-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e34cb1efd2dc35dddd3be7d63ef1158687ab142179698539e69e08a8df27f9b4
MD5 cd20ec65a692dcb435ebe3b003091294
BLAKE2b-256 980c7f51e27ee8c5b96e77dd6df55ac5616a3ffb6f1857c9fc58de2281fa8d4d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page