Skip to main content

Enterprise-Grade Synthetic Data Generation with Flexible Rate Limiting (Concurrency & RPM)

Project description

OmniGen ๐Ÿš€

Generate synthetic data at scale using an enterprise-ready framework with full customizable configuration, security, and ease of use

Built by Ultrasafe AI for production environments.


What is OmniGen?

OmniGen is an enterprise-grade framework for generating synthetic datasets at scaleโ€”from scratch or from base data. Generate trillions of tokens and billions of samples across multiple modalities:

๐ŸŽฏ Data Types Supported

  • ๐Ÿ’ฌ Conversational Data - Single-turn to multi-turn dialogues
  • ๐Ÿค– Agentic Datasets - Tool use, function calling, multi-step reasoning
  • ๐ŸŽจ Multimodal Datasets - Text, images, audio, video combinations
  • ๐Ÿ–ผ๏ธ Images - Synthetic image generation and editing
  • ๐ŸŽต Audio - Speech, music, sound effects
  • ๐ŸŽฌ Video - Synthetic video sequences

๐ŸŽ“ Use Cases

  • Fine-Tuning - Instruction following, task-specific models
  • Supervised Fine-Tuning (SFT) - High-quality labeled datasets
  • Offline Reinforcement Learning - Preference datasets with rewards
  • Online Reinforcement Learning - Ground truth with reward checking scripts
  • Pre-Training - Large-scale corpus generation
  • Machine Learning - Training data for any ML task

๐Ÿ—๏ธ Why OmniGen?

  • โœ… Enterprise-Ready - Built for production at scale
  • โœ… Fully Customizable - Configure every aspect of generation
  • โœ… Secure - Complete isolation, no data mixing
  • โœ… Easy - Simple API, clear examples
  • โœ… Modular - Independent pipelines for different data types

๐Ÿš€ Currently Available Pipeline

conversation_extension - Extend Single-Turn to Multi-Turn Conversations

Turn your base questions into rich multi-turn dialogues. This is just the first pipelineโ€”more coming soon!

๐Ÿ†• Latest Features (v0.0.1.post10)

1. Empty Content & Tool Calls Validation โœ…

  • Full OpenAI/Anthropic API compliance for tool_calls
  • Multi-layer validation (Input โ†’ Generation โ†’ Output)
  • Assistant messages can have empty content if tool_calls exist
  • All other messages must have non-empty content
  • No conversation with empty content ever marked as "success"

2. Real-Time Token Tracking ๐Ÿ’ฐ

  • Captures actual token usage from API responses
  • Tracks per generation call (user_followup, assistant_response)
  • Aggregates per conversation and entire dataset
  • Saves detailed token data in output files
  • Optional cost calculation in console display
  • Configurable pricing per million tokens

3. Enhanced Validation Rules

  • User/System/Tool messages: Content always required (cannot be empty)
  • Assistant messages: Either content OR tool_calls required
  • Tool calls: Only for assistant role, must be non-empty list
  • Clear error messages identify exactly which messages have issues

Why OmniGen?

โœ… Simple - One command to generate thousands of conversations โœ… Scalable - Parallel processing for fast generation โœ… Flexible - Mix different AI providers (OpenAI, Anthropic, Ultrasafe AI) โœ… Production Ready - Built for SaaS platforms with multi-tenant support โœ… Fault Tolerant - Checkpoint/resume system prevents data loss from any interruption


Quick Start

1. Install

pip install omnigen-usf

2. Prepare Base Data

Create a file base_data.jsonl with your starting questions:

{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}
{"conversations": [{"role": "user", "content": "What is machine learning?"}]}
{"conversations": [{"role": "user", "content": "Explain neural networks"}]}

3. Generate Conversations

You can configure the pipeline in two ways:

Option A: Using YAML Configuration File

Create a config.yaml file:

# NEW: Minimal config with smart defaults!
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

Then load and run:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load configuration from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

Option B: Using Programmatic Configuration (Python)

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Configure the pipeline programmatically
config = (ConversationExtensionConfigBuilder()
    # User followup generator - minimal config!
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“
    )
    # Assistant response generator
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“
    )
    # Generation settings
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8)  # 3-8 turns per conversation
    )
    # Input data
    .set_data_source(
        source_type='file',
        file_path='base_data.jsonl'
    )
    # Output
    .set_storage(
        type='jsonl',
        output_file='output.jsonl'
    )
    .build()
)

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

4. Get Results

Your generated conversations will be in output.jsonl:

{
  "id": 0,
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Great choice! Start with the basics..."},
    {"role": "user", "content": "What resources do you recommend?"},
    {"role": "assistant", "content": "I recommend these resources..."},
    {"role": "user", "content": "How long will it take?"},
    {"role": "assistant", "content": "With consistent practice..."}
  ],
  "num_turns": 3,
  "success": true
}

Supported AI Providers

NEW: Smart defaults automatically applied! Only specify name and api_key - model, temperature, and max_tokens use optimized defaults.

Provider Default Model Default Temp Default Tokens
Ultrasafe AI usf-mini 0.7 4096
OpenAI gpt-4-turbo 0.7 4096
Anthropic claude-3-5-sonnet-20241022 0.7 4096
OpenRouter openai/gpt-4-turbo 0.7 4096

Override defaults as needed:

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    temperature: 0.9  # Override only what you need

Mix Different Providers

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'openai', api_key, 'gpt-4-turbo')
    .add_provider('assistant_response', 'anthropic', api_key, 'claude-3-5-sonnet')
    # ... rest of config
    .build()
)

Advanced Features

๐Ÿ”„ Checkpoint & Resume System

NEW! Automatic checkpoint/resume functionality prevents data loss and duplication from any interruption.

Overview

The checkpoint system automatically saves progress and enables seamless resume from where you left off - handling manual stops (Ctrl+C), errors, rate limits, server failures, and crashes without losing data or creating duplicates.

How It Works

Conversation Tracking:

  • Position-based ID: Index in base data file (for ordering)
  • Content Hash: SHA256 hash of conversation (for deduplication)
  • Combined Key: {position}_{hash[:8]} for unique identification

Checkpoint Saves:

  • Automatically after every N conversations (configurable)
  • On graceful shutdown (Ctrl+C, SIGTERM)
  • Uses atomic file operations (write temp โ†’ sync โ†’ atomic rename)
  • Validates input file integrity with SHA256 hash

Configuration

checkpoint:
  enabled: true                    # Enable checkpoint/resume
  checkpoint_file: "checkpoint.json"  # Path to checkpoint file
  auto_save_frequency: 10          # Save every 10 conversations
  validate_input_hash: true        # Verify input hasn't changed
  resume_mode: "auto"              # auto | manual | fresh

Configuration Options:

Option Description Default
enabled Enable checkpoint/resume true
checkpoint_file Path to checkpoint file workspaces/{workspace_id}/checkpoint.json
auto_save_frequency Save checkpoint every N conversations 10
validate_input_hash Verify input file unchanged on resume true
resume_mode Resume behavior: auto, manual, or fresh auto

Usage Examples

YAML Configuration:

workspace_id: "production_run"

checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 10
  validate_input_hash: true
  resume_mode: "auto"

providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}

generation:
  num_conversations: 1000
  turn_range: {min: 3, max: 8}
  parallel_workers: 10

base_data:
  source_type: file
  file_path: input.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

Programmatic Configuration:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
    .set_generation(num_conversations=1000, turn_range=(3, 8))
    .set_data_source('file', file_path='input.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(
        enabled=True,
        checkpoint_file='checkpoint.json',
        auto_save_frequency=10,
        validate_input_hash=True,
        resume_mode='auto'
    )
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can interrupt and resume anytime!

Resume Behavior

First Run (No Checkpoint):

==========================================
CONVERSATION EXTENSION PIPELINE
==========================================
Base conversations: 1000
Generating: 1000
Parallel workers: 10
==========================================
Generating: 15%|โ–ˆโ–ˆโ–ˆโ–‘โ–‘โ–‘โ–‘โ–‘โ–‘โ–‘โ–‘โ–‘| 150/1000 [โœ“120 โš 20 โœ—10]
^C
โš ๏ธ  Shutdown signal received. Saving checkpoint...
โœ“ Final checkpoint saved
โš ๏ธ  Interrupted. Progress saved in checkpoint.

Resume Run (Checkpoint Exists):

==========================================
RESUMING FROM CHECKPOINT
==========================================
Previous Run: 2025-10-04 16:30:00 UTC
Already Processed: 150 (โœ“120 โš 20 โœ—10 ~0)
Resuming Partials: 20
Remaining: 850
Parallel workers: 10
==========================================
Generating: 100%|โ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆโ–ˆ| 1000/1000 [โœ“850 โš 100 โœ—50]

==========================================
GENERATION COMPLETE
==========================================
โœ“ Complete:   850
โš  Partial:    100
โœ— Failed:      50
โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”
๐Ÿ’พ Saved:     950  (95.0%)
==========================================

Output Format

Both partial and complete conversations saved to same output file with status indicators:

{"id": 0, "status": "completed", "is_complete": true, "conversations": [...], "num_turns": 5}
{"id": 1, "status": "partial", "is_complete": false, "conversations": [...], "num_turns": 3}
{"id": 2, "status": "completed", "is_complete": true, "conversations": [...], "num_turns": 7}

Status Values:

  • "completed" - Successfully generated all requested turns
  • "partial" - Interrupted during generation, may be incomplete
  • "failed" - Error occurred, saved to failed file

Filtering Results:

# Get only completed conversations
jq 'select(.status == "completed")' output.jsonl > completed.jsonl

# Get partial conversations (to review/complete)
jq 'select(.status == "partial")' output.jsonl > partial.jsonl

# Count by status
jq -s 'group_by(.status) | map({status: .[0].status, count: length})' output.jsonl

Key Features

โœ… Zero Data Loss

  • Progress saved automatically at configurable intervals
  • Every conversation saved before moving to next
  • Atomic file operations prevent corruption

โœ… No Duplicates

  • Hybrid identification (position + content hash)
  • Skips already processed conversations automatically
  • Prevents reprocessing on resume

โœ… Partial Resume

  • Continues incomplete multi-turn conversations
  • Resumes from exact turn where interrupted
  • Preserves all generated content

โœ… Universal Coverage

  • Handles ALL interruption types:
    • Manual stops (Ctrl+C)
    • Server/connection failures
    • Rate limit errors
    • System crashes
    • Any other errors

โœ… Input Validation

  • SHA256 hash verification of input file
  • Detects if input changed between runs
  • Prevents processing wrong data

โœ… Safety Features

  • Atomic file operations (corruption-proof)
  • Automatic backup of corrupted checkpoints
  • Graceful shutdown handlers (SIGINT/SIGTERM)

Checkpoint Structure

{
  "version": "1.0",
  "run_id": "unique-uuid",
  "started_at": "2025-10-04T20:00:00.000Z",
  "last_checkpoint_at": "2025-10-04T20:15:30.000Z",
  "base_data": {
    "source_type": "file",
    "file_path": "input.jsonl",
    "file_hash": "sha256-hash",
    "total_available": 1000
  },
  "progress": {
    "total_processed": 150,
    "completed": 120,
    "partial": 20,
    "failed": 10,
    "last_position": 149
  },
  "processed_records": [
    {
      "position": 0,
      "content_hash": "a1b2c3d4",
      "status": "completed",
      "turns_generated": 5,
      "processed_at": "2025-10-04T20:01:00.000Z"
    }
  ],
  "partial_states": {
    "5_b2c3d4e5": {
      "position": 5,
      "conversation": [...],
      "turns_completed": 3,
      "target_turns": 5
    }
  }
}

Best Practices

  1. Set Appropriate Save Frequency

    checkpoint:
      auto_save_frequency: 10  # Balance performance vs safety
    
    • Lower (5-10): Better safety, slight overhead
    • Higher (50-100): Better performance, more risk
  2. Use Workspace Isolation

    config = ConversationExtensionConfigBuilder(
        workspace_id="unique_project_id"  # Prevents conflicts
    )
    
  3. Monitor Checkpoint Size

    • Large checkpoints (>100MB) may slow saves
    • Consider processing in batches for huge datasets
  4. Backup Important Checkpoints

    cp checkpoint.json checkpoint.backup.json
    
  5. Clean Up After Completion

    # After successful run
    rm checkpoint.json
    

Troubleshooting

Problem: Checkpoint not resuming

  • Check checkpoint.enabled is true
  • Verify checkpoint file exists at specified path
  • Ensure resume_mode is not set to "fresh"
  • Review logs for validation errors

Problem: Input file changed warning

  • If intentional: Set validate_input_hash: false
  • If unintentional: Restore original input file
  • Or delete checkpoint to start fresh

Problem: Partial conversations not resuming

  • Verify partial_states exists in checkpoint
  • Check conversation has _position and _content_hash
  • Review logs for partial state loading errors

Problem: Duplicate conversations

  • Checkpoint prevents this automatically
  • Check both position and content_hash are tracked
  • Verify hybrid identification is working

Performance Impact

  • Checkpoint overhead: ~10-50ms per save (amortized)
  • Memory usage: Minimal (lightweight data structure)
  • Disk space: ~1-2KB per conversation in checkpoint
  • Resume speed: Instant (IDs loaded into memory set)

Example: Complete Workflow

import os
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Build configuration
config = (ConversationExtensionConfigBuilder(workspace_id="production_v1")
    .add_provider(
        role="user_followup",
        name="openai",
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    .add_provider(
        role="assistant_response",
        name="openai",
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    .set_generation(
        num_conversations=10000,
        turn_range=(3, 8),
        parallel_workers=20
    )
    .set_data_source(
        source_type="file",
        file_path="data/base_conversations.jsonl"
    )
    .set_storage(
        type="jsonl",
        output_file="output.jsonl"
    )
    .set_checkpoint(
        enabled=True,
        checkpoint_file="checkpoint.json",
        auto_save_frequency=50,
        validate_input_hash=True,
        resume_mode="auto"
    )
    .build()
)

# Run with automatic resume
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

# Can be interrupted and resumed multiple times
# Progress is never lost!

๐Ÿ“ See also: examples/checkpoint_resume_example.py for a complete working example with checkpoint inspection utilities.


Multi-Tenant SaaS Support

Perfect for platforms serving multiple users concurrently:

# Each user gets isolated workspace
workspace_id = f"user_{user_id}_session_{session_id}"

config = (ConversationExtensionConfigBuilder(workspace_id=workspace_id)
    .add_provider('user_followup', 'ultrasafe', shared_api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', shared_api_key, 'usf-mini')
    .set_storage('jsonl', output_file='output.jsonl')  # Auto-isolated
    .build()
)

# Storage automatically goes to: workspaces/{workspace_id}/output.jsonl

Parallel Dataset Generation

from concurrent.futures import ProcessPoolExecutor

def process_dataset(input_file, output_file):
    config = (ConversationExtensionConfigBuilder()
        .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
        .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
        .set_data_source('file', file_path=input_file)
        .set_storage('jsonl', output_file=output_file)
        .build()
    )
    ConversationExtensionPipeline(config).run()

# Process 3 datasets in parallel
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(process_dataset, 'data1.jsonl', 'out1.jsonl')
    executor.submit(process_dataset, 'data2.jsonl', 'out2.jsonl')
    executor.submit(process_dataset, 'data3.jsonl', 'out3.jsonl')

๐Ÿ“ Configuration Methods

OmniGen supports multiple ways to configure your pipeline. Choose the method that best fits your workflow:

Method 1: YAML Configuration File โญ (Recommended)

The most flexible and maintainable approach:

Step 1: Create config.yaml

# NEW: Minimal configuration with smart defaults
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}  # Only name and API key required!
    # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens โœ“

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

Step 2: Load and run

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

โœ… Benefits:

  • Easy to version control and share
  • Environment variable support with ${VAR_NAME}
  • No code changes needed for config updates
  • Clear, readable configuration

Method 2: Programmatic Configuration (Python)

Build configuration directly in code:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(num_conversations=100, turn_range=(3, 8))
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()

โœ… Benefits:

  • Type safety and IDE autocomplete
  • Dynamic configuration based on runtime
  • No external files needed
  • Easy application integration

Method 3: Dictionary Configuration

Create from Python dictionary:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

config_dict = {
    'providers': {
        'user_followup': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'},
        'assistant_response': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'}
    },
    'generation': {'num_conversations': 100, 'turn_range': {'min': 3, 'max': 8}},
    'base_data': {'source_type': 'file', 'file_path': 'base_data.jsonl'},
    'storage': {'type': 'jsonl', 'output_file': 'output.jsonl'}
}

config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

โœ… Benefits:

  • Load from JSON files or APIs
  • Easy programmatic modification
  • Flexible for dynamic scenarios

Method 4: Hybrid Approach

Combine YAML with programmatic overrides:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load base config from YAML
config = ConversationExtensionConfig.from_yaml('base_config.yaml')

# Modify specific settings
config_dict = config.to_dict()
config_dict['generation']['num_conversations'] = 500  # Override
config_dict['storage']['output_file'] = f'output_{user_id}.jsonl'  # Dynamic

# Rebuild and run
config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

โœ… Benefits:

  • Base configuration in YAML
  • Runtime customization in code
  • Best of both worlds

Environment Variables in YAML

Use ${VARIABLE_NAME} syntax:

providers:
  user_followup:
    name: ${PROVIDER_NAME}           # From environment
    api_key: ${ULTRASAFE_API_KEY}    # From environment
    model: ${USER_MODEL}             # From environment

base_data:
  file_path: ${INPUT_PATH}           # From environment

storage:
  output_file: ${OUTPUT_PATH}        # From environment

Set variables:

export PROVIDER_NAME="ultrasafe"
export ULTRASAFE_API_KEY="your-key"
export USER_MODEL="usf-mini"
export INPUT_PATH="data/input.jsonl"
export OUTPUT_PATH="data/output.jsonl"

# Then run
python your_script.py

CLI Usage with Config File

Run directly from command line:

# Using YAML config
omnigen conversation-extension --config config.yaml

# With overrides
omnigen conversation-extension \
  --config config.yaml \
  --num-conversations 500 \
  --output custom_output.jsonl


๐Ÿ“– Complete Configuration Reference

All Configuration Options Explained

Below is a comprehensive YAML configuration showing ALL available options with detailed explanations:

# ==============================================================================
# WORKSPACE ISOLATION (Optional)
# ==============================================================================
# Unique ID for multi-tenant environments - auto-isolates all output files
workspace_id: "user_123_session_abc"

# ==============================================================================
# PROVIDERS - AI Model Configuration
# ==============================================================================
# Configure different AI providers for each role
# Each role can use a different provider/model combination

providers:
  # Provider for generating user follow-up questions
  user_followup:
    name: ultrasafe              # Required: ultrasafe, openai, anthropic, openrouter
    api_key: ${API_KEY}          # Required: Use env var ${VAR_NAME} or direct key
    
    # Optional - Smart defaults applied if not specified:
    model: usf-mini              # Default varies by provider
    temperature: 0.7             # Default: 0.7
    max_tokens: 2048             # Default: 4096 (overridden here)
    timeout: 300                 # Default: 300
    max_retries: 5               # Default: 5
    retry_delay: 2               # Default: 2
  
  # Provider for generating assistant responses
  assistant_response:
    name: ultrasafe              # Can use different provider than user_followup
    api_key: ${API_KEY}          # Only name and api_key are required!
    max_tokens: 8192             # Override default (4096) for detailed responses
    # model, temperature, timeout, retries use defaults โœ“

# PROVIDER OPTIONS:
# ----------------
# ultrasafe:
#   models: usf-mini, usf-max
#
# openai:
#   models: gpt-4-turbo, gpt-4, gpt-3.5-turbo, gpt-4o, gpt-4o-mini
#
# anthropic:
#   models: claude-3-5-sonnet-20241022, claude-3-opus-20240229,
#           claude-3-sonnet-20240229, claude-3-haiku-20240307
#
# openrouter:
#   models: Any OpenRouter supported model
# ==============================================================================
# TOKEN TRACKING (NEW!)
# ==============================================================================
generation:
  track_tokens: true                   # Enable real-time token tracking from API
  token_pricing:                       # Optional: for cost display in console
    input_cost_per_million: 0.35       # Your input token pricing
    output_cost_per_million: 2.50      # Your output token pricing

# Token tracking captures ACTUAL usage from API responses:
# - Per generation call (user_followup, assistant_response)
# - Per conversation (aggregated)
# - Total dataset (summed across all conversations)
#
# Output includes detailed token breakdown:
# {
#   "tokens": {
#     "input_tokens": 24129,
#     "output_tokens": 5924,
#     "total_tokens": 30053,
#     "per_generation": [
#       {"type": "user_followup", "input_tokens": 3150, "output_tokens": 41},
#       {"type": "assistant_response", "input_tokens": 2632, "output_tokens": 927}
#     ]
#   }
# }
#
# NOTE: Token data is saved to output files
# Cost calculation is optional and only shown in console

#   base_url: https://openrouter.ai/api/v1 (optional)

# ==============================================================================
# GENERATION SETTINGS
# ==============================================================================
generation:
  num_conversations: 100           # Total conversations to generate
                                   # Use 0 or omit to process ALL available conversations
  
  turn_range:                      # Number of turns per conversation
    min: 3                         # Minimum turns
    max: 8                         # Maximum turns
  
  parallel_workers: 10             # Concurrent workers (balance speed vs rate limits)
  
  # Extension behavior for multi-turn input
  extension_mode: "smart"          # Options: "smart" | "legacy"
  # - smart: Intelligently handle multi-turn conversations
  # - legacy: Always extract first user message only
  
  skip_invalid: true               # Skip invalid patterns (recommended: true)
  
  # Turn calculation method
  turn_calculation: "additional"   # Options: "additional" | "total"
  # - additional: Add NEW turns on top of existing (default)
  # - total: Keep total turns within range (never removes existing)

# ==============================================================================
# DATA SOURCE CONFIGURATION
# ==============================================================================
base_data:
  enabled: true                    # Enable base data loading
  
  # OPTION 1: Local File
  source_type: file                # Use local JSONL/JSON file
  file_path: data/input.jsonl      # Path to file
  format: conversations            # JSON key containing conversation array
  shuffle: false                   # Shuffle data before processing
  
  # OPTION 2: HuggingFace Dataset
  # source_type: huggingface       # Use HuggingFace dataset
  # hf_dataset: username/dataset   # HuggingFace dataset path
  # hf_split: train                # Dataset split: train, test, validation
  # hf_token: ${HF_TOKEN}          # HuggingFace API token (if private)
  # hf_streaming: false            # Stream dataset (for large datasets)
  # format: conversations          # Field name in dataset
  # shuffle: true                  # Shuffle after loading

# ==============================================================================
# STORAGE CONFIGURATION
# ==============================================================================
storage:
  type: jsonl                      # Options: jsonl | mongodb
  
  # JSONL Storage (Default)
  output_file: output.jsonl        # Successful conversations
  partial_file: partial.jsonl      # Partial/incomplete conversations (legacy)
  failed_file: failed.jsonl        # Failed conversations
  
  # MongoDB Storage (Alternative)
  # type: mongodb
  # mongodb:
  #   connection_string: mongodb://localhost:27017
  #   database: omnigen
  #   collection: conversations
  #   output_collection: output          # Successful
  #   partial_collection: partial        # Partial
  #   failed_collection: failed          # Failed

# ==============================================================================
# CHECKPOINT/RESUME CONFIGURATION (NEW!)
# ==============================================================================
checkpoint:
  enabled: true                    # Enable automatic checkpoint/resume
  checkpoint_file: "workspaces/{workspace_id}/checkpoint.json"
  auto_save_frequency: 10          # Save checkpoint every N conversations
  validate_input_hash: true        # Verify input file hasn't changed on resume
  resume_mode: "auto"              # Options: "auto" | "manual" | "fresh"
  
  # Features:
  # - Zero data loss from interruptions
  # - No duplicate processing
  # - Partial conversation resume
  # - Handles: Ctrl+C, errors, rate limits, crashes

# ==============================================================================
# DATETIME CONFIGURATION (Optional)
# ==============================================================================
datetime_config:
  enabled: true                    # Enable datetime generation
  mode: random_from_range          # Options: random_from_range | current | fixed
  timezone: UTC                    # Timezone (UTC, America/New_York, Asia/Dubai, etc.)
  format: "%Y-%m-%d %H:%M:%S"      # Python strftime format
  
  # For random_from_range mode
  range:
    start: "2024-01-01 00:00:00"   # Start datetime
    end: "2024-12-31 23:59:59"     # End datetime
  
  # For fixed mode
  # fixed_datetime: "2024-06-15 12:00:00"

# ==============================================================================
# SYSTEM MESSAGES (Optional)
# ==============================================================================
system_messages:
  # Prepend system message to every conversation
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."
  
  # Append system message to every conversation
  append_always:
    enabled: false
    content: "Remember to be concise and helpful."
  
  # Add system message only if none exists
  add_if_missing:
    enabled: false
    content: "You are an AI assistant."

# Available variables in system messages:
# - {current_datetime}: Generated datetime
# - {timezone}: Configured timezone
# - {workspace_id}: Current workspace ID

# ==============================================================================
# GENERATION-ONLY SYSTEM MESSAGES (Optional - NEW FEATURE!)
# ==============================================================================
# These system messages are used during generation but NOT saved to the dataset.
# Perfect for providing internal guidance without polluting training data.

generation_system_messages:
  assistant_response:
    enabled: true
    content: |
      INTERNAL GENERATION GUIDELINES (not saved to dataset):
      
      Quality Standards:
      - Accuracy: Provide factually correct information
      - Clarity: Use simple, clear language
      - Completeness: Address all aspects of the question
      - Examples: Include concrete examples when helpful
      
      Formatting:
      - Use markdown for better readability
      - Break long responses into sections
      - Use bullet points for lists
      
      Tone:
      - Professional yet friendly
      - Encouraging and supportive
      - Clear and direct

# Available variables (same as system_messages):
# - {current_datetime}: Generated datetime
# - {timezone}: Configured timezone

# ==============================================================================
# CUSTOM PROMPTS (Optional - Smart Defaults Provided!)
# ==============================================================================
# NOTE: Prompts are completely OPTIONAL. The system automatically uses optimized
# default prompts if you don't specify any. Only add this section if you want
# to customize the prompt behavior.

prompts:
  # Custom prompt for user follow-up generation
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response
    
    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>
  
  # Custom prompt for assistant response generation
  # assistant_response: |
  #   Your custom assistant response prompt here...

# ==============================================================================
# DEBUG OPTIONS (Optional)
# ==============================================================================
debug:
  log_api_timing: true             # Log API call timings
  log_parallel_status: true        # Log parallel worker status
  verbose: false                   # Verbose logging

Quick Configuration Examples

Example 1: Minimal Configuration (NEW!)

# Only specify what's required - defaults applied automatically
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: input.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

# Enable checkpoint/resume (NEW!)
checkpoint:
  enabled: true
  auto_save_frequency: 10

# Optional: Generation-only guidance (not saved to dataset)
generation_system_messages:
  assistant_response:
    enabled: false  # Enable if needed
    content: "Provide accurate, well-researched responses."

Example 2: HuggingFace Dataset Input

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-4-turbo
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022

generation:
  num_conversations: 1000
  turn_range: {min: 5, max: 10}
  parallel_workers: 20

base_data:
  source_type: huggingface
  hf_dataset: username/my-dataset
  hf_split: train
  hf_token: ${HF_TOKEN}
  format: conversations
  shuffle: true

storage:
  type: jsonl
  output_file: output.jsonl

# Checkpoint/resume for fault tolerance
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 50
  validate_input_hash: true

Example 3: Mixed Providers with MongoDB

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-3.5-turbo
    temperature: 0.8
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022
    temperature: 0.7

generation:
  num_conversations: 500
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: questions.jsonl

storage:
  type: mongodb
  mongodb:
    connection_string: mongodb://localhost:27017
    database: omnigen
    collection: conversations

# Checkpoint works with any storage type
checkpoint:
  enabled: true
  auto_save_frequency: 25

Example 4: Programmatic Configuration (Python)

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Build configuration programmatically
config = (ConversationExtensionConfigBuilder()
    # Workspace isolation
    .set_workspace_id("user_123_session_abc")
    
    # Providers
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=2048
    )
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=8192
    )
    
    # Generation settings
    .set_generation(
        num_conversations=100,  # Or use 0/None to process ALL available
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',
        skip_invalid=True,
        turn_calculation='additional'
    )
    
    # Data source - Local file
    .set_data_source(
        source_type='file',
        file_path='input.jsonl',
        format='conversations',
        shuffle=False
    )
    
    # Data source - HuggingFace (alternative)
    # .set_data_source(
    #     source_type='huggingface',
    #     hf_dataset='username/dataset',
    #     hf_split='train',
    #     hf_token='your-token',
    #     format='conversations',
    #     shuffle=True
    # )
    
    # Storage
    .set_storage(
        type='jsonl',
        output_file='output.jsonl',
        partial_file='partial.jsonl',
        failed_file='failed.jsonl'
    )
    
    # Custom prompts (optional)
    .set_prompts(
        followup_question="Your custom prompt here with {history}"
    )
    
    .build()
)

# Optional: Add generation-only system messages (not saved to dataset)
config_dict = config.to_dict()
config_dict['generation_system_messages'] = {
    'assistant_response': {
        'enabled': True,
        'content': 'Provide accurate, well-researched responses with examples.'
    }
}
config = ConversationExtensionConfig.from_dict(config_dict)

# Run pipeline with checkpoint/resume support
pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can be interrupted and resumed automatically!

๐Ÿ”ง Advanced Configuration Options - Deep Dive

System Messages Configuration

System messages allow you to inject context or instructions into conversations. You have three modes of operation:

1. Prepend Always

Add a system message at the start of every conversation:

system_messages:
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."

Use Cases:

  • Set assistant personality/role
  • Provide real-time context (date, time)
  • Add global instructions

2. Append Always

Add a system message at the end of every conversation:

system_messages:
  append_always:
    enabled: true
    content: "Remember to be concise and provide sources when possible."

Use Cases:

  • Add final reminders
  • Set response style constraints
  • Add quality guidelines

3. Add If Missing

Add a system message only if the conversation doesn't already have one:

system_messages:
  add_if_missing:
    enabled: true
    content: "You are an AI assistant."

Use Cases:

  • Ensure all conversations have baseline instructions
  • Fallback when base data lacks system messages

Available Template Variables

You can use these variables in your system message content:

Variable Description Example Output
{current_datetime} Generated datetime 2024-06-15 14:30:00
{timezone} Configured timezone UTC or America/New_York
{workspace_id} Current workspace ID user_123_session_abc

Complete Example - All Three Modes

system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a knowledgeable AI assistant.
      Current datetime: {current_datetime} ({timezone})
      Workspace: {workspace_id}
  
  append_always:
    enabled: true
    content: |
      IMPORTANT REMINDERS:
      - Always cite sources when providing facts
      - Be concise but thorough
      - Ask clarifying questions when needed
  
  add_if_missing:
    enabled: true
    content: "You are a helpful assistant."

Note: If both prepend_always and add_if_missing are enabled, prepend_always takes precedence.

Programmatic Usage

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add system messages to config dict
config_dict = config.to_dict()
config_dict['system_messages'] = {
    'prepend_always': {
        'enabled': True,
        'content': 'You are a helpful assistant. Time: {current_datetime}'
    }
}

Generation-Only System Messages

NEW FEATURE: Guide LLM generation without polluting your dataset!

Unlike system_messages which are saved to the dataset, generation_system_messages are used only during generation and are NOT saved to the final conversation data.

Use Cases

  • Provide generation guidance without affecting dataset
  • Set quality standards for LLM output
  • Add constraints that shouldn't be in training data
  • Guide tone/style during generation only

Configuration

# These ARE saved to dataset (existing feature)
system_messages:
  prepend_always:
    enabled: true
    content: "You are a helpful assistant."

# These are NOT saved to dataset (NEW feature)
generation_system_messages:
  assistant_response:
    enabled: true
    content: |
      GENERATION GUIDANCE (not saved to dataset):
      - Provide accurate, well-researched responses
      - Use clear, concise language
      - Include examples when helpful
      - Maintain professional tone

How It Works

  1. During Generation: System message is prepended to conversation for API call
  2. In Dataset: System message is excluded from saved conversation

Example:

generation_system_messages:
  assistant_response:
    enabled: true
    content: |
      You are generating high-quality assistant responses.
      
      Guidelines:
      - Be accurate and informative
      - Use markdown formatting
      - Cite sources when making claims
      - Be concise but thorough

What LLM sees during generation:

System: You are generating high-quality assistant responses...
User: How does photosynthesis work?

What gets saved to dataset:

{
  "conversations": [
    {"role": "user", "content": "How does photosynthesis work?"},
    {"role": "assistant", "content": "Photosynthesis is..."}
  ]
}

Template Variables

Same variables as system_messages:

Variable Description Example
{current_datetime} Generated datetime 2024-12-15 14:30:00
{timezone} Configured timezone UTC
generation_system_messages:
  assistant_response:
    enabled: true
    content: |
      Generate responses as if current time is {current_datetime} ({timezone}).
      Ensure all temporal references are consistent with this datetime.

Complete Example

# Dataset system messages (saved)
system_messages:
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant."

# Generation guidance (NOT saved)
generation_system_messages:
  assistant_response:
    enabled: true
    content: |
      INTERNAL GENERATION GUIDELINES:
      
      Quality Standards:
      - Accuracy: Verify facts before stating them
      - Clarity: Use simple language, avoid jargon
      - Completeness: Address all aspects of the question
      - Examples: Provide concrete examples when helpful
      
      Formatting:
      - Use markdown for better readability
      - Break long responses into sections
      - Use bullet points for lists
      
      Tone:
      - Professional yet friendly
      - Encouraging and supportive
      - Clear and direct

Programmatic Usage

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'openai', api_key, 'gpt-4o-mini')
    .add_provider('assistant_response', 'openai', api_key, 'gpt-4o')
    # ... other config ...
    .build()
)

# Add generation-only system messages
config_dict = config.to_dict()
config_dict['generation_system_messages'] = {
    'assistant_response': {
        'enabled': True,
        'content': """
            Generate high-quality, well-researched responses.
            Use markdown formatting and provide examples.
        """
    }
}

config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

Comparison: system_messages vs generation_system_messages

Feature system_messages generation_system_messages
Purpose Instructions for the AI in production Guidance during dataset generation
Saved to dataset โœ… Yes โŒ No
Use case Define AI personality, role, constraints Quality standards, formatting rules
Example "You are a medical assistant" "Use medical terminology, cite sources"
When to use Want users to see these instructions Internal generation guidance only

DateTime Configuration

Control how datetime values are generated and injected into system messages and conversations.

Mode 1: Random from Range (Default)

Generate random datetimes within a specified range:

datetime_config:
  enabled: true
  mode: random_from_range
  timezone: UTC                        # Any valid timezone
  format: "%Y-%m-%d %H:%M:%S"          # Python strftime format
  range:
    start: "2024-01-01 00:00:00"
    end: "2024-12-31 23:59:59"

Use Cases:

  • Training data with temporal diversity
  • Simulating historical conversations
  • Creating time-aware datasets

Common Timezones:

  • UTC - Coordinated Universal Time
  • America/New_York - Eastern Time
  • Europe/London - British Time
  • Asia/Dubai - Gulf Standard Time
  • Asia/Tokyo - Japan Standard Time

Mode 2: Current Time

Use actual current time when generating:

datetime_config:
  enabled: true
  mode: current
  timezone: America/New_York
  format: "%B %d, %Y at %I:%M %p"      # December 15, 2024 at 02:30 PM

Use Cases:

  • Real-time conversation simulation
  • Current events discussions
  • Live system demonstrations

Mode 3: Fixed DateTime

Use the same datetime for all conversations:

datetime_config:
  enabled: true
  mode: fixed
  timezone: UTC
  format: "%Y-%m-%d %H:%M:%S"
  fixed_datetime: "2024-06-15 12:00:00"

Use Cases:

  • Consistent training data
  • Specific time period simulation
  • Testing and debugging

Format String Examples

Common datetime formats using Python's strftime:

# ISO 8601 Format
format: "%Y-%m-%d %H:%M:%S"           # 2024-12-15 14:30:00

# Human-Readable
format: "%B %d, %Y at %I:%M %p"       # December 15, 2024 at 02:30 PM

# Date Only
format: "%Y-%m-%d"                     # 2024-12-15

# Time Only
format: "%H:%M:%S"                     # 14:30:00

# Custom Format
format: "%A, %B %d, %Y"               # Monday, December 15, 2024

Complete DateTime Example

datetime_config:
  enabled: true
  mode: random_from_range
  timezone: America/New_York
  format: "%A, %B %d, %Y at %I:%M %p %Z"
  range:
    start: "2024-01-01 09:00:00"      # Business hours only
    end: "2024-12-31 17:00:00"

# Then use in system messages:
system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a business assistant.
      Current datetime: {current_datetime}
      Timezone: {timezone}

Output Example:

You are a business assistant.
Current datetime: Wednesday, June 15, 2024 at 02:30 PM EST
Timezone: America/New_York

Custom Prompts Configuration

NOTE: Custom prompts are completely optional. OmniGen automatically uses optimized default prompts if the prompts section is not included in your configuration. Only add custom prompts if you want to override the default behavior.

Default Prompts

OmniGen uses optimized default prompts that work well for most use cases. You can customize them if needed:

1. Follow-up Question Prompt (for user_followup role):

prompts:
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response
    - Make the question specific to the conversation context
    
    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>

2. Assistant Response Prompt (for assistant_response role):

prompts:
  assistant_response: |
    ## Your Task
    Generate a helpful assistant response based on the conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Provide accurate and helpful information
    - Be conversational and friendly
    - Reference previous context when relevant
    - Keep responses focused and concise
    
    Return your response wrapped in XML tags:
    <assistant>Your response here</assistant>

Available Template Variables

Variable Description Content
{history} Full conversation history All previous messages formatted as text

Custom Prompt Examples

Example 1: Technical Documentation Assistant

prompts:
  followup_question: |
    Generate a technical follow-up question from a developer's perspective.
    
    CONVERSATION:
    {history}
    
    Create a question that:
    - Asks about implementation details
    - Seeks code examples or best practices
    - Explores edge cases or potential issues
    
    <user>Your technical question</user>
  
  assistant_response: |
    Provide a detailed technical response with code examples.
    
    CONVERSATION:
    {history}
    
    Your response should:
    - Include code snippets when helpful
    - Explain technical concepts clearly
    - Provide links to documentation
    - Mention potential pitfalls
    
    <assistant>Your technical response</assistant>

Example 2: Customer Support Simulation

prompts:
  followup_question: |
    Simulate a customer asking for help.
    
    CONVERSATION:
    {history}
    
    Generate a question that:
    - Shows frustration or confusion (realistic)
    - Asks for clarification on previous response
    - Requests specific solutions or workarounds
    
    <user>Customer question</user>
  
  assistant_response: |
    Provide empathetic customer support.
    
    CONVERSATION:
    {history}
    
    Your response must:
    - Show empathy and understanding
    - Provide clear step-by-step solutions
    - Offer alternatives if applicable
    - End with "Is there anything else I can help with?"
    
    <assistant>Support response</assistant>

Example 3: Educational Tutor

prompts:
  followup_question: |
    Generate a student's follow-up question showing learning progression.
    
    CONTEXT:
    {history}
    
    The question should:
    - Build on what was just explained
    - Show either understanding or confusion
    - Ask for examples or clarification
    - Demonstrate curiosity about the topic
    
    <user>Student question</user>
  
  assistant_response: |
    Respond as a patient, knowledgeable tutor.
    
    CONTEXT:
    {history}
    
    Your response should:
    - Use simple, clear language
    - Provide concrete examples
    - Check for understanding
    - Encourage further questions
    
    <assistant>Tutor response</assistant>

Programmatic Prompt Configuration

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

custom_prompts = {
    'followup_question': """
        Generate a follow-up based on: {history}
        Make it specific and contextual.
        <user>question</user>
    """,
    'assistant_response': """
        Respond helpfully to: {history}
        Be clear and concise.
        <assistant>response</assistant>
    """
}

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .set_prompts(**custom_prompts)
    .build()
)

Debug Configuration

Enable detailed logging and monitoring for troubleshooting and optimization.

Debug Options

debug:
  log_api_timing: true          # Log API call duration and performance
  log_parallel_status: true     # Log parallel worker status and progress
  verbose: false                # Enable verbose logging (all operations)

Option Details

1. API Timing Logs (log_api_timing: true)

Tracks API performance for each call:

[API TIMING] user_followup request took 1.23s
[API TIMING] assistant_response request took 2.45s
[API TIMING] Average response time: 1.84s

Use Cases:

  • Identify slow API providers
  • Optimize provider selection
  • Monitor rate limits
  • Debug timeout issues

2. Parallel Status Logs (log_parallel_status: true)

Shows worker activity in real-time:

[PARALLEL] Worker 1/10: Processing conversation 5
[PARALLEL] Worker 2/10: Processing conversation 6
[PARALLEL] Worker 3/10: Waiting for task...
[PARALLEL] Progress: 45/100 conversations complete (45%)
[PARALLEL] Active workers: 8/10 | Queue: 12 remaining

Use Cases:

  • Monitor parallel processing
  • Identify bottlenecks
  • Optimize worker count
  • Track progress in real-time

3. Verbose Logging (verbose: true)

Enables comprehensive logging of all operations:

[DEBUG] Loading base data from: data/input.jsonl
[DEBUG] Loaded 100 base conversations
[DEBUG] Initializing provider: ultrasafe (usf-mini)
[DEBUG] Starting parallel generation with 10 workers
[DEBUG] Conversation 1: Processing...
[DEBUG] Conversation 1: Generated 5 turns
[DEBUG] Conversation 1: Saved to output.jsonl
[DEBUG] Final stats: 95 success, 3 partial, 2 failed

Use Cases:

  • Development and testing
  • Debugging issues
  • Understanding pipeline flow
  • Troubleshooting data problems

Complete Debug Configuration

# Full debug mode for development
debug:
  log_api_timing: true
  log_parallel_status: true
  verbose: true

# Production mode (minimal logging)
debug:
  log_api_timing: false
  log_parallel_status: false
  verbose: false

# Performance monitoring mode
debug:
  log_api_timing: true      # Track API performance
  log_parallel_status: true # Monitor workers
  verbose: false            # Don't flood logs

Programmatic Debug Configuration

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add debug settings
config_dict = config.to_dict()
config_dict['debug'] = {
    'log_api_timing': True,
    'log_parallel_status': True,
    'verbose': False
}

Debug Output Examples

Scenario 1: API Performance Issue

debug:
  log_api_timing: true

Output helps identify slow providers:

[API TIMING] openai/gpt-4-turbo: 0.8s
[API TIMING] anthropic/claude-3-5-sonnet: 3.2s  โ† SLOW!
[API TIMING] ultrasafe/usf-mini: 0.5s

Scenario 2: Parallel Processing Bottleneck

debug:
  log_parallel_status: true

Output shows worker utilization:

[PARALLEL] Active: 3/10 workers  โ† Only 30% utilized!
[PARALLEL] Queue: 0 tasks remaining
[PARALLEL] Suggestion: Reduce worker count to 5

Scenario 3: Data Loading Issues

debug:
  verbose: true

Output reveals the problem:

[DEBUG] Loading: data/input.jsonl
[DEBUG] Line 1: Valid โœ“
[DEBUG] Line 2: Valid โœ“
[DEBUG] Line 3: ERROR - First message not from user
[DEBUG] Line 4: Valid โœ“
[DEBUG] Loaded: 3 valid, 1 invalid

๐Ÿ“– Conversation Extension Pipeline - Complete Guide

Overview

The Conversation Extension Pipeline intelligently transforms base conversations into rich multi-turn dialogues. It can handle both single-turn questions and extend existing multi-turn conversations.

Key Features

  • โœ… Smart Extension - Continues from existing conversations based on last role
  • โœ… Flexible Input - Handles single-turn or multi-turn base data
  • โœ… Provider Mix - Use different AI providers for user and assistant
  • โœ… Multi-Tenant - Complete workspace isolation
  • โœ… Configurable - Full control over generation behavior

Configuration Options

Extension Modes

Smart Mode (Default)

generation:
  extension_mode: "smart"
  • Single-turn input โ†’ Generate new conversation from scratch
  • Multi-turn (user last) โ†’ Add 1 assistant response, then continue
  • Multi-turn (assistant last) โ†’ Add user + assistant, then continue
  • Invalid patterns โ†’ Skip row entirely

Legacy Mode

generation:
  extension_mode: "legacy"
  • Always extracts first user message only (original behavior)

Turn Calculation

Additional Mode (Default) - Add NEW turns on top of existing

generation:
  turn_calculation: "additional"  # Add 3-8 NEW turns

Total Mode - Keep total within range (never removes existing)

generation:
  turn_calculation: "total"  # Total should be 3-8 turns

Complete Configuration

# Workspace isolation (optional)
workspace_id: "user_123"

# AI Providers - Smart defaults applied!
providers:
  user_followup:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 2048              # Override default if needed
  
  assistant_response:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 8192              # Override for detailed responses

# Generation Settings
generation:
  num_conversations: 100
  turn_range:
    min: 3
    max: 8
  parallel_workers: 10
  
  # Extension behavior
  extension_mode: "smart"        # "smart" | "legacy"
  skip_invalid: true             # Skip invalid patterns
  turn_calculation: "additional" # "additional" | "total"

# Input Data
base_data:
  enabled: true
  source_type: "file"
  file_path: "base_data.jsonl"
  format: "conversations"
  shuffle: false

# Output Storage
storage:
  type: "jsonl"
  output_file: "output.jsonl"
  partial_file: "partial.jsonl"
  failed_file: "failed.jsonl"

# Checkpoint/Resume (recommended for large datasets)
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 10
  validate_input_hash: true
  resume_mode: "auto"

# System Messages (optional)
system_messages:
  add_if_missing:
    enabled: true
    content: "You are a helpful assistant. Current datetime: {current_datetime}"

# DateTime (optional)
datetime_config:
  enabled: true
  timezone: "UTC"
  format: "%Y-%m-%d %H:%M:%S"
  range:
    start_date: "2024-01-01"
    end_date: "2024-12-31"

Input Data Formats

Valid Patterns

Single-turn โœ…

{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}

Multi-turn (user last) โœ…

{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."},
    {"role": "user", "content": "What resources?"}
  ]
}

Multi-turn (assistant last) โœ…

{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."}
  ]
}

Invalid Patterns (Skipped)

โŒ First message not user

{"conversations": [{"role": "assistant", "content": "Hello"}]}

โŒ Empty conversations

{"conversations": []}

Programmatic Usage

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',      # Handle multi-turn intelligently
        skip_invalid=True,            # Skip invalid patterns
        turn_calculation='additional' # Add new turns (default)
    )
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(enabled=True, auto_save_frequency=10)  # Enable checkpoint
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Automatically resumes if interrupted!

Turn Calculation Examples

Additional Mode (Default)

Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 3-8 NEW turns โ†’ Total: 5-10 turns

Total Mode

Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 1-6 turns โ†’ Total: 3-8 turns

Existing: 10 turns (already > max)
Config: turn_range = (3, 8)
Result: Add 0 turns โ†’ Keep 10 turns (never remove)

Best Practices

Provider Selection

  • Use better models for assistant (claude-3-5-sonnet, gpt-4-turbo)
  • Use cheaper models for user followups (usf-mini, gpt-3.5-turbo)

Turn Range

  • Quick exchanges: (2, 4)
  • In-depth: (5, 10)
  • Balanced: (3, 8) โœ…

Parallel Workers

  • Conservative: 5 (avoid rate limits)
  • Balanced: 10 โœ…
  • Aggressive: 20 (watch for rate limits)

Troubleshooting

Issue: Empty output

  • Check input data format (first message must be user)
  • Set skip_invalid: false to see errors

Issue: Rate limits

  • Reduce parallel_workers
  • Check provider API limits
  • Enable checkpoint to preserve progress: checkpoint.enabled: true

Issue: Pipeline interrupted

  • โœ… Don't worry! Progress is automatically saved if checkpoint enabled
  • Simply run again - it will resume from where it stopped
  • Check checkpoint.json for current state

Issue: Low quality

  • Increase temperature (0.8-0.9)
  • Use better models
  • Add custom prompts and system messages

Issue: Duplicate conversations in output

  • Checkpoint system prevents this automatically
  • Both partial and complete saved to same file with status field
  • Filter completed: jq 'select(.status == "completed")' output.jsonl

License

MIT License - Ultrasafe AI ยฉ 2024


About Ultrasafe AI

Enterprise-grade AI tools with focus on safety and performance.


Made with โค๏ธ by Ultrasafe AI

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omnigen_usf-0.1.2.tar.gz (149.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omnigen_usf-0.1.2-py3-none-any.whl (120.0 kB view details)

Uploaded Python 3

File details

Details for the file omnigen_usf-0.1.2.tar.gz.

File metadata

  • Download URL: omnigen_usf-0.1.2.tar.gz
  • Upload date:
  • Size: 149.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for omnigen_usf-0.1.2.tar.gz
Algorithm Hash digest
SHA256 eb14bda02031d122200a57bdea06cfcabfb85703e1741f8180ced27aebdaea73
MD5 2fad31588fcf9a4958dea9bb919f3195
BLAKE2b-256 e94a347d63298b3c3db2869c69089fc16d2e459bca7234a1cf3a0670b4bd27e7

See more details on using hashes here.

File details

Details for the file omnigen_usf-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: omnigen_usf-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 120.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for omnigen_usf-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 da398fbdd191b6c4a4e96849a0dcb857989783ec7e91a091a86b3f64a2de9535
MD5 839926d01e5501843c6b459d9dda3e4b
BLAKE2b-256 93782dc6da12828943b6b4f7142d67eb0ec0861dcd2746f03a5ed92ae587f66e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page