Python SDK for the VynFi synthetic financial data API

VynFi Python SDK

The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets (journal entries, charts of accounts, document flows, banking/AML data, ESG metrics, and more) for audit analytics, fraud detection, compliance testing, and ML training.

Installation

pip install vynfi

With optional integrations:

pip install vynfi[pandas]     # pandas DataFrame support
pip install vynfi[polars]     # polars DataFrame support
pip install vynfi[all]        # all integrations

Quick Start

from vynfi import VynFi

client = VynFi(api_key="vf_live_...")

# Generate synthetic financial data
job = client.generate(
    tables=[{"name": "journal_entries", "rows": 5000}],
    sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")

# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)

# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive)  # JobArchive(84 files, 1.5 GB)

# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")

# Or download raw bytes
data = client.jobs.download(completed.id)

See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.
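
The Quick Start relies on `client.jobs.wait` for "built-in polling". The source does not show how that polling works, so the following is only a generic sketch of the pattern, useful if you need the same behaviour around some other status check. The function name `wait_for_status` and the `"queued"`/`"running"` statuses are hypothetical; `"completed"` appears in the job filter above and `"failed"` is inferred from the `job.failed` webhook event.

```python
import time
from typing import Callable


def wait_for_status(
    fetch_status: Callable[[], str],
    done: frozenset = frozenset({"completed", "failed"}),  # assumed terminal statuses
    interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call fetch_status() until it returns a terminal status or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in done:
            return status
        # Give up if another sleep would carry us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"still {status!r} after {timeout} seconds")
        sleep(interval)
```

Pass any zero-argument callable that returns the current status string. The `sleep` parameter exists so the loop can be tested without actually waiting.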

Resources

Catalog & Templates

# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
    print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")

# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
    print(f"  {table.name}: {len(table.columns)} columns")

# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
    print(f"  {t.name} ({t.framework}, tier={t.min_tier})")

Jobs

# Async generation (large datasets)
job = client.jobs.generate(
    tables=[{"name": "journal_entries", "rows": 50000}],
    sector_slug="retail",
)
completed = client.jobs.wait(job.id)

# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
    tables=[{"name": "journal_entries", "rows": 100}],
    sector_slug="retail",
)

# Config-based generation
job = client.jobs.generate_config(
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)

# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)

# Download specific artifacts
data = client.jobs.download_file(job.id, "journal_entries.json")

# Stream progress via SSE
for event in client.jobs.stream(job.id):
    if event["event"] == "progress":
        print(f"{event['data']['percent']}%")
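
The streaming loop above indexes straight into `event['data']['percent']`. A slightly more defensive variant is sketched here, assuming only the event shape shown in that loop (`{"event": "progress", "data": {"percent": ...}}`). The helper name `progress_percentages` is hypothetical, and other event types are simply skipped because the source does not list them.

```python
from typing import Iterable, Iterator


def progress_percentages(events: Iterable[dict]) -> Iterator[float]:
    """Yield the percent value of each well-formed progress event."""
    for event in events:
        if event.get("event") != "progress":
            continue  # ignore event types this sketch does not know about
        data = event.get("data")
        percent = data.get("percent") if isinstance(data, dict) else None
        if isinstance(percent, (int, float)):
            yield float(percent)
```

It could wrap the SDK stream directly, for example `for pct in progress_percentages(client.jobs.stream(job.id)): ...`.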

Saved Configs

# Save a generation config for reuse
cfg = client.configs.create(
    name="Monthly Retail",
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
    tags=["recurring", "retail"],
)

# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")

# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")
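
Validation and cost estimation combine naturally into a single pre-flight check. The sketch below uses only the calls and attributes shown above (`configs.validate`, `configs.estimate_cost`, `valid`, `errors`, `total_credits`); the function `preflight` and its `max_credits` budget are hypothetical, and the element type of `errors` is not documented here, so each entry is just formatted as text.

```python
def preflight(client, config: dict, max_credits: float) -> list:
    """Return a list of human-readable problems; an empty list means go ahead."""
    problems = []
    result = client.configs.validate(config=config)
    if not result.valid:
        problems.extend(f"validation: {err}" for err in result.errors)
        return problems  # no point estimating the cost of an invalid config
    estimate = client.configs.estimate_cost(config=config)
    if estimate.total_credits > max_credits:
        problems.append(
            f"estimated {estimate.total_credits} credits exceeds budget of {max_credits}"
        )
    return problems
```

A caller would submit the job only when `preflight(client, config, max_credits=...)` returns an empty list.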

Multi-Period Sessions

# Create a fiscal-year session
session = client.sessions.create(
    name="FY2026",
    fiscal_year_start="2026-01-01",
    period_length_months=3,
    periods=4,
    generation_config={"sector": "retail", "rows": 10000},
)

# Generate each period sequentially
for _ in range(session.periods_total):
    resp = client.sessions.generate_next(session.id)
    print(f"Period {resp.period_index}: job {resp.job_id}")
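
To see which calendar dates the session parameters above imply, the period boundaries can be worked out locally. This is an illustration of the arithmetic only: how the service actually slices periods is an assumption, and `period_bounds` is a hypothetical helper that handles only start dates on the 1st of a month.

```python
from datetime import date, timedelta


def period_bounds(fiscal_year_start: str, period_length_months: int, periods: int):
    """Return (first_day, last_day) pairs for consecutive periods."""
    start = date.fromisoformat(fiscal_year_start)
    if start.day != 1:
        raise ValueError("this sketch only handles periods that start on the 1st")

    def first_of_month(offset_months: int) -> date:
        # Count months on a single axis so December rolls over into January.
        index = start.year * 12 + (start.month - 1) + offset_months
        return date(index // 12, index % 12 + 1, 1)

    bounds = []
    for i in range(periods):
        first = first_of_month(i * period_length_months)
        next_first = first_of_month((i + 1) * period_length_months)
        bounds.append((first, next_first - timedelta(days=1)))
    return bounds
```

With `fiscal_year_start="2026-01-01"`, `period_length_months=3`, and `periods=4`, this yields the four calendar quarters of 2026.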

What-If Scenarios

# List causal graph templates
templates = client.scenarios.templates()

# Create a scenario
scenario = client.scenarios.create(
    name="Fraud Spike",
    template_id="supply-chain",
    interventions={"fraudRate": 0.05},
    generation_config={"sector": "retail", "rows": 10000},
)

# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)

# Get diff analysis
scenario = client.scenarios.diff(scenario.id)

Job Archives

# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)

# Explore contents
print(archive.backend)          # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files())          # all 80+ files
print(archive.categories())     # ['banking', 'document_flows', 'esg', ...]
print(archive.summary())        # file counts and sizes by category

# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")

# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")

# Extract everything to disk
archive.extract_to("./output")
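
Patterns such as `"banking/*"` look like shell-style globs. Whether `archive.find` uses exactly these semantics is an assumption; the sketch below only shows how such patterns behave with Python's standard `fnmatch` module on a plain list of paths, plus a grouping by top-level directory similar in spirit to `archive.categories()`. Both helper names and the `esg/emissions.json` path are hypothetical.

```python
from collections import defaultdict
from fnmatch import fnmatchcase


def match_paths(paths, pattern):
    """Return the paths that match a shell-style pattern such as 'banking/*'."""
    return [p for p in paths if fnmatchcase(p, pattern)]


def group_by_top_level(paths):
    """Group paths by their first directory; files at the root go under ''."""
    groups = defaultdict(list)
    for p in paths:
        head, sep, _ = p.partition("/")
        groups[head if sep else ""].append(p)
    return dict(groups)
```

Note that `*` in `fnmatch` also matches `/`, so `"banking/*"` would match files in nested subdirectories as well.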

Scenario Packs (DataSynth 3.0+)

# List 11 built-in scenario packs
packs = client.scenarios.packs()
for p in packs:
    print(f"{p.category}: {p.name}: {p.description}")

# Run a scenario pack
scenario = client.scenarios.create(
    name="Q3 revenue stress test",
    generation_config={
        "sector": "retail", "rows": 10000,
        "scenarios": {
            "enabled": True,
            "packs": ["channel_stuffing"],
            "diffFormats": ["summary", "record_level"],
        },
    },
)
client.scenarios.run(scenario.id)  # spawns baseline + counterfactual
diff = client.scenarios.diff(scenario.id)

AI Tuning & Co-pilot (DataSynth 3.0+, Scale+)

# LLM suggests config improvements based on quality scores
suggestion = client.jobs.tune(job_id, target_scores={"overall": 0.95})
print(suggestion.explanation)
print("Change rows:", suggestion.original_config.get("rows"),
      "->", suggestion.suggested_config.get("rows"))

# Ask the dashboard co-pilot
reply = client.ai.chat("Which fraud packs give me the best audit training?")
print(reply.reply)

Fingerprint Synthesis (DataSynth 3.0+, Team+)

# Privacy-preserving synthesis from a .dsf fingerprint file
submission = client.fingerprint.synthesize(
    "./my_data.dsf",
    rows=10000,
    backend="statistical",  # or "neural" / "hybrid" (Scale+)
)
job = client.jobs.wait(submission.job_id)

Adversarial Probing (DataSynth 3.0+, Enterprise)

# Probe an ONNX fraud detector for decision-boundary weaknesses
probe = client.adversarial.probe(
    "./my_model.onnx",
    n_probes=10000,
    perturbation_budget=0.05,
    threshold=0.5,
)
# ... wait for probe to complete ...
results = client.adversarial.results(probe.id)
print(f"Mean margin: {results.mean_margin:.3f}")
print(f"Positive rate: {results.positive_rate:.1%}")

Pre-built Analytics (DataSynth 2.3+)

# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)

# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")

# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")

# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")

# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")
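
For readers unfamiliar with the `mad` figure, here is a minimal local computation of a first-digit Benford mean absolute deviation. It follows the textbook definition (expected share of leading digit d is log10(1 + 1/d)); how the service computes its own `benford_analysis.mad` is not stated in the source, so treat the two as comparable in spirit rather than guaranteed identical. The function names are hypothetical.

```python
import math

# Benford's expected probability for leading digit d is log10(1 + 1/d).
BENFORD = {d: math.log10(1 + 1 / d) for d in range(1, 10)}


def leading_digit(amount: float) -> int:
    """Return the first significant digit of a non-zero finite amount."""
    # Scientific notation puts the first significant digit at position 0.
    return int(f"{abs(amount):.15e}"[0])


def benford_mad(amounts) -> float:
    """Mean absolute deviation between observed and Benford first-digit shares."""
    digits = [leading_digit(a) for a in amounts if a != 0]
    if not digits:
        raise ValueError("need at least one non-zero amount")
    observed = {d: digits.count(d) / len(digits) for d in range(1, 10)}
    return sum(abs(observed[d] - BENFORD[d]) for d in range(1, 10)) / 9
```

A lower value means the observed leading-digit shares sit closer to the Benford distribution.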

NDJSON Streaming (Scale tier+)

# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
    if envelope.get("type") == "_progress":
        print(f"  {envelope['lines_emitted']:,} lines emitted")
    else:
        # Process each data record
        my_pipeline.send(envelope)
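
Downstream systems often prefer batches to single records. The sketch below groups data envelopes into fixed-size lists and drops `_progress` envelopes, relying only on the `"type": "_progress"` marker shown above. `batch_records` is a hypothetical helper, not part of the SDK.

```python
from typing import Iterable, Iterator


def batch_records(envelopes: Iterable[dict], batch_size: int) -> Iterator[list]:
    """Yield lists of data records, dropping '_progress' envelopes."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch = []
    for envelope in envelopes:
        if envelope.get("type") == "_progress":
            continue  # progress markers carry no data
        batch.append(envelope)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch
```

It could wrap the stream directly, for example `for rows in batch_records(client.jobs.stream_ndjson(job_id), 1000): ...`.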

Output Mode (DataSynth 2.3+)

# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
    "sector": "retail",
    "rows": 1000,
    # ...
    "output": {
        "numericMode": "native",   # numbers, not strings
        "exportLayout": "flat",    # one row per line, header merged
    },
})

Storage Quota (TB-scale)

# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
    print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
    print(f"  {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")

Usage & Credits

# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")

# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
    print(f"  {d.date}: {d.credits} credits")

# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")

# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")

Quality Scores

scores = client.quality.scores()
for s in scores:
    print(f"Job {s.job_id}: overall={s.overall_score:.2f}")

timeline = client.quality.timeline(days=30)

API Keys, Webhooks, Billing, Notifications

# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}")  # Only shown once!

# Webhooks
hook = client.webhooks.create(
    url="https://example.com/webhook",
    events=["job.completed", "job.failed"],
)

# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")

# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)

Ecosystem Integrations

pandas

from vynfi.integrations.pandas import (
    job_to_dataframe,
    archive_to_dataframes,
    usage_to_dataframe,
)

# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))

# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}

# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)

polars

from vynfi.integrations.polars import download_frame, usage_to_frame

df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())

Error Handling

from vynfi import (
    VynFi,
    AuthenticationError,
    ForbiddenError,
    InsufficientCreditsError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

client = VynFi(api_key="vf_live_...")

try:
    job = client.generate(tables=[{"name": "journal_entries", "rows": 1000000}])
except InsufficientCreditsError:
    print("Not enough credits")
except RateLimitError:
    print("Too many requests — automatic retry exhausted")
except ValidationError as e:
    print(f"Invalid request: {e}")

Configuration

client = VynFi(
    api_key="vf_live_...",
    base_url="https://api.vynfi.com",  # default
    timeout=30.0,                       # request timeout in seconds
    max_retries=2,                      # automatic retry on 429/5xx
)

# Context manager support
with VynFi(api_key="vf_live_...") as client:
    usage = client.usage.summary()
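
The examples above pass the API key as a string literal. In practice it is usually safer to read it from the environment. The variable name `VYNFI_API_KEY` and the helper `load_api_key` are hypothetical; the source does not say whether the SDK reads any environment variable on its own.

```python
import os


def load_api_key(env=None, var="VYNFI_API_KEY") -> str:
    """Read the API key from a mapping (os.environ by default) or fail loudly."""
    env = os.environ if env is None else env
    key = env.get(var, "").strip()
    if not key:
        raise RuntimeError(f"set {var} before creating the client")
    return key
```

The result can then be passed on, for example `VynFi(api_key=load_api_key())`.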

License

Apache 2.0
