Python SDK for the VynFi synthetic financial data API

VynFi Python SDK

The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets (journal entries, charts of accounts, document flows, banking/AML data, ESG metrics, and more) for audit analytics, fraud detection, compliance testing, and ML training.

Installation

pip install vynfi

With optional integrations:

pip install vynfi[pandas]     # pandas DataFrame support
pip install vynfi[polars]     # polars DataFrame support
pip install vynfi[all]        # all integrations

Quick Start

from vynfi import VynFi

client = VynFi(api_key="vf_live_...")

# Generate synthetic financial data
job = client.generate(
    tables=[{"name": "journal_entries", "rows": 5000}],
    sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")

# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)

# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive)  # JobArchive(84 files, 1.5 GB)

# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")

# Or download raw bytes
data = client.jobs.download(completed.id)

See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.
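
The built-in polling behind `client.jobs.wait` in the Quick Start can be pictured roughly as the loop below. This is a minimal sketch, not the SDK's implementation: `wait_for`, its `interval` and `timeout` parameters, and the `"queued"`/`"running"` status names are assumptions made for illustration. Only `"completed"` and `"failed"` appear elsewhere in this README.

```python
import time
from typing import Callable

# Assumed terminal states; "completed" and "failed" appear in this README,
# any other status names used here are hypothetical.
TERMINAL_STATES = {"completed", "failed"}


def wait_for(fetch_status: Callable[[], str],
             interval: float = 2.0,
             timeout: float = 600.0,
             sleep: Callable[[float], None] = time.sleep) -> str:
    """Call fetch_status() until it reports a terminal state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        sleep(interval)


# Usage with a fake status source that completes on the third poll.
statuses = iter(["queued", "running", "completed"])
final = wait_for(lambda: next(statuses), interval=0.0)
print(final)
```

Injecting `sleep` keeps the loop testable without real delays. In practice `fetch_status` would wrap whatever call returns the job's current status.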

Resources

Catalog & Templates

# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
    print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")

# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
    print(f"  {table.name}: {len(table.columns)} columns")

# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
    print(f"  {t.name} ({t.framework}, tier={t.min_tier})")

Jobs

# Async generation (large datasets)
job = client.jobs.generate(
    tables=[{"name": "journal_entries", "rows": 50000}],
    sector_slug="retail",
)
completed = client.jobs.wait(job.id)

# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
    tables=[{"name": "journal_entries", "rows": 100}],
    sector_slug="retail",
)

# Config-based generation
job = client.jobs.generate_config(
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)

# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)

# Download specific artifacts
data = client.jobs.download_file(job_id, "journal_entries.json")

# Stream progress via SSE
for event in client.jobs.stream(job.id):
    if event["event"] == "progress":
        print(f"{event['data']['percent']}%")
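
For readers unfamiliar with Server-Sent Events, the sketch below shows how a raw SSE stream could be turned into dictionaries of the `{"event": ..., "data": {...}}` shape used in the loop above. It is not the SDK's parser. The function name `parse_sse` and the assumption that each `data:` payload is JSON are illustrative only.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Group SSE lines into events; a blank line ends the current event."""
    event, data = "message", []          # "message" is the SSE default event name
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data:
                # Assumption: the payload is JSON, as event['data']['percent'] suggests.
                yield {"event": event, "data": json.loads("\n".join(data))}
            event, data = "message", []
        elif line.startswith(":"):
            continue                      # comment line, often used as keep-alive
        else:
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "event":
                event = value
            elif field == "data":
                data.append(value)


sample = [
    "event: progress",
    'data: {"percent": 40}',
    "",
    ": keep-alive",
    "event: progress",
    'data: {"percent": 100}',
    "",
]
events = list(parse_sse(sample))
for e in events:
    if e["event"] == "progress":
        print(f"{e['data']['percent']}%")
```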

Saved Configs

# Save a generation config for reuse
cfg = client.configs.create(
    name="Monthly Retail",
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
    tags=["recurring", "retail"],
)

# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")

# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")

Multi-Period Sessions

# Create a fiscal-year session
session = client.sessions.create(
    name="FY2026",
    fiscal_year_start="2026-01-01",
    period_length_months=3,
    periods=4,
    generation_config={"sector": "retail", "rows": 10000},
)

# Generate each period sequentially
for _ in range(session.periods_total):
    resp = client.sessions.generate_next(session.id)
    print(f"Period {resp.period_index}: job {resp.job_id}")
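
To make the session parameters concrete, the sketch below computes the period start dates that `fiscal_year_start`, `period_length_months`, and `periods` would imply under a simple calendar-month reading. The helper names are hypothetical, and the service may define period boundaries differently.

```python
from datetime import date


def add_months(d: date, months: int) -> date:
    """Shift a date forward by whole months (day of month must be <= 28)."""
    if d.day > 28:
        raise ValueError("this sketch only handles days 1-28")
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, d.day)


def period_starts(fiscal_year_start: str,
                  period_length_months: int,
                  periods: int) -> list:
    """Start date of each period, assuming back-to-back calendar-month periods."""
    start = date.fromisoformat(fiscal_year_start)
    return [add_months(start, i * period_length_months) for i in range(periods)]


starts = period_starts("2026-01-01", period_length_months=3, periods=4)
for index, start in enumerate(starts):
    print(f"Period {index}: starts {start.isoformat()}")
```

With the values from the example above, this yields four quarterly periods beginning in January, April, July, and October 2026.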

What-If Scenarios

# List causal graph templates
templates = client.scenarios.templates()

# Create a scenario
scenario = client.scenarios.create(
    name="Fraud Spike",
    template_id="supply-chain",
    interventions={"fraudRate": 0.05},
    generation_config={"sector": "retail", "rows": 10000},
)

# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)

# Get diff analysis
scenario = client.scenarios.diff(scenario.id)

Job Archives

# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)

# Explore contents
print(archive.backend)          # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files())          # all 80+ files
print(archive.categories())     # ['banking', 'document_flows', 'esg', ...]
print(archive.summary())        # file counts and sizes by category

# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")

# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")

# Extract everything to disk
archive.extract_to("./output")
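
The patterns passed to `archive.find` look like shell-style globs. Assuming that is how they are matched (the README does not say), the behaviour can be approximated with the standard library's `fnmatch` module. The `find` helper and the `esg/emissions.json` file name below are hypothetical.

```python
from fnmatch import fnmatchcase

# Example archive listing; "esg/emissions.json" is a made-up name for illustration.
names = [
    "journal_entries.json",
    "chart_of_accounts.json",
    "banking/banking_customers.json",
    "esg/emissions.json",
]


def find(file_names, pattern):
    """Return the names matching a glob pattern (case-sensitive on every platform)."""
    return [name for name in file_names if fnmatchcase(name, pattern)]


banking = find(names, "banking/*")
all_json = find(names, "*.json")
print(banking)
print(len(all_json))
```

Note that in `fnmatch` the `*` wildcard also matches `/`, so `*.json` matches files in subdirectories too. Whether the SDK behaves the same way is an open assumption.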

Scenario Packs (DataSynth 3.0+)

# List 11 built-in scenario packs
packs = client.scenarios.packs()
for p in packs:
    print(f"{p.category}: {p.name} - {p.description}")

# Run a scenario pack
scenario = client.scenarios.create(
    name="Q3 revenue stress test",
    generation_config={
        "sector": "retail", "rows": 10000,
        "scenarios": {
            "enabled": True,
            "packs": ["channel_stuffing"],
            "diffFormats": ["summary", "record_level"],
        },
    },
)
client.scenarios.run(scenario.id)  # spawns baseline + counterfactual
diff = client.scenarios.diff(scenario.id)

AI Tuning & Co-pilot (DataSynth 3.0+, Scale+)

# LLM suggests config improvements based on quality scores
suggestion = client.jobs.tune(job_id, target_scores={"overall": 0.95})
print(suggestion.explanation)
print("Change rows:", suggestion.original_config.get("rows"),
      "->", suggestion.suggested_config.get("rows"))

# Ask the dashboard co-pilot
reply = client.ai.chat("Which fraud packs give me the best audit training?")
print(reply.reply)

Fingerprint Synthesis (DataSynth 3.0+, Team+)

# Privacy-preserving synthesis from a .dsf fingerprint file
submission = client.fingerprint.synthesize(
    "./my_data.dsf",
    rows=10000,
    backend="statistical",  # or "neural" / "hybrid" (Scale+)
)
job = client.jobs.wait(submission.job_id)

Adversarial Probing (DataSynth 3.0+, Enterprise)

# Probe an ONNX fraud detector for decision-boundary weaknesses
probe = client.adversarial.probe(
    "./my_model.onnx",
    n_probes=10000,
    perturbation_budget=0.05,
    threshold=0.5,
)
# ... wait for probe to complete ...
results = client.adversarial.results(probe.id)
print(f"Mean margin: {results.mean_margin:.3f}")
print(f"Positive rate: {results.positive_rate:.1%}")

Pre-built Analytics (DataSynth 2.3+)

# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)

# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")

# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")

# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")

# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")
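
As background for the Benford MAD figure above, here is one common way to compute a first-digit mean absolute deviation locally. It is a sketch of the general technique, not necessarily the exact statistic the service reports. The function names are hypothetical.

```python
import math
from collections import Counter

# Benford's expected share of leading digit d is log10(1 + 1/d).
EXPECTED = {d: math.log10(1 + 1 / d) for d in range(1, 10)}


def first_digit(x) -> int:
    """Leading non-zero digit of a non-zero number, read off scientific notation."""
    return int(f"{abs(x):.15e}"[0])


def benford_mad(amounts) -> float:
    """Mean absolute deviation between observed and expected first-digit shares."""
    digits = [first_digit(a) for a in amounts if a != 0]
    if not digits:
        raise ValueError("need at least one non-zero amount")
    counts = Counter(digits)
    n = len(digits)
    return sum(abs(counts[d] / n - EXPECTED[d]) for d in range(1, 10)) / 9


powers_of_two = [2 ** k for k in range(1, 501)]   # a classic Benford-like sequence
uniform_digits = list(range(1, 1000))             # every leading digit equally common

mad_benford_like = benford_mad(powers_of_two)
mad_uniform = benford_mad(uniform_digits)
print(f"powers of two:  {mad_benford_like:.4f}")
print(f"uniform digits: {mad_uniform:.4f}")
```

A lower MAD means the leading digits sit closer to the Benford distribution. The uniform sample scores noticeably worse than the powers of two.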

NDJSON Streaming (Scale tier+)

# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
    if envelope.get("type") == "_progress":
        print(f"  {envelope['lines_emitted']:,} lines emitted")
    else:
        # Process each data record
        my_pipeline.send(envelope)
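
The loop above distinguishes `_progress` envelopes from data records. A minimal sketch of that split over raw NDJSON text follows. It assumes one JSON object per line, and the record fields (`id`, `amount`) are invented for the example.

```python
import json
from typing import Iterable, Iterator, Tuple


def split_envelopes(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Parse NDJSON lines and label each object as 'progress' or 'record'."""
    for line in lines:
        line = line.strip()
        if not line:
            continue                      # tolerate blank lines between objects
        envelope = json.loads(line)
        kind = "progress" if envelope.get("type") == "_progress" else "record"
        yield kind, envelope


# Hypothetical stream contents; real records will have different fields.
sample = [
    '{"id": "JE-1", "amount": "120.50"}',
    '{"id": "JE-2", "amount": "75.00"}',
    '{"type": "_progress", "lines_emitted": 2}',
]
parsed = list(split_envelopes(sample))
records = [e for kind, e in parsed if kind == "record"]
progress = [e for kind, e in parsed if kind == "progress"]
print(len(records), "records;", progress[-1]["lines_emitted"], "lines emitted")
```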

Output Mode (DataSynth 2.3+)

# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
    "sector": "retail",
    "rows": 1000,
    # ...
    "output": {
        "numericMode": "native",   # numbers, not strings
        "exportLayout": "flat",    # one row per line, header merged
    },
})
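
The "conversion boilerplate" that native numeric mode avoids presumably looks something like the sketch below, where amounts arrive as strings and must be converted before arithmetic. The field names and the `coerce_amounts` helper are hypothetical. The README only states that native mode emits numbers rather than strings.

```python
from decimal import Decimal


def coerce_amounts(record: dict, fields=("debit_amount", "credit_amount")) -> dict:
    """Return a copy of record with the named string fields converted to Decimal."""
    return {
        key: Decimal(value) if key in fields and isinstance(value, str) else value
        for key, value in record.items()
    }


# Hypothetical record as it might look when numbers are serialized as strings.
raw = {"debit_amount": "120.50", "credit_amount": "0.00", "memo": "Store sale"}
clean = coerce_amounts(raw)
net = clean["debit_amount"] - clean["credit_amount"]
print(net)
```

`Decimal` is used here because it preserves the exact decimal value of each string, which matters for financial amounts.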

Storage Quota (TB-scale)

# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
    print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
    print(f"  {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")

Usage & Credits

# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")

# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
    print(f"  {d.date}: {d.credits} credits")

# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")

# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")

Quality Scores

scores = client.quality.scores()
for s in scores:
    print(f"Job {s.job_id}: overall={s.overall_score:.2f}")

timeline = client.quality.timeline(days=30)

API Keys, Webhooks, Billing, Notifications

# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}")  # Only shown once!

# Webhooks
hook = client.webhooks.create(
    url="https://example.com/webhook",
    events=["job.completed", "job.failed"],
)

# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")

# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)

Ecosystem Integrations

pandas

from vynfi.integrations.pandas import (
    job_to_dataframe,
    archive_to_dataframes,
    usage_to_dataframe,
)

# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))

# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}

# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)

polars

from vynfi.integrations.polars import download_frame, usage_to_frame

df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())

Error Handling

from vynfi import (
    VynFi,
    AuthenticationError,
    ForbiddenError,
    InsufficientCreditsError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

client = VynFi(api_key="vf_live_...")

try:
    job = client.generate(tables=[{"name": "journal_entries", "rows": 1_000_000}])
except InsufficientCreditsError:
    print("Not enough credits")
except RateLimitError:
    print("Too many requests: automatic retries exhausted")
except ValidationError as e:
    print(f"Invalid request: {e}")

Configuration

client = VynFi(
    api_key="vf_live_...",
    base_url="https://api.vynfi.com",  # default
    timeout=30.0,                       # request timeout in seconds
    max_retries=2,                      # automatic retry on 429/5xx
)

# Context manager support
with VynFi(api_key="vf_live_...") as client:
    usage = client.usage.summary()
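
The `max_retries` option retries on 429 and 5xx responses. The SDK's actual retry schedule is not documented here, so the following is only a generic sketch of retry with exponential backoff and jitter. `HttpError`, `call_with_retries`, and `base_delay` are hypothetical names.

```python
import random
import time
from typing import Callable


class HttpError(Exception):
    """Stand-in for a failed HTTP response (hypothetical, not an SDK class)."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def is_retryable(status: int) -> bool:
    """429 and 5xx are the retryable cases named in the configuration comment."""
    return status == 429 or 500 <= status < 600


def call_with_retries(fn: Callable, max_retries: int = 2, base_delay: float = 0.5,
                      sleep: Callable[[float], None] = time.sleep):
    """Call fn(), retrying retryable failures with exponential backoff plus jitter."""
    attempt = 0
    while True:
        try:
            return fn()
        except HttpError as exc:
            if not is_retryable(exc.status) or attempt >= max_retries:
                raise
            sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))
            attempt += 1


# Usage: a fake call that is rate-limited twice, then succeeds.
calls = {"n": 0}
delays = []


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise HttpError(429)
    return "ok"


result = call_with_retries(flaky, max_retries=2, sleep=delays.append)
print(result, "after", calls["n"], "calls")
```

Once the retry budget is spent the last error propagates, which matches the `RateLimitError` case shown under Error Handling.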

License

Apache 2.0

Download files

Source Distribution

vynfi-1.5.0.tar.gz (318.4 kB)

Uploaded Source

Built Distribution

vynfi-1.5.0-py3-none-any.whl (47.4 kB)

Uploaded Python 3

File details

Details for the file vynfi-1.5.0.tar.gz.

File metadata

  • Download URL: vynfi-1.5.0.tar.gz
  • Upload date:
  • Size: 318.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for vynfi-1.5.0.tar.gz
Algorithm Hash digest
SHA256 f448568981bc29abbd34252ab4c5d15f3e25718ae324771e3aec3424ce55704f
MD5 972dae07bc9dd8835f68e2ece021e656
BLAKE2b-256 73ee50dcb9f626ec3c17658614322156f2af32baa9c8cf8e047f1a83402f3c3c

Provenance

The following attestation bundles were made for vynfi-1.5.0.tar.gz:

Publisher: publish.yml on VynFi/VynFi-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file vynfi-1.5.0-py3-none-any.whl.

File metadata

  • Download URL: vynfi-1.5.0-py3-none-any.whl
  • Upload date:
  • Size: 47.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for vynfi-1.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ac6a39aae20e4b8627c368893b4ec528a5fc2c9eda631c703d9a7deb78285e10
MD5 6de1a2e2fe873758f4e66ce77a9b7d69
BLAKE2b-256 f048b62c5b85de1edf066a708ba6135b65cdf5754705e3fd622dcb60008ee28e

Provenance

The following attestation bundles were made for vynfi-1.5.0-py3-none-any.whl:

Publisher: publish.yml on VynFi/VynFi-python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
