
Production-ready REST API wrapper for Google Cloud Vertex AI batch embeddings

Vertex AI Batch Embeddings API


A REST API service for managing batch text embedding workflows on Google Cloud Vertex AI. It stages input payloads in Cloud Storage, initiates Vertex AI batch prediction jobs, and returns structured job metadata.

Capabilities

  • Production deployment support (Docker, Cloud Run, health checks)
  • API key authentication with configurable rate limiting
  • Real-time job metadata and status retrieval
  • Input validation with clear error responses
  • Cloud Storage integration for input staging and output retrieval
  • Performance optimizations: Gzip compression for faster uploads

Prerequisites

Before using this API, ensure you have:

1. Google Cloud Project Setup

  • A GCP project with billing enabled
  • The Vertex AI API enabled: gcloud services enable aiplatform.googleapis.com
  • The Cloud Storage API enabled: gcloud services enable storage-api.googleapis.com

2. Cloud Storage Buckets

Create two GCS buckets for input and output:

gsutil mb gs://your-project-embed-input
gsutil mb gs://your-project-embed-output

3. Local GCP Authentication

Authenticate with GCP locally:

gcloud auth application-default login

This creates credentials that the API will use to access GCP services.
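To confirm the credentials are visible to the client libraries, a quick sanity check using the google-auth package (installed alongside the google-cloud-* dependencies):

import google.auth

# Loads Application Default Credentials from the environment;
# raises DefaultCredentialsError if none are found.
credentials, project_id = google.auth.default()
print(f"Authenticated against project: {project_id}")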

4. Python Environment

  • Python 3.9 or later
  • pip or conda for package management

Quick Start

# Clone the repository
git clone https://github.com/scrrlt/vertex-batch-embeddings-api.git
cd vertex-batch-embeddings-api

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export GOOGLE_CLOUD_PROJECT=your-project-id
export GCS_EMBED_INPUT_BUCKET=your-input-bucket
export GCS_EMBED_OUTPUT_BUCKET=your-output-bucket
export API_KEY_SECRET=your-api-key

# Run locally
python run_api.py

# Or with Docker
docker build -t vertex-embeddings .
docker run -p 8080:8080 -e GOOGLE_CLOUD_PROJECT=... vertex-embeddings

API Usage

Submit Batch Job

curl -X POST http://localhost:8080/v1/embeddings/batch \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{
    "texts": ["Hello world", "How are you?"],
    "job_name": "my-embeddings-job",
    "webhook_url": "https://your-app.com/webhook"
  }'

Response:

{
  "job_name": "my-embeddings-job",
  "resource_name": "projects/.../locations/.../batchPredictionJobs/...",
  "input_uri": "gs://bucket/embeddings/inputs/instances_20231109.jsonl",
  "output_uri": "gs://bucket/embeddings/outputs/my-embeddings-job/",
  "status": "submitted",
  "text_count": 2
}
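For programmatic submission, a sketch of the same request using the requests library (endpoint, headers, and payload exactly as in the curl example above):

import requests

API_URL = "http://localhost:8080/v1/embeddings/batch"

response = requests.post(
    API_URL,
    headers={"X-API-Key": "your-api-key"},
    json={
        "texts": ["Hello world", "How are you?"],
        "job_name": "my-embeddings-job",
    },
    timeout=30,
)
response.raise_for_status()
job = response.json()
print(job["status"], job["resource_name"])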

Check Job Status

curl http://localhost:8080/v1/embeddings/batch/my-embeddings-job/status \
  -H "X-API-Key: your-api-key"

Retrieve and Parse Embeddings Output

Once your job completes, retrieve the embeddings from Cloud Storage:

from google.cloud import storage
import json

def download_embeddings(project_id: str, bucket: str, job_name: str):
    """Download and parse embeddings from GCS."""
    client = storage.Client(project=project_id)
    bucket_obj = client.bucket(bucket)

    # List all prediction files for this job
    prefix = f"embeddings/outputs/{job_name}/"
    blobs = bucket_obj.list_blobs(prefix=prefix)

    embeddings = []
    for blob in blobs:
        if blob.name.endswith(".jsonl"):
            # Download and parse JSONL file
            content = blob.download_as_text()
            for line in content.strip().split('\n'):
                if line:
                    prediction = json.loads(line)
                    embeddings.append(prediction)

    return embeddings

# Usage
embeddings = download_embeddings(
    project_id="your-project",
    bucket="your-output-bucket",
    job_name="my-embeddings-job"
)

# Each embedding is a dict with:
# {
#   "predictions": [[0.123, 0.456, ...]]  # 768-dimensional vector
# }
print(f"Retrieved {len(embeddings)} embeddings")

Webhook Notifications

The API supports webhook notifications for job completion. When you submit a batch job with a webhook_url, you'll receive a POST request when the job finishes (success or failure).

Webhook Payload

{
  "event": "batch_embedding_job_completed",
  "job": {
    "job_name": "my-embeddings-job",
    "status": "JOB_STATE_SUCCEEDED",
    "resource_name": "projects/.../locations/.../batchPredictionJobs/...",
    "create_time": "2024-01-15T10:30:00Z",
    "start_time": "2024-01-15T10:31:00Z",
    "end_time": "2024-01-15T10:45:00Z",
    "output_uri": "gs://bucket/embeddings/outputs/my-embeddings-job/",
    "error_message": null
  },
  "timestamp": "2024-01-15T10:45:05Z"
}

Webhook Security

  • Webhooks are sent as HTTP POST requests with Content-Type: application/json
  • Implement authentication on your webhook endpoint to verify requests (see the receiver sketch below)
  • The API does not retry failed webhook deliveries (implement your own retry logic if needed)
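As a starting point, a minimal Flask receiver for the payload above; the shared token in the URL is one illustrative way to authenticate deliveries, since you control the webhook_url you register:

from flask import Flask, abort, request

app = Flask(__name__)
WEBHOOK_TOKEN = "replace-with-a-random-secret"  # hypothetical shared secret

@app.route("/webhook/endpoint", methods=["POST"])
def handle_batch_job_completed():
    # Reject deliveries that lack the token baked into the registered URL,
    # e.g. webhook_url = "https://your-app.com/webhook/endpoint?token=..."
    if request.args.get("token") != WEBHOOK_TOKEN:
        abort(401)
    payload = request.get_json(force=True)
    job = payload["job"]
    if job["status"] == "JOB_STATE_SUCCEEDED":
        print(f"{job['job_name']} succeeded: {job['output_uri']}")
    else:
        print(f"{job['job_name']} ended with {job['status']}: {job['error_message']}")
    return "", 204

if __name__ == "__main__":
    app.run(port=9000)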

Usage Example

curl -X POST http://localhost:8080/v1/embeddings/batch \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{
    "texts": ["Hello world", "How are you?"],
    "job_name": "my-embeddings-job",
    "webhook_url": "https://your-app.com/webhook/endpoint"
  }'

Performance Optimizations

The API includes several optimizations to reduce processing time and costs for large datasets:

Compression

Enable gzip compression for faster uploads to Cloud Storage:

curl -X POST http://localhost:8080/v1/embeddings/batch \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{
    "texts": ["text1", "text2", "text3"],
    "compress_upload": true
  }'

Benefits:

  • 60-80% reduction in upload time for large text datasets
  • Lower Cloud Storage costs
  • Faster job startup times
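To estimate the savings on your own data before enabling the flag, a quick local measurement with Python's standard library:

import gzip
import json

texts = ["some representative document text"] * 1000  # substitute your own data
raw = json.dumps({"texts": texts}).encode("utf-8")
compressed = gzip.compress(raw)
print(f"raw: {len(raw):,} B, gzip: {len(compressed):,} B "
      f"({1 - len(compressed) / len(raw):.0%} smaller)")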

Document Processing Workflow

Text Chunking Strategies

For optimal embedding quality, split documents into appropriately-sized chunks. Recommended parameters:

  • Chunk size: 500–1000 characters
  • Overlap: 100–200 characters (prevents context loss at boundaries)
  • Separators: Prioritize semantic boundaries (paragraphs, sentences, words)

Popular libraries for text chunking include LangChain, LlamaIndex, or NLTK. See the examples/ directory for implementation details.
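For reference, a minimal dependency-free chunker along these lines (parameters match the recommendations above; production splitters handle more edge cases):

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping chunks, preferring semantic boundaries."""
    separators = ["\n\n", ". ", " "]  # paragraph, sentence, then word breaks
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        if end < len(text):
            # Pull the cut point back to the nearest boundary in the window.
            for sep in separators:
                cut = text.rfind(sep, start, end)
                if cut > start:
                    end = cut + len(sep)
                    break
        chunks.append(text[start:end].strip())
        if end >= len(text):
            break
        start = max(end - overlap, start + 1)  # overlap, but always advance
    return chunks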

Batch Submission

Submit document chunks for embedding:

curl -X POST http://localhost:8080/v1/embeddings/batch \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{
    "texts": ["chunk1", "chunk2", "chunk3"],
    "job_name": "document-embeddings-batch-1"
  }'

Embedding Retrieval

Once the batch job completes, retrieve and parse the embeddings from Cloud Storage using the download_embeddings helper shown in Retrieve and Parse Embeddings Output above.

Output format: Each embedding is a 768-dimensional vector stored as {"predictions": [[vector]]}
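To use the vectors downstream, e.g. for cosine similarity, they can be stacked into a matrix; a sketch assuming numpy is installed and embeddings comes from the download_embeddings helper above:

import numpy as np

vectors = np.array([e["predictions"][0] for e in embeddings])  # shape (n, 768)

# Cosine similarity of every vector against the first one.
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
similarities = unit @ unit[0]
print(similarities)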

Security Best Practices

Endpoint Protection

Secure your Cloud Run endpoint using IAM:

# Require authentication for the endpoint
gcloud run services update vertex-embeddings \
  --no-allow-unauthenticated \
  --region us-central1

# Grant access to specific service accounts
gcloud run services add-iam-policy-binding vertex-embeddings \
  --member=serviceAccount:your-service-account@your-project.iam.gserviceaccount.com \
  --role=roles/run.invoker \
  --region us-central1

API Key Management

  • Store API keys in GCP Secret Manager, not in code
  • Rotate keys regularly (recommended: every 90 days)
  • Use separate keys for different environments (dev, staging, prod)
  • Monitor API key usage via Cloud Logging

# Create a secret in Secret Manager
echo -n "your-api-key" | gcloud secrets create vertex-api-key --data-file=-

# Reference in Cloud Run
gcloud run deploy vertex-embeddings \
  --set-env-vars API_KEY_SECRET=$(gcloud secrets versions access latest --secret=vertex-api-key)
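Recent gcloud versions can also mount the secret directly with --set-secrets (for example, --set-secrets API_KEY_SECRET=vertex-api-key:latest), which avoids resolving the key into your shell history at deploy time.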

VPC Service Controls

For enhanced security, use VPC Service Controls to restrict data exfiltration:

  • Create a VPC perimeter around your GCP resources
  • Restrict API access to authorized networks only
  • Monitor and audit all API calls

Data Privacy

  • Embeddings are stored in your GCS buckets (not shared with Google)
  • Use GCS encryption at rest (default: Google-managed keys)
  • Consider customer-managed encryption keys (CMEK) for sensitive data
  • Enable audit logging for all GCS access

Environment Variables

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| GOOGLE_CLOUD_PROJECT | Yes | - | GCP project ID |
| LOCATION | No | us-central1 | GCP region |
| EMBEDDING_MODEL | No | text-embedding-004 | Vertex AI model |
| GCS_EMBED_INPUT_BUCKET | Yes | - | Input bucket for text data |
| GCS_EMBED_OUTPUT_BUCKET | Yes | - | Output bucket for embeddings |
| API_KEY_SECRET | Yes | - | API keys accepted by the service (comma-separated) |
| RATE_LIMIT_REQUESTS | No | 100 | Requests per hour per API key |
| RATE_LIMIT_WINDOW | No | 3600 | Rate limit window in seconds |
| REDIS_URL | No | - | Redis URL for distributed rate limiting (optional) |
| MAX_TEXTS_PER_REQUEST | No | 1000 | Maximum texts per request |
| MAX_TEXT_LENGTH | No | 10000 | Maximum characters per text |
| ALLOWED_MODELS | No | text-embedding-004,text-embedding-preview-0815,text-multilingual-embedding-002 | Comma-separated list of allowed models |

Deployment

Cloud Run (Recommended)

Deploy to Google Cloud Run for serverless, auto-scaling execution:

gcloud run deploy vertex-embeddings \
  --source . \
  --platform managed \
  --region us-central1 \
  --set-env-vars "GOOGLE_CLOUD_PROJECT=your-project,API_KEY_SECRET=your-api-key"

Docker

Build and run locally or in any container environment:

docker build -t vertex-batch-embeddings:latest .
docker run -p 8080:8080 \
  -e GOOGLE_CLOUD_PROJECT=your-project \
  -e API_KEY_SECRET=your-api-key \
  vertex-batch-embeddings:latest

See Dockerfile for production-ready configuration with health checks and non-root user.

Cost Estimation

Vertex AI batch embeddings pricing depends on:

  • Model: Different models have different costs
  • Volume: Bulk discounts apply for large volumes
  • Region: Pricing varies by region

For current pricing details, see the official Vertex AI pricing documentation.

Rough Estimates (as of 2024):

  • text-embedding-004: ~$0.02 per 1M tokens
  • 1,000 texts (~500 tokens each) ≈ 500K tokens ≈ $0.01

Model Selection

Available Models

| Model | Dimensions | Use Case | Cost |
| --- | --- | --- | --- |
| text-embedding-004 | 768 | General purpose, recommended | Standard |
| text-embedding-preview-0815 | 768 | Preview/experimental | Standard |
| text-multilingual-embedding-002 | 768 | Multilingual content | Standard |

Choosing a Model

  • General English text: Use text-embedding-004 (recommended)
  • Multilingual content: Use text-multilingual-embedding-002
  • Experimental features: Use text-embedding-preview-0815

To use a different model, pass it in the request:

curl -X POST http://localhost:8080/v1/embeddings/batch \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{
    "texts": ["Your text here"],
    "model": "text-multilingual-embedding-002"
  }'

Or set the default model via environment variable:

export EMBEDDING_MODEL=text-multilingual-embedding-002

Troubleshooting

Common Issues

Issue: "GOOGLE_CLOUD_PROJECT not set"

  • Solution: Set the environment variable: export GOOGLE_CLOUD_PROJECT=your-project-id
  • Verify: echo $GOOGLE_CLOUD_PROJECT

Issue: "Permission denied" when accessing GCS buckets

  • Solution: Ensure your GCP credentials have the necessary roles:
    • roles/storage.objectAdmin on both input and output buckets
    • roles/aiplatform.user for Vertex AI access
  • Verify: gcloud auth list and gcloud config get-value project

Issue: "Rate limit exceeded" errors

  • Solution: Increase RATE_LIMIT_REQUESTS or RATE_LIMIT_WINDOW
  • For production: Deploy Redis and set REDIS_URL for distributed rate limiting

Issue: "Out of memory" errors with large inputs

  • Solution: The API now uses streaming uploads. If you still encounter OOM:
    • Reduce batch size (fewer texts per request)
    • Reduce text length (shorter individual texts)
    • Deploy with more memory: gcloud run deploy ... --memory 2Gi

Issue: Job stuck in "QUEUED" state

  • Solution: This is normal for batch jobs. Check status periodically (see the polling sketch below).
  • Typical duration: 5-30 minutes, depending on job size
  • Monitor via: gcloud ai batch-prediction-jobs list --region=us-central1
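A simple polling sketch against the status endpoint documented above (field names follow the webhook payload example; interval and timeout are illustrative):

import time

import requests

def wait_for_job(job_name: str, api_key: str,
                 base_url: str = "http://localhost:8080",
                 poll_seconds: int = 60, timeout_seconds: int = 3600) -> dict:
    """Poll the job status endpoint until the job reaches a terminal state."""
    url = f"{base_url}/v1/embeddings/batch/{job_name}/status"
    terminal = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        response = requests.get(url, headers={"X-API-Key": api_key}, timeout=30)
        response.raise_for_status()
        status = response.json()
        if status.get("status") in terminal:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"{job_name} still not finished after {timeout_seconds}s")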

Issue: "Invalid API key" errors

  • Solution: Verify the API key is correct and matches API_KEY_SECRET
  • For multiple keys: Use comma-separated format: key1,key2,key3

Debugging

Enable debug logging:

export LOG_LEVEL=DEBUG
export FLASK_DEBUG=true
python -m src.api

Check Cloud Logging for errors:

gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=vertex-embeddings" \
  --limit 50 \
  --format json

Development

For information on setting up your development environment and contributing to the project, see the contributing documentation in the repository.

Quick start for developers:

# Clone and setup
git clone https://github.com/scrrlt/vertex-batch-embeddings-api.git
cd vertex-batch-embeddings-api
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt

# Run tests
make test

# Run linters
make lint

# Auto-format code
make format

# Run locally
export FLASK_DEBUG=true
python run_api.py

# Run with coverage
python -m pytest tests/ --cov=src --cov-report=html

Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   REST API      │    │  Vertex AI       │    │ Cloud Storage   │
│   (Flask)       │───▶│  Batch Job       │───▶│ Embeddings      │
│                 │    │                  │    │                 │
│ • Validation    │    │ • Async          │    │ • JSONL         │
│ • Auth          │    │ • Scalable       │    │ • GCS URIs      │
│ • Rate Limiting │    │ • Cost Effective │    │                 │
└─────────────────┘    └──────────────────┘    └─────────────────┘

Citation

If you use this software in your research or project, please cite it:

@software{vertex_batch_embeddings_api,
  title = {Vertex AI Batch Embeddings API},
  author = {Vertex AI Batch Embeddings API Contributors},
  year = {2025},
  url = {https://github.com/scrrlt/vertex-batch-embeddings-api},
  license = {MIT}
}

See CITATION.cff for more citation formats.

License

This project is licensed under the MIT License. See LICENSE for details.

Support

For issues, questions, or feedback, please open an issue on the GitHub repository.


The **Vertex AI Batch Embeddings API** offers a REST interface for orchestrating large-scale embedding jobs, combining authentication, rate limiting, monitoring, and error handling into a reproducible, cloud-native workflow.

Download files

Download the file for your platform.

Source Distribution

vertex_embeddings-1.0.0.tar.gz (18.8 kB)


File details

Details for the file vertex_embeddings-1.0.0.tar.gz.

File metadata

  • Download URL: vertex_embeddings-1.0.0.tar.gz
  • Size: 18.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.9

File hashes

Hashes for vertex_embeddings-1.0.0.tar.gz
| Algorithm | Hash digest |
| --- | --- |
| SHA256 | 080c4a609897dc8816b464cbe6e3aa6de5775728397ff520da39ee4f27d9324c |
| MD5 | 5c324af393fcc0705c4bf4440b173ff6 |
| BLAKE2b-256 | 763d5c6a2239ca3e4733df28101d336109ceb1b3f8d3b182aee8474ccb1966c4 |

