Python SDK for the VynFi synthetic financial data API

VynFi Python SDK

The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets (journal entries, charts of accounts, document flows, banking/AML data, ESG metrics, and more) for audit analytics, fraud detection, compliance testing, and ML training.

Installation

pip install vynfi

With optional integrations:

pip install vynfi[pandas]     # pandas DataFrame support
pip install vynfi[polars]     # polars DataFrame support
pip install vynfi[all]        # all integrations

Quick Start

from vynfi import VynFi

client = VynFi(api_key="vf_live_...")

# Generate synthetic financial data
job = client.generate(
    tables=[{"name": "journal_entries", "rows": 5000}],
    sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")

# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)

# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive)  # JobArchive(84 files, 1.5 GB)

# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")

# Or download raw bytes
data = client.jobs.download(completed.id)

See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.

Resources

Catalog & Templates

# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
    print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")

# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
    print(f"  {table.name}: {len(table.columns)} columns")

# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
    print(f"  {t.name} ({t.framework}, tier={t.min_tier})")

Jobs

# Async generation (large datasets)
job = client.jobs.generate(
    tables=[{"name": "journal_entries", "rows": 50000}],
    sector_slug="retail",
)
completed = client.jobs.wait(job.id)

# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
    tables=[{"name": "journal_entries", "rows": 100}],
    sector_slug="retail",
)

# Config-based generation
job = client.jobs.generate_config(
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)

# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)

# Download specific artifacts
data = client.jobs.download_file(job_id, "journal_entries.json")

# Stream progress via SSE
for event in client.jobs.stream(job.id):
    if event["event"] == "progress":
        print(f"{event['data']['percent']}%")
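
For readers unfamiliar with Server-Sent Events, the following sketch shows how raw SSE lines can be turned into dictionaries with "event" and "data" keys, the shape the loop above consumes. It is a simplified, standalone parser, not the SDK's implementation, and it assumes every data payload is JSON.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield {"event": ..., "data": ...} dicts from raw SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates one event.
            if data:
                yield {"event": event, "data": json.loads("\n".join(data))}
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].lstrip(" "))


# Hand-written sample stream; the payload shape is assumed from the loop above.
sample = [
    "event: progress\n",
    'data: {"percent": 40}\n',
    "\n",
    ": keep-alive\n",
    "event: progress\n",
    'data: {"percent": 100}\n',
    "\n",
]
events = list(parse_sse(sample))
for ev in events:
    if ev["event"] == "progress":
        print(f"{ev['data']['percent']}%")
```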

Saved Configs

# Save a generation config for reuse
cfg = client.configs.create(
    name="Monthly Retail",
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
    tags=["recurring", "retail"],
)

# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")

# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")

Multi-Period Sessions

# Create a fiscal-year session
session = client.sessions.create(
    name="FY2026",
    fiscal_year_start="2026-01-01",
    period_length_months=3,
    periods=4,
    generation_config={"sector": "retail", "rows": 10000},
)

# Generate each period sequentially
for _ in range(session.periods_total):
    resp = client.sessions.generate_next(session.id)
    print(f"Period {resp.period_index}: job {resp.job_id}")
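
To make the session parameters concrete, this sketch computes the calendar range each period would cover. It assumes periods are contiguous blocks of whole months starting on the first day of a month; how the service actually derives period boundaries is not stated here. The helpers add_months and period_bounds are hypothetical.

```python
from datetime import date, timedelta


def add_months(d: date, months: int) -> date:
    """Shift a first-of-month date forward by a whole number of months."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, d.day)


def period_bounds(fiscal_year_start: str, period_length_months: int, periods: int):
    """Return (first_day, last_day) for each period in the session."""
    start = date.fromisoformat(fiscal_year_start)
    if start.day != 1:
        raise ValueError("this sketch assumes the fiscal year starts on day 1")
    bounds = []
    for i in range(periods):
        begin = add_months(start, i * period_length_months)
        end = add_months(start, (i + 1) * period_length_months) - timedelta(days=1)
        bounds.append((begin, end))
    return bounds


# Same values as the FY2026 session above: four quarterly periods.
quarters = period_bounds("2026-01-01", period_length_months=3, periods=4)
for begin, end in quarters:
    print(begin.isoformat(), "to", end.isoformat())
```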

What-If Scenarios

# List causal graph templates
templates = client.scenarios.templates()

# Create a scenario
scenario = client.scenarios.create(
    name="Fraud Spike",
    template_id="supply-chain",
    interventions={"fraudRate": 0.05},
    generation_config={"sector": "retail", "rows": 10000},
)

# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)

# Get diff analysis
scenario = client.scenarios.diff(scenario.id)

Job Archives

# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)

# Explore contents
print(archive.backend)          # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files())          # all 80+ files
print(archive.categories())     # ['banking', 'document_flows', 'esg', ...]
print(archive.summary())        # file counts and sizes by category

# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")

# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")

# Extract everything to disk
archive.extract_to("./output")
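
The pattern lookup and category grouping can be pictured with a few lines of standard-library code. This is an illustrative sketch over a plain list of file names, not the JobArchive implementation; only banking/banking_customers.json, journal_entries.json, and chart_of_accounts.json appear in this README, and the other names are made up. It also assumes a category is simply the top-level directory.

```python
from collections import defaultdict
from fnmatch import fnmatchcase

names = [
    "journal_entries.json",
    "chart_of_accounts.json",
    "banking/banking_customers.json",
    "banking/transactions.json",  # hypothetical file name
    "esg/emissions.json",         # hypothetical file name
]


def find(names, pattern):
    """Return the names matching a shell-style pattern (case-sensitive)."""
    return [n for n in names if fnmatchcase(n, pattern)]


def by_category(names):
    """Group names by top-level directory; files at the root go under '(root)'."""
    groups = defaultdict(list)
    for n in names:
        category = n.split("/", 1)[0] if "/" in n else "(root)"
        groups[category].append(n)
    return dict(groups)


banking = find(names, "banking/*")
groups = by_category(names)
print(banking)
print(sorted(groups))
```

Note that in fnmatch-style matching the * wildcard also matches path separators, so "banking/*" would include files in nested subdirectories.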

Scenario Packs (DataSynth 3.0+)

# List 11 built-in scenario packs
packs = client.scenarios.packs()
for p in packs:
    print(f"{p.category}: {p.name} - {p.description}")

# Run a scenario pack
scenario = client.scenarios.create(
    name="Q3 revenue stress test",
    generation_config={
        "sector": "retail", "rows": 10000,
        "scenarios": {
            "enabled": True,
            "packs": ["channel_stuffing"],
            "diffFormats": ["summary", "record_level"],
        },
    },
)
client.scenarios.run(scenario.id)  # spawns baseline + counterfactual
diff = client.scenarios.diff(scenario.id)

AI Tuning & Co-pilot (DataSynth 3.0+, Scale+)

# LLM suggests config improvements based on quality scores
suggestion = client.jobs.tune(job_id, target_scores={"overall": 0.95})
print(suggestion.explanation)
print("Change rows:", suggestion.original_config.get("rows"),
      "->", suggestion.suggested_config.get("rows"))

# Ask the dashboard co-pilot
reply = client.ai.chat("Which fraud packs give me the best audit training?")
print(reply.reply)

Fingerprint Synthesis (DataSynth 3.0+, Team+)

# Privacy-preserving synthesis from a .dsf fingerprint file
submission = client.fingerprint.synthesize(
    "./my_data.dsf",
    rows=10000,
    backend="statistical",  # or "neural" / "hybrid" (Scale+)
)
job = client.jobs.wait(submission.job_id)

Adversarial Probing (DataSynth 3.0+, Enterprise)

# Probe an ONNX fraud detector for decision-boundary weaknesses
probe = client.adversarial.probe(
    "./my_model.onnx",
    n_probes=10000,
    perturbation_budget=0.05,
    threshold=0.5,
)
# ... wait for probe to complete ...
results = client.adversarial.results(probe.id)
print(f"Mean margin: {results.mean_margin:.3f}")
print(f"Positive rate: {results.positive_rate:.1%}")

Pre-built Analytics (DataSynth 2.3+)

# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)

# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")

# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")

# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")

# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")
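
For context on the MAD figure, the sketch below computes a first-digit Benford mean absolute deviation locally. It shows the standard formula (expected share of leading digit d is log10(1 + 1/d)), not VynFi's own evaluation code. A commonly cited rule of thumb for first digits treats a MAD below 0.006 as close conformity and above 0.015 as nonconformity; whether the conformity field uses those cutoffs is not stated here.

```python
import math
from collections import Counter


def first_digit(amount: float) -> int:
    """Leading non-zero digit of a non-zero amount."""
    # Scientific notation puts the leading digit first, e.g. '1.2345...e+03'.
    return int(f"{abs(amount):.15e}"[0])


def benford_mad(amounts) -> float:
    """Mean absolute deviation between observed and Benford first-digit shares."""
    digits = [first_digit(a) for a in amounts if a]
    counts = Counter(digits)
    n = len(digits)
    deviations = [
        abs(counts[d] / n - math.log10(1 + 1 / d)) for d in range(1, 10)
    ]
    return sum(deviations) / 9


# Powers of two are a classic sequence whose leading digits follow Benford's Law.
benford_like = [2.0 ** k for k in range(1, 1001)]
# One value per leading digit gives a uniform (non-Benford) distribution.
uniform = list(range(1, 10))

mad_good = benford_mad(benford_like)
mad_bad = benford_mad(uniform)
print(f"powers of two: {mad_good:.4f}")
print(f"uniform digits: {mad_bad:.4f}")
```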

NDJSON Streaming (Scale tier+)

# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
    if envelope.get("type") == "_progress":
        print(f"  {envelope['lines_emitted']:,} lines emitted")
    else:
        # Process each data record
        my_pipeline.send(envelope)
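
The envelope handling above can be tried offline against hand-written NDJSON. This sketch separates progress envelopes from data records using the same "type" and "lines_emitted" keys as the loop above; the record fields (id, amount) are invented for the demo, and split_envelopes is a hypothetical helper.

```python
import json


def split_envelopes(lines):
    """Separate data records from _progress envelopes in an NDJSON stream."""
    records, progress = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        envelope = json.loads(line)
        if envelope.get("type") == "_progress":
            progress.append(envelope["lines_emitted"])
        else:
            records.append(envelope)
    return records, progress


# Hand-written sample; real record fields depend on the generated tables.
sample = [
    '{"id": "JE-1", "amount": 120.5}',
    '{"id": "JE-2", "amount": 80.0}',
    '{"type": "_progress", "lines_emitted": 2}',
    "",
    '{"id": "JE-3", "amount": 15.25}',
]
records, progress = split_envelopes(sample)
print(len(records), "records; progress marks:", progress)
```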

Output Mode (DataSynth 2.3+)

# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
    "sector": "retail",
    "rows": 1000,
    # ...
    "output": {
        "numericMode": "native",   # numbers, not strings
        "exportLayout": "flat",    # one row per line, header merged
    },
})
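
The conversion boilerplate that native numeric mode avoids presumably looks something like the sketch below, which assumes the default mode emits amounts as strings (as the "numbers, not strings" comment suggests). The field names and the helper coerce_amounts are hypothetical.

```python
from decimal import Decimal


def coerce_amounts(record: dict, numeric_fields: set) -> dict:
    """Return a copy of record with string-encoded numeric fields as Decimal."""
    return {
        key: Decimal(value) if key in numeric_fields and isinstance(value, str) else value
        for key, value in record.items()
    }


# Hypothetical journal entry line with string-encoded amounts.
raw = {"entry_id": "JE-0001", "debit": "1250.50", "credit": "0.00"}
clean = coerce_amounts(raw, {"debit", "credit"})
print(clean["debit"] + clean["credit"])
```

Decimal is used rather than float so that monetary values keep their exact decimal representation.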

Storage Quota (TB-scale)

# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
    print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
    print(f"  {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")

Usage & Credits

# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")

# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
    print(f"  {d.date}: {d.credits} credits")

# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")

# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")

Quality Scores

scores = client.quality.scores()
for s in scores:
    print(f"Job {s.job_id}: overall={s.overall_score:.2f}")

timeline = client.quality.timeline(days=30)

API Keys, Webhooks, Billing, Notifications

# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}")  # Only shown once!

# Webhooks
hook = client.webhooks.create(
    url="https://example.com/webhook",
    events=["job.completed", "job.failed"],
)

# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")

# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)

Ecosystem Integrations

pandas

from vynfi.integrations.pandas import (
    job_to_dataframe,
    archive_to_dataframes,
    usage_to_dataframe,
)

# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))

# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}

# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)

polars

from vynfi.integrations.polars import download_frame, usage_to_frame

df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())

Error Handling

from vynfi import (
    VynFi,
    AuthenticationError,
    ForbiddenError,
    InsufficientCreditsError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

client = VynFi(api_key="vf_live_...")

try:
    job = client.generate(tables=[{"name": "journal_entries", "rows": 1000000}])
except InsufficientCreditsError:
    print("Not enough credits")
except RateLimitError:
    print("Too many requests — automatic retry exhausted")
except ValidationError as e:
    print(f"Invalid request: {e}")

Configuration

client = VynFi(
    api_key="vf_live_...",
    base_url="https://api.vynfi.com",  # default
    timeout=30.0,                       # request timeout in seconds
    max_retries=2,                      # automatic retry on 429/5xx
)

# Context manager support
with VynFi(api_key="vf_live_...") as client:
    usage = client.usage.summary()
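
The max_retries option retries on 429 and 5xx responses, but the backoff schedule is not documented in this README. The sketch below shows one plausible shape for such a loop, using exponential backoff; the helper call_with_retries, the base delay, and the doubling schedule are assumptions, not the SDK's actual behavior.

```python
import time
from typing import Callable, Tuple


def is_retryable(status: int) -> bool:
    """429 (rate limited) and any 5xx status are treated as transient."""
    return status == 429 or 500 <= status <= 599


def call_with_retries(
    send: Callable[[], Tuple[int, str]],
    max_retries: int = 2,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[int, str]:
    """Call send() and retry transient failures up to max_retries times."""
    attempt = 0
    while True:
        status, body = send()
        if not is_retryable(status) or attempt == max_retries:
            return status, body
        # Wait 0.5s, 1s, 2s, ... between attempts.
        sleep(base_delay * 2 ** attempt)
        attempt += 1


# Simulated server: two transient failures, then success.
responses = iter([(429, "slow down"), (503, "unavailable"), (200, "ok")])
delays = []
result = call_with_retries(lambda: next(responses), sleep=delays.append)
print(result, delays)
```

With max_retries=2 the request is sent at most three times; if the last attempt still fails, the failing response is returned to the caller.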

License

Apache 2.0
