Skip to main content

Enterprise-Grade Synthetic Data Generation

Project description

OmniGen 🚀

Generate synthetic data at scale using an enterprise-ready framework with full customizable configuration, security, and ease of use

Built by Ultrasafe AI for production environments.


What is OmniGen?

OmniGen is an enterprise-grade framework for generating synthetic datasets at scale—from scratch or from base data. Generate trillions of tokens and billions of samples across multiple modalities:

🎯 Data Types Supported

  • 💬 Conversational Data - Single-turn to multi-turn dialogues
  • 🤖 Agentic Datasets - Tool use, function calling, multi-step reasoning
  • 🎨 Multimodal Datasets - Text, images, audio, video combinations
  • 🖼️ Images - Synthetic image generation and editing
  • 🎵 Audio - Speech, music, sound effects
  • 🎬 Video - Synthetic video sequences

🎓 Use Cases

  • Fine-Tuning - Instruction following, task-specific models
  • Supervised Fine-Tuning (SFT) - High-quality labeled datasets
  • Offline Reinforcement Learning - Preference datasets with rewards
  • Online Reinforcement Learning - Ground truth with reward checking scripts
  • Pre-Training - Large-scale corpus generation
  • Machine Learning - Training data for any ML task

🏗️ Why OmniGen?

  • Enterprise-Ready - Built for production at scale
  • Fully Customizable - Configure every aspect of generation
  • Secure - Complete isolation, no data mixing
  • Easy - Simple API, clear examples
  • Modular - Independent pipelines for different data types

🚀 Currently Available Pipeline

conversation_extension - Extend Single-Turn to Multi-Turn Conversations

Turn your base questions into rich multi-turn dialogues. This is just the first pipeline—more coming soon!


Why OmniGen?

Simple - One command to generate thousands of conversations ✅ Scalable - Parallel processing for fast generation ✅ Flexible - Mix different AI providers (OpenAI, Anthropic, Ultrasafe AI) ✅ Production Ready - Built for SaaS platforms with multi-tenant support ✅ Fault Tolerant - Checkpoint/resume system prevents data loss from any interruption


Quick Start

1. Install

pip install omnigen

2. Prepare Base Data

Create a file base_data.jsonl with your starting questions:

{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}
{"conversations": [{"role": "user", "content": "What is machine learning?"}]}
{"conversations": [{"role": "user", "content": "Explain neural networks"}]}

3. Generate Conversations

You can configure the pipeline in two ways:

Option A: Using YAML Configuration File

Create a config.yaml file:

# NEW: Minimal config with smart defaults!
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

Then load and run:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load configuration from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

Option B: Using Programmatic Configuration (Python)

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Configure the pipeline programmatically
config = (ConversationExtensionConfigBuilder()
    # User followup generator - minimal config!
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
    )
    # Assistant response generator
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
    )
    # Generation settings
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8)  # 3-8 turns per conversation
    )
    # Input data
    .set_data_source(
        source_type='file',
        file_path='base_data.jsonl'
    )
    # Output
    .set_storage(
        type='jsonl',
        output_file='output.jsonl'
    )
    .build()
)

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

4. Get Results

Your generated conversations will be in output.jsonl:

{
  "id": 0,
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Great choice! Start with the basics..."},
    {"role": "user", "content": "What resources do you recommend?"},
    {"role": "assistant", "content": "I recommend these resources..."},
    {"role": "user", "content": "How long will it take?"},
    {"role": "assistant", "content": "With consistent practice..."}
  ],
  "num_turns": 3,
  "success": true
}

Supported AI Providers

NEW: Smart defaults automatically applied! Only specify name and api_key - model, temperature, and max_tokens use optimized defaults.

Provider Default Model Default Temp Default Tokens
Ultrasafe AI usf-mini 0.7 4096
OpenAI gpt-4-turbo 0.7 4096
Anthropic claude-3-5-sonnet-20241022 0.7 4096
OpenRouter openai/gpt-4-turbo 0.7 4096

Override defaults as needed:

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    temperature: 0.9  # Override only what you need

Mix Different Providers

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'openai', api_key, 'gpt-4-turbo')
    .add_provider('assistant_response', 'anthropic', api_key, 'claude-3-5-sonnet')
    # ... rest of config
    .build()
)

Advanced Features

🔄 Checkpoint & Resume System

NEW! Automatic checkpoint/resume functionality prevents data loss and duplication:

# Enable checkpoint/resume in your config
checkpoint:
  enabled: true
  checkpoint_file: "workspaces/my_project/checkpoint.json"
  auto_save_frequency: 10  # Save every 10 conversations
  validate_input_hash: true
  resume_mode: "auto"

Key Features:

  • Zero Data Loss - Progress saved automatically at intervals
  • No Duplicates - Hybrid tracking (position + content hash)
  • Partial Resume - Continue incomplete conversations
  • Handles All Interruptions - Manual stops, errors, rate limits, crashes

Quick Start:

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
    .set_generation(num_conversations=1000, turn_range=(3, 8))
    .set_data_source('file', file_path='input.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(
        enabled=True,
        auto_save_frequency=10,
        resume_mode='auto'
    )
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can interrupt (Ctrl+C) and resume anytime!

Output Files:

  • Both partial and complete conversations saved to same output file with status field
  • Status values: "completed", "partial", or "failed"
  • Easy to filter: jq 'select(.status == "completed")' output.jsonl

📖 Complete Checkpoint/Resume Documentation →


Multi-Tenant SaaS Support

Perfect for platforms serving multiple users concurrently:

# Each user gets isolated workspace
workspace_id = f"user_{user_id}_session_{session_id}"

config = (ConversationExtensionConfigBuilder(workspace_id=workspace_id)
    .add_provider('user_followup', 'ultrasafe', shared_api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', shared_api_key, 'usf-mini')
    .set_storage('jsonl', output_file='output.jsonl')  # Auto-isolated
    .build()
)

# Storage automatically goes to: workspaces/{workspace_id}/output.jsonl

Parallel Dataset Generation

from concurrent.futures import ProcessPoolExecutor

def process_dataset(input_file, output_file):
    config = (ConversationExtensionConfigBuilder()
        .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
        .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
        .set_data_source('file', file_path=input_file)
        .set_storage('jsonl', output_file=output_file)
        .build()
    )
    ConversationExtensionPipeline(config).run()

# Process 3 datasets in parallel
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(process_dataset, 'data1.jsonl', 'out1.jsonl')
    executor.submit(process_dataset, 'data2.jsonl', 'out2.jsonl')
    executor.submit(process_dataset, 'data3.jsonl', 'out3.jsonl')

📝 Configuration Methods

OmniGen supports multiple ways to configure your pipeline. Choose the method that best fits your workflow:

Method 1: YAML Configuration File ⭐ (Recommended)

The most flexible and maintainable approach:

Step 1: Create config.yaml

# NEW: Minimal configuration with smart defaults
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}  # Only name and API key required!
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

Step 2: Load and run

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

✅ Benefits:

  • Easy to version control and share
  • Environment variable support with ${VAR_NAME}
  • No code changes needed for config updates
  • Clear, readable configuration

Method 2: Programmatic Configuration (Python)

Build configuration directly in code:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(num_conversations=100, turn_range=(3, 8))
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()

✅ Benefits:

  • Type safety and IDE autocomplete
  • Dynamic configuration based on runtime
  • No external files needed
  • Easy application integration

Method 3: Dictionary Configuration

Create from Python dictionary:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

config_dict = {
    'providers': {
        'user_followup': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'},
        'assistant_response': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'}
    },
    'generation': {'num_conversations': 100, 'turn_range': {'min': 3, 'max': 8}},
    'base_data': {'source_type': 'file', 'file_path': 'base_data.jsonl'},
    'storage': {'type': 'jsonl', 'output_file': 'output.jsonl'}
}

config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

✅ Benefits:

  • Load from JSON files or APIs
  • Easy programmatic modification
  • Flexible for dynamic scenarios

Method 4: Hybrid Approach

Combine YAML with programmatic overrides:

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load base config from YAML
config = ConversationExtensionConfig.from_yaml('base_config.yaml')

# Modify specific settings
config_dict = config.to_dict()
config_dict['generation']['num_conversations'] = 500  # Override
config_dict['storage']['output_file'] = f'output_{user_id}.jsonl'  # Dynamic

# Rebuild and run
config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()

✅ Benefits:

  • Base configuration in YAML
  • Runtime customization in code
  • Best of both worlds

Environment Variables in YAML

Use ${VARIABLE_NAME} syntax:

providers:
  user_followup:
    name: ${PROVIDER_NAME}           # From environment
    api_key: ${ULTRASAFE_API_KEY}    # From environment
    model: ${USER_MODEL}             # From environment

base_data:
  file_path: ${INPUT_PATH}           # From environment

storage:
  output_file: ${OUTPUT_PATH}        # From environment

Set variables:

export PROVIDER_NAME="ultrasafe"
export ULTRASAFE_API_KEY="your-key"
export USER_MODEL="usf-mini"
export INPUT_PATH="data/input.jsonl"
export OUTPUT_PATH="data/output.jsonl"

# Then run
python your_script.py

CLI Usage with Config File

Run directly from command line:

# Using YAML config
omnigen conversation-extension --config config.yaml

# With overrides
omnigen conversation-extension \
  --config config.yaml \
  --num-conversations 500 \
  --output custom_output.jsonl


📖 Complete Configuration Reference

All Configuration Options Explained

Below is a comprehensive YAML configuration showing ALL available options with detailed explanations:

# ==============================================================================
# WORKSPACE ISOLATION (Optional)
# ==============================================================================
# Unique ID for multi-tenant environments - auto-isolates all output files
workspace_id: "user_123_session_abc"

# ==============================================================================
# PROVIDERS - AI Model Configuration
# ==============================================================================
# Configure different AI providers for each role
# Each role can use a different provider/model combination

providers:
  # Provider for generating user follow-up questions
  user_followup:
    name: ultrasafe              # Required: ultrasafe, openai, anthropic, openrouter
    api_key: ${API_KEY}          # Required: Use env var ${VAR_NAME} or direct key
    
    # Optional - Smart defaults applied if not specified:
    model: usf-mini              # Default varies by provider
    temperature: 0.7             # Default: 0.7
    max_tokens: 2048             # Default: 4096 (overridden here)
    timeout: 300                 # Default: 300
    max_retries: 5               # Default: 5
    retry_delay: 2               # Default: 2
  
  # Provider for generating assistant responses
  assistant_response:
    name: ultrasafe              # Can use different provider than user_followup
    api_key: ${API_KEY}          # Only name and api_key are required!
    max_tokens: 8192             # Override default (4096) for detailed responses
    # model, temperature, timeout, retries use defaults ✓

# PROVIDER OPTIONS:
# ----------------
# ultrasafe:
#   models: usf-mini, usf-max
#
# openai:
#   models: gpt-4-turbo, gpt-4, gpt-3.5-turbo, gpt-4o, gpt-4o-mini
#
# anthropic:
#   models: claude-3-5-sonnet-20241022, claude-3-opus-20240229,
#           claude-3-sonnet-20240229, claude-3-haiku-20240307
#
# openrouter:
#   models: Any OpenRouter supported model
#   base_url: https://openrouter.ai/api/v1 (optional)

# ==============================================================================
# GENERATION SETTINGS
# ==============================================================================
generation:
  num_conversations: 100           # Total conversations to generate
                                   # Use 0 or omit to process ALL available conversations
  
  turn_range:                      # Number of turns per conversation
    min: 3                         # Minimum turns
    max: 8                         # Maximum turns
  
  parallel_workers: 10             # Concurrent workers (balance speed vs rate limits)
  
  # Extension behavior for multi-turn input
  extension_mode: "smart"          # Options: "smart" | "legacy"
  # - smart: Intelligently handle multi-turn conversations
  # - legacy: Always extract first user message only
  
  skip_invalid: true               # Skip invalid patterns (recommended: true)
  
  # Turn calculation method
  turn_calculation: "additional"   # Options: "additional" | "total"
  # - additional: Add NEW turns on top of existing (default)
  # - total: Keep total turns within range (never removes existing)

# ==============================================================================
# DATA SOURCE CONFIGURATION
# ==============================================================================
base_data:
  enabled: true                    # Enable base data loading
  
  # OPTION 1: Local File
  source_type: file                # Use local JSONL/JSON file
  file_path: data/input.jsonl      # Path to file
  format: conversations            # JSON key containing conversation array
  shuffle: false                   # Shuffle data before processing
  
  # OPTION 2: HuggingFace Dataset
  # source_type: huggingface       # Use HuggingFace dataset
  # hf_dataset: username/dataset   # HuggingFace dataset path
  # hf_split: train                # Dataset split: train, test, validation
  # hf_token: ${HF_TOKEN}          # HuggingFace API token (if private)
  # hf_streaming: false            # Stream dataset (for large datasets)
  # format: conversations          # Field name in dataset
  # shuffle: true                  # Shuffle after loading

# ==============================================================================
# STORAGE CONFIGURATION
# ==============================================================================
storage:
  type: jsonl                      # Options: jsonl | mongodb
  
  # JSONL Storage (Default)
  output_file: output.jsonl        # Successful conversations
  partial_file: partial.jsonl      # Partial/incomplete conversations (legacy)
  failed_file: failed.jsonl        # Failed conversations
  
  # MongoDB Storage (Alternative)
  # type: mongodb
  # mongodb:
  #   connection_string: mongodb://localhost:27017
  #   database: omnigen
  #   collection: conversations
  #   output_collection: output          # Successful
  #   partial_collection: partial        # Partial
  #   failed_collection: failed          # Failed

# ==============================================================================
# CHECKPOINT/RESUME CONFIGURATION (NEW!)
# ==============================================================================
checkpoint:
  enabled: true                    # Enable automatic checkpoint/resume
  checkpoint_file: "workspaces/{workspace_id}/checkpoint.json"
  auto_save_frequency: 10          # Save checkpoint every N conversations
  validate_input_hash: true        # Verify input file hasn't changed on resume
  resume_mode: "auto"              # Options: "auto" | "manual" | "fresh"
  
  # Features:
  # - Zero data loss from interruptions
  # - No duplicate processing
  # - Partial conversation resume
  # - Handles: Ctrl+C, errors, rate limits, crashes

# ==============================================================================
# DATETIME CONFIGURATION (Optional)
# ==============================================================================
datetime_config:
  enabled: true                    # Enable datetime generation
  mode: random_from_range          # Options: random_from_range | current | fixed
  timezone: UTC                    # Timezone (UTC, America/New_York, Asia/Dubai, etc.)
  format: "%Y-%m-%d %H:%M:%S"      # Python strftime format
  
  # For random_from_range mode
  range:
    start: "2024-01-01 00:00:00"   # Start datetime
    end: "2024-12-31 23:59:59"     # End datetime
  
  # For fixed mode
  # fixed_datetime: "2024-06-15 12:00:00"

# ==============================================================================
# SYSTEM MESSAGES (Optional)
# ==============================================================================
system_messages:
  # Prepend system message to every conversation
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."
  
  # Append system message to every conversation
  append_always:
    enabled: false
    content: "Remember to be concise and helpful."
  
  # Add system message only if none exists
  add_if_missing:
    enabled: false
    content: "You are an AI assistant."

# Available variables in system messages:
# - {current_datetime}: Generated datetime
# - {timezone}: Configured timezone
# - {workspace_id}: Current workspace ID

# ==============================================================================
# CUSTOM PROMPTS (Optional)
# ==============================================================================
prompts:
  # Custom prompt for user follow-up generation
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response
    
    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>
  
  # Custom prompt for assistant response generation
  # assistant_response: |
  #   Your custom assistant response prompt here...

# ==============================================================================
# DEBUG OPTIONS (Optional)
# ==============================================================================
debug:
  log_api_timing: true             # Log API call timings
  log_parallel_status: true        # Log parallel worker status
  verbose: false                   # Verbose logging

Quick Configuration Examples

Example 1: Minimal Configuration (NEW!)

# Only specify what's required - defaults applied automatically
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: input.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

# Enable checkpoint/resume (NEW!)
checkpoint:
  enabled: true
  auto_save_frequency: 10

Example 2: HuggingFace Dataset Input

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-4-turbo
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022

generation:
  num_conversations: 1000
  turn_range: {min: 5, max: 10}
  parallel_workers: 20

base_data:
  source_type: huggingface
  hf_dataset: username/my-dataset
  hf_split: train
  hf_token: ${HF_TOKEN}
  format: conversations
  shuffle: true

storage:
  type: jsonl
  output_file: output.jsonl

# Checkpoint/resume for fault tolerance
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 50
  validate_input_hash: true

Example 3: Mixed Providers with MongoDB

providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-3.5-turbo
    temperature: 0.8
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022
    temperature: 0.7

generation:
  num_conversations: 500
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: questions.jsonl

storage:
  type: mongodb
  mongodb:
    connection_string: mongodb://localhost:27017
    database: omnigen
    collection: conversations

# Checkpoint works with any storage type
checkpoint:
  enabled: true
  auto_save_frequency: 25

Example 4: Programmatic Configuration (Python)

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Build configuration programmatically
config = (ConversationExtensionConfigBuilder()
    # Workspace isolation
    .set_workspace_id("user_123_session_abc")
    
    # Providers
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=2048
    )
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=8192
    )
    
    # Generation settings
    .set_generation(
        num_conversations=100,  # Or use 0/None to process ALL available
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',
        skip_invalid=True,
        turn_calculation='additional'
    )
    
    # Data source - Local file
    .set_data_source(
        source_type='file',
        file_path='input.jsonl',
        format='conversations',
        shuffle=False
    )
    
    # Data source - HuggingFace (alternative)
    # .set_data_source(
    #     source_type='huggingface',
    #     hf_dataset='username/dataset',
    #     hf_split='train',
    #     hf_token='your-token',
    #     format='conversations',
    #     shuffle=True
    # )
    
    # Storage
    .set_storage(
        type='jsonl',
        output_file='output.jsonl',
        partial_file='partial.jsonl',
        failed_file='failed.jsonl'
    )
    
    # Custom prompts (optional)
    .set_prompts(
        followup_question="Your custom prompt here with {history}"
    )
    
    .build()
)

# Run pipeline with checkpoint/resume support
pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can be interrupted and resumed automatically!

🔧 Advanced Configuration Options - Deep Dive

System Messages Configuration

System messages allow you to inject context or instructions into conversations. You have three modes of operation:

1. Prepend Always

Add a system message at the start of every conversation:

system_messages:
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."

Use Cases:

  • Set assistant personality/role
  • Provide real-time context (date, time)
  • Add global instructions

2. Append Always

Add a system message at the end of every conversation:

system_messages:
  append_always:
    enabled: true
    content: "Remember to be concise and provide sources when possible."

Use Cases:

  • Add final reminders
  • Set response style constraints
  • Add quality guidelines

3. Add If Missing

Add a system message only if the conversation doesn't already have one:

system_messages:
  add_if_missing:
    enabled: true
    content: "You are an AI assistant."

Use Cases:

  • Ensure all conversations have baseline instructions
  • Fallback when base data lacks system messages

Available Template Variables

You can use these variables in your system message content:

Variable Description Example Output
{current_datetime} Generated datetime 2024-06-15 14:30:00
{timezone} Configured timezone UTC or America/New_York
{workspace_id} Current workspace ID user_123_session_abc

Complete Example - All Three Modes

system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a knowledgeable AI assistant.
      Current datetime: {current_datetime} ({timezone})
      Workspace: {workspace_id}
  
  append_always:
    enabled: true
    content: |
      IMPORTANT REMINDERS:
      - Always cite sources when providing facts
      - Be concise but thorough
      - Ask clarifying questions when needed
  
  add_if_missing:
    enabled: true
    content: "You are a helpful assistant."

Note: If both prepend_always and add_if_missing are enabled, prepend_always takes precedence.

Programmatic Usage

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add system messages to config dict
config_dict = config.to_dict()
config_dict['system_messages'] = {
    'prepend_always': {
        'enabled': True,
        'content': 'You are a helpful assistant. Time: {current_datetime}'
    }
}

DateTime Configuration

Control how datetime values are generated and injected into system messages and conversations.

Mode 1: Random from Range (Default)

Generate random datetimes within a specified range:

datetime_config:
  enabled: true
  mode: random_from_range
  timezone: UTC                        # Any valid timezone
  format: "%Y-%m-%d %H:%M:%S"          # Python strftime format
  range:
    start: "2024-01-01 00:00:00"
    end: "2024-12-31 23:59:59"

Use Cases:

  • Training data with temporal diversity
  • Simulating historical conversations
  • Creating time-aware datasets

Common Timezones:

  • UTC - Coordinated Universal Time
  • America/New_York - Eastern Time
  • Europe/London - British Time
  • Asia/Dubai - Gulf Standard Time
  • Asia/Tokyo - Japan Standard Time

Mode 2: Current Time

Use actual current time when generating:

datetime_config:
  enabled: true
  mode: current
  timezone: America/New_York
  format: "%B %d, %Y at %I:%M %p"      # December 15, 2024 at 02:30 PM

Use Cases:

  • Real-time conversation simulation
  • Current events discussions
  • Live system demonstrations

Mode 3: Fixed DateTime

Use the same datetime for all conversations:

datetime_config:
  enabled: true
  mode: fixed
  timezone: UTC
  format: "%Y-%m-%d %H:%M:%S"
  fixed_datetime: "2024-06-15 12:00:00"

Use Cases:

  • Consistent training data
  • Specific time period simulation
  • Testing and debugging

Format String Examples

Common datetime formats using Python's strftime:

# ISO 8601 Format
format: "%Y-%m-%d %H:%M:%S"           # 2024-12-15 14:30:00

# Human-Readable
format: "%B %d, %Y at %I:%M %p"       # December 15, 2024 at 02:30 PM

# Date Only
format: "%Y-%m-%d"                     # 2024-12-15

# Time Only
format: "%H:%M:%S"                     # 14:30:00

# Custom Format
format: "%A, %B %d, %Y"               # Monday, December 15, 2024

Complete DateTime Example

datetime_config:
  enabled: true
  mode: random_from_range
  timezone: America/New_York
  format: "%A, %B %d, %Y at %I:%M %p %Z"
  range:
    start: "2024-01-01 09:00:00"      # Business hours only
    end: "2024-12-31 17:00:00"

# Then use in system messages:
system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a business assistant.
      Current datetime: {current_datetime}
      Timezone: {timezone}

Output Example:

You are a business assistant.
Current datetime: Wednesday, June 15, 2024 at 02:30 PM EST
Timezone: America/New_York

Custom Prompts Configuration

Override default prompts to control how the AI generates follow-up questions and responses.

Default Prompts

OmniGen uses optimized default prompts, but you can customize them:

1. Follow-up Question Prompt (for user_followup role):

prompts:
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response
    - Make the question specific to the conversation context
    
    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>

2. Assistant Response Prompt (for assistant_response role):

prompts:
  assistant_response: |
    ## Your Task
    Generate a helpful assistant response based on the conversation history.
    
    ### CONVERSATION HISTORY:
    {history}
    
    ### INSTRUCTIONS:
    - Provide accurate and helpful information
    - Be conversational and friendly
    - Reference previous context when relevant
    - Keep responses focused and concise
    
    Return your response wrapped in XML tags:
    <assistant>Your response here</assistant>

Available Template Variables

Variable Description Content
{history} Full conversation history All previous messages formatted as text

Custom Prompt Examples

Example 1: Technical Documentation Assistant

prompts:
  followup_question: |
    Generate a technical follow-up question from a developer's perspective.
    
    CONVERSATION:
    {history}
    
    Create a question that:
    - Asks about implementation details
    - Seeks code examples or best practices
    - Explores edge cases or potential issues
    
    <user>Your technical question</user>
  
  assistant_response: |
    Provide a detailed technical response with code examples.
    
    CONVERSATION:
    {history}
    
    Your response should:
    - Include code snippets when helpful
    - Explain technical concepts clearly
    - Provide links to documentation
    - Mention potential pitfalls
    
    <assistant>Your technical response</assistant>

Example 2: Customer Support Simulation

prompts:
  followup_question: |
    Simulate a customer asking for help.
    
    CONVERSATION:
    {history}
    
    Generate a question that:
    - Shows frustration or confusion (realistic)
    - Asks for clarification on previous response
    - Requests specific solutions or workarounds
    
    <user>Customer question</user>
  
  assistant_response: |
    Provide empathetic customer support.
    
    CONVERSATION:
    {history}
    
    Your response must:
    - Show empathy and understanding
    - Provide clear step-by-step solutions
    - Offer alternatives if applicable
    - End with "Is there anything else I can help with?"
    
    <assistant>Support response</assistant>

Example 3: Educational Tutor

prompts:
  followup_question: |
    Generate a student's follow-up question showing learning progression.
    
    CONTEXT:
    {history}
    
    The question should:
    - Build on what was just explained
    - Show either understanding or confusion
    - Ask for examples or clarification
    - Demonstrate curiosity about the topic
    
    <user>Student question</user>
  
  assistant_response: |
    Respond as a patient, knowledgeable tutor.
    
    CONTEXT:
    {history}
    
    Your response should:
    - Use simple, clear language
    - Provide concrete examples
    - Check for understanding
    - Encourage further questions
    
    <assistant>Tutor response</assistant>

Programmatic Prompt Configuration

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

custom_prompts = {
    'followup_question': """
        Generate a follow-up based on: {history}
        Make it specific and contextual.
        <user>question</user>
    """,
    'assistant_response': """
        Respond helpfully to: {history}
        Be clear and concise.
        <assistant>response</assistant>
    """
}

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .set_prompts(**custom_prompts)
    .build()
)

Debug Configuration

Enable detailed logging and monitoring for troubleshooting and optimization.

Debug Options

debug:
  log_api_timing: true          # Log API call duration and performance
  log_parallel_status: true     # Log parallel worker status and progress
  verbose: false                # Enable verbose logging (all operations)

Option Details

1. API Timing Logs (log_api_timing: true)

Tracks API performance for each call:

[API TIMING] user_followup request took 1.23s
[API TIMING] assistant_response request took 2.45s
[API TIMING] Average response time: 1.84s

Use Cases:

  • Identify slow API providers
  • Optimize provider selection
  • Monitor rate limits
  • Debug timeout issues

2. Parallel Status Logs (log_parallel_status: true)

Shows worker activity in real-time:

[PARALLEL] Worker 1/10: Processing conversation 5
[PARALLEL] Worker 2/10: Processing conversation 6
[PARALLEL] Worker 3/10: Waiting for task...
[PARALLEL] Progress: 45/100 conversations complete (45%)
[PARALLEL] Active workers: 8/10 | Queue: 12 remaining

Use Cases:

  • Monitor parallel processing
  • Identify bottlenecks
  • Optimize worker count
  • Track progress in real-time

3. Verbose Logging (verbose: true)

Enables comprehensive logging of all operations:

[DEBUG] Loading base data from: data/input.jsonl
[DEBUG] Loaded 100 base conversations
[DEBUG] Initializing provider: ultrasafe (usf-mini)
[DEBUG] Starting parallel generation with 10 workers
[DEBUG] Conversation 1: Processing...
[DEBUG] Conversation 1: Generated 5 turns
[DEBUG] Conversation 1: Saved to output.jsonl
[DEBUG] Final stats: 95 success, 3 partial, 2 failed

Use Cases:

  • Development and testing
  • Debugging issues
  • Understanding pipeline flow
  • Troubleshooting data problems

Complete Debug Configuration

# Full debug mode for development
debug:
  log_api_timing: true
  log_parallel_status: true
  verbose: true

# Production mode (minimal logging)
debug:
  log_api_timing: false
  log_parallel_status: false
  verbose: false

# Performance monitoring mode
debug:
  log_api_timing: true      # Track API performance
  log_parallel_status: true # Monitor workers
  verbose: false            # Don't flood logs

Programmatic Debug Configuration

from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add debug settings
config_dict = config.to_dict()
config_dict['debug'] = {
    'log_api_timing': True,
    'log_parallel_status': True,
    'verbose': False
}

Debug Output Examples

Scenario 1: API Performance Issue

debug:
  log_api_timing: true

Output helps identify slow providers:

[API TIMING] openai/gpt-4-turbo: 0.8s
[API TIMING] anthropic/claude-3-5-sonnet: 3.2s  ← SLOW!
[API TIMING] ultrasafe/usf-mini: 0.5s

Scenario 2: Parallel Processing Bottleneck

debug:
  log_parallel_status: true

Output shows worker utilization:

[PARALLEL] Active: 3/10 workers  ← Only 30% utilized!
[PARALLEL] Queue: 0 tasks remaining
[PARALLEL] Suggestion: Reduce worker count to 5

Scenario 3: Data Loading Issues

debug:
  verbose: true

Output reveals the problem:

[DEBUG] Loading: data/input.jsonl
[DEBUG] Line 1: Valid ✓
[DEBUG] Line 2: Valid ✓
[DEBUG] Line 3: ERROR - First message not from user
[DEBUG] Line 4: Valid ✓
[DEBUG] Loaded: 3 valid, 1 invalid

📖 Conversation Extension Pipeline - Complete Guide

Overview

The Conversation Extension Pipeline intelligently transforms base conversations into rich multi-turn dialogues. It can handle both single-turn questions and extend existing multi-turn conversations.

Key Features

  • Smart Extension - Continues from existing conversations based on last role
  • Flexible Input - Handles single-turn or multi-turn base data
  • Provider Mix - Use different AI providers for user and assistant
  • Multi-Tenant - Complete workspace isolation
  • Configurable - Full control over generation behavior

Configuration Options

Extension Modes

Smart Mode (Default)

generation:
  extension_mode: "smart"
  • Single-turn input → Generate new conversation from scratch
  • Multi-turn (user last) → Add 1 assistant response, then continue
  • Multi-turn (assistant last) → Add user + assistant, then continue
  • Invalid patterns → Skip row entirely

Legacy Mode

generation:
  extension_mode: "legacy"
  • Always extracts first user message only (original behavior)

Turn Calculation

Additional Mode (Default) - Add NEW turns on top of existing

generation:
  turn_calculation: "additional"  # Add 3-8 NEW turns

Total Mode - Keep total within range (never removes existing)

generation:
  turn_calculation: "total"  # Total should be 3-8 turns

Complete Configuration

# Workspace isolation (optional)
workspace_id: "user_123"

# AI Providers - Smart defaults applied!
providers:
  user_followup:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 2048              # Override default if needed
  
  assistant_response:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 8192              # Override for detailed responses

# Generation Settings
generation:
  num_conversations: 100
  turn_range:
    min: 3
    max: 8
  parallel_workers: 10
  
  # Extension behavior
  extension_mode: "smart"        # "smart" | "legacy"
  skip_invalid: true             # Skip invalid patterns
  turn_calculation: "additional" # "additional" | "total"

# Input Data
base_data:
  enabled: true
  source_type: "file"
  file_path: "base_data.jsonl"
  format: "conversations"
  shuffle: false

# Output Storage
storage:
  type: "jsonl"
  output_file: "output.jsonl"
  partial_file: "partial.jsonl"
  failed_file: "failed.jsonl"

# Checkpoint/Resume (recommended for large datasets)
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 10
  validate_input_hash: true
  resume_mode: "auto"

# System Messages (optional)
system_messages:
  add_if_missing:
    enabled: true
    content: "You are a helpful assistant. Current datetime: {current_datetime}"

# DateTime (optional)
datetime_config:
  enabled: true
  timezone: "UTC"
  format: "%Y-%m-%d %H:%M:%S"
  range:
    start_date: "2024-01-01"
    end_date: "2024-12-31"

Input Data Formats

Valid Patterns

Single-turn

{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}

Multi-turn (user last)

{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."},
    {"role": "user", "content": "What resources?"}
  ]
}

Multi-turn (assistant last)

{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."}
  ]
}

Invalid Patterns (Skipped)

❌ First message not user

{"conversations": [{"role": "assistant", "content": "Hello"}]}

❌ Empty conversations

{"conversations": []}

Programmatic Usage

from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',      # Handle multi-turn intelligently
        skip_invalid=True,            # Skip invalid patterns
        turn_calculation='additional' # Add new turns (default)
    )
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(enabled=True, auto_save_frequency=10)  # Enable checkpoint
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Automatically resumes if interrupted!

Turn Calculation Examples

Additional Mode (Default)

Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 3-8 NEW turns → Total: 5-10 turns

Total Mode

Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 1-6 turns → Total: 3-8 turns

Existing: 10 turns (already > max)
Config: turn_range = (3, 8)
Result: Add 0 turns → Keep 10 turns (never remove)

Best Practices

Provider Selection

  • Use better models for assistant (claude-3-5-sonnet, gpt-4-turbo)
  • Use cheaper models for user followups (usf-mini, gpt-3.5-turbo)

Turn Range

  • Quick exchanges: (2, 4)
  • In-depth: (5, 10)
  • Balanced: (3, 8)

Parallel Workers

  • Conservative: 5 (avoid rate limits)
  • Balanced: 10
  • Aggressive: 20 (watch for rate limits)

Troubleshooting

Issue: Empty output

  • Check input data format (first message must be user)
  • Set skip_invalid: false to see errors

Issue: Rate limits

  • Reduce parallel_workers
  • Check provider API limits
  • Enable checkpoint to preserve progress: checkpoint.enabled: true

Issue: Pipeline interrupted

  • ✅ Don't worry! Progress is automatically saved if checkpoint enabled
  • Simply run again - it will resume from where it stopped
  • Check checkpoint.json for current state

Issue: Low quality

  • Increase temperature (0.8-0.9)
  • Use better models
  • Add custom prompts and system messages

Issue: Duplicate conversations in output

  • Checkpoint system prevents this automatically
  • Both partial and complete saved to same file with status field
  • Filter completed: jq 'select(.status == "completed")' output.jsonl

License

MIT License - Ultrasafe AI © 2024


About Ultrasafe AI

Enterprise-grade AI tools with focus on safety and performance.


Made with ❤️ by Ultrasafe AI

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

omnigen_usf-0.0.1.post4.tar.gz (87.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

omnigen_usf-0.0.1.post4-py3-none-any.whl (56.3 kB view details)

Uploaded Python 3

File details

Details for the file omnigen_usf-0.0.1.post4.tar.gz.

File metadata

  • Download URL: omnigen_usf-0.0.1.post4.tar.gz
  • Upload date:
  • Size: 87.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.1

File hashes

Hashes for omnigen_usf-0.0.1.post4.tar.gz
Algorithm Hash digest
SHA256 193dc08dbd2c3f9af72bd0ee6cf6ef3cd5eea3ec30b861b9dbe99d9f8b63f663
MD5 aa1902b84331b660e05619072675aa19
BLAKE2b-256 fd503cadc10ca1a0db55a023fe9be7dc37e63d6aba47b656fcb7eb40228816fa

See more details on using hashes here.

File details

Details for the file omnigen_usf-0.0.1.post4-py3-none-any.whl.

File metadata

File hashes

Hashes for omnigen_usf-0.0.1.post4-py3-none-any.whl
Algorithm Hash digest
SHA256 c2434ba241ddf2ef3268f460185ca0bd7b9219465c7cf986c5b883945630d248
MD5 d78f7867c36762065675a749ca2afde1
BLAKE2b-256 23fdd285f633f7239646a9b62d54dd696480a0480d5bd1dc07e10f563ae2461c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page