Extract, enrich, assess, hunt, block, and investigate indicators of compromise (IOCs) — a layered, pip-extras toolkit for the full IOC lifecycle, from unstructured text to feed-driven triggers and a human-in-the-loop multi-agent response.
iocflow
Pull indicators of compromise out of unstructured text — threat-intel reports, advisories, emails, tickets — in one call. iocflow extracts IPs, domains, URLs, filenames, file hashes, CVEs, MITRE ATT&CK technique IDs, threat actors, and malware families, with the false-positive defenses you'd otherwise write by hand: a Public Suffix List domain validator, benign-domain/IP allowlists, hash de-duplication across MD5/SHA1/SHA256, and re-fanging of defanged IOCs.
from iocflow import extract
text = """
APT28 (a.k.a. Fancy Bear) staged Cobalt Strike from evil-domain[.]ru and
185.220.101.5, dropping install.ps1 (MD5 a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4).
Exploited CVE-2021-44228 via T1190. Contact: ops@evil-domain[.]ru.
"""
entities = extract(text)
print(entities.summary())
# 1 IPs, 1 domains, 1 filenames, 1 hashes, 1 CVEs, 1 emails, 1 threat actors, 1 MITRE techniques
for ind in entities.iter_indicators():
    print(ind.kind, ind.value)
# ip 185.220.101.5
# domain evil-domain.ru
# ...
The defanged evil-domain[.]ru and ops@evil-domain[.]ru are re-fanged
automatically; 185.220.101.5 is kept while private/benign IPs are dropped.
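Re-fanging is easy to reason about in isolation. The following is a minimal, stdlib-only sketch of the idea, not iocflow's actual implementation: the function name `refang` and the particular set of defang conventions it handles are assumptions chosen for illustration.

```python
import re

# An assumed, non-exhaustive set of defang conventions for this example.
_REPLACEMENTS = [
    (re.compile(r"\[\.\]|\(\.\)|\[dot\]", re.IGNORECASE), "."),
    (re.compile(r"\[@\]|\[at\]", re.IGNORECASE), "@"),
    (re.compile(r"\[:\]"), ":"),
    (re.compile(r"\bhxxp", re.IGNORECASE), "http"),
]


def refang(text: str) -> str:
    """Undo common defanging so downstream regexes see real indicators."""
    for pattern, replacement in _REPLACEMENTS:
        text = pattern.sub(replacement, text)
    return text


print(refang("c2 at hxxps[:]//evil[.]example[.]com and 185[.]220[.]101[.]5"))
```

Running the substitutions before extraction means each extractor only has to recognize the canonical form of an indicator.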
Install
pip install iocflow # core — one dependency (tldextract)
pip install "iocflow[mitre]" # + a ready-made MITRE ATT&CK malware-name source
What it extracts
extract(text) returns an ExtractedEntities with:
- ips: public IPv4, excluding private ranges, benign IPs, and version-number-like values
- domains: validated against the Mozilla Public Suffix List via tldextract
- urls: both https://… and bare host/path forms (so package-registry paths survive)
- filenames: suspicious script/executable/macro/archive filenames
- hashes: {"md5": [...], "sha1": [...], "sha256": [...]}, de-duplicated across lengths
- cves: CVE-YYYY-NNNN+, normalized to uppercase
- emails
- mitre_techniques: T1059, T1059.001, …
- threat_actors (+ threat_actors_enriched): APT/UNC/FIN/TA/DEV/STORM designators, a curated well-known list, and the "<Name> ransomware" pattern
- malware_families: populated when you supply a malware-name source (see below)
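To make the shape of the hashes field concrete, here is a small sketch of one way to bucket hex digests by length so that a SHA256 is never also reported as an MD5 or SHA1 substring. It is an illustration under assumptions, not the library's code: `bucket_hashes` is a hypothetical name, and the case-insensitive de-duplication is a choice made for the example.

```python
import re
from typing import Dict, List

# Match whole hex runs only, so a 64-char digest is never split into
# shorter "hashes". Runs of any other length are ignored.
_HEX_RUN = re.compile(r"\b[0-9a-fA-F]{32,64}\b")
_KIND_BY_LENGTH = {32: "md5", 40: "sha1", 64: "sha256"}


def bucket_hashes(text: str) -> Dict[str, List[str]]:
    buckets: Dict[str, List[str]] = {"md5": [], "sha1": [], "sha256": []}
    seen = set()
    for token in _HEX_RUN.findall(text):
        kind = _KIND_BY_LENGTH.get(len(token))
        if kind is None:
            continue  # hex run of a length that is not a known digest size
        value = token.lower()
        if value not in seen:  # de-duplicate regardless of letter case
            seen.add(value)
            buckets[kind].append(value)
    return buckets
```

Anchoring the pattern on word boundaries is what keeps the three lists disjoint: each hex run is classified once, by its full length.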
Each individual extractor is also importable and composable:
from iocflow import extract_ips, extract_hashes, refang_text
extract_ips(refang_text("c2 at 185[.]220[.]101[.]5")) # ['185.220.101.5']
Pluggable name sources
The core has no external-data dependency. Two enrichment sources are optional and supplied by you, so iocflow drops cleanly into any environment — plug in your own feeds, or use the bundled MITRE extra.
Malware families. Give extract a MalwareNames and it matches families
(with alias-to-canonical normalization) behind a three-layer false-positive
defense. Build one from your own list, from MITRE-shaped records, or from the
optional extra:
from iocflow import extract, MalwareNames
# Your own list:
names = MalwareNames.from_names(["Cobalt Strike", "Emotet", "Qakbot"])
entities = extract(report_text, malware_names=names)
# Or the bundled MITRE ATT&CK source (needs: pip install "iocflow[mitre]"):
from iocflow.mitre import mitre_malware_names
entities = extract(report_text, malware_names=mitre_malware_names())
Threat-actor aliases. Give extract an ActorAliases to match a custom
name set and enrich actors with common_name / region / all_names. Without
it, actors are still found by pattern and curated list:
from iocflow import extract, ActorAliases
aliases = ActorAliases.from_index({
    "apt28": {"common_name": "APT28", "region": "Russia",
              "all_names": ["Fancy Bear", "Sofacy", "Sednit"]},
})
entities = extract(report_text, actor_aliases=aliases)
entities.threat_actors_enriched[0].region # "Russia"
entities.threat_actors_enriched[0].aliases_display() # "Fancy Bear, Sofacy, Sednit"
Command line
iocflow "APT28 used 185.220.101.5 and evil[.]example[.]com"
echo "report text…" | iocflow --json
iocflow --mitre "Emotet dropped Cobalt Strike" # needs iocflow[mitre]
Layer 2 — enrichment
Take the extracted entities and look every indicator up against threat-intel sources, getting back a normalized verdict per indicator. Install the extra and set the API keys you have:
pip install "iocflow[enrich]"
export IOCFLOW_VT_API_KEY=... # VirusTotal (free key)
export IOCFLOW_ABUSEIPDB_API_KEY=... # AbuseIPDB (free key)
export IOCFLOW_ABUSECH_API_KEY=... # abuse.ch (free Auth-Key)
from iocflow import extract
from iocflow.enrich import enrich
entities = extract(report_text)
report = enrich(entities) # uses every source whose key is set
print(report.summary())
# 5 indicators across 3 sources, 2 malicious, 1 suspicious
for ind in report.malicious:
    print("malicious:", ind.kind, ind.value, "→", report.verdict_for(ind.kind, ind.value).value)
Each indicator is routed only to the sources that handle its kind (VirusTotal: IPs/domains/URLs/hashes; AbuseIPDB: IPs; abuse.ch: IPs/domains/URLs/hashes via ThreatFox/URLhaus/MalwareBazaar). Lookups fan out over a thread pool. A source with no key is skipped, and a failing lookup becomes an error record rather than crashing the batch — so partial coverage still produces a report.
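The routing and failure behavior described above can be sketched with the standard library alone. This is a hedged illustration of the pattern (route by kind, fan out over a thread pool, turn exceptions into error records), not iocflow's internals; `Record`, `fan_out`, and the tuple shape used for a source are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, List, Optional, Set, Tuple


@dataclass
class Record:
    source: str
    kind: str
    value: str
    verdict: Optional[str] = None
    error: Optional[str] = None


# (source name, kinds it handles, lookup function): a hypothetical shape.
Source = Tuple[str, Set[str], Callable[[str, str], str]]


def fan_out(indicators: List[Tuple[str, str]], sources: List[Source],
            max_workers: int = 8) -> List[Record]:
    def lookup(name: str, fn: Callable[[str, str], str],
               kind: str, value: str) -> Record:
        try:
            return Record(name, kind, value, verdict=fn(kind, value))
        except Exception as exc:  # one bad lookup must not sink the batch
            return Record(name, kind, value, error=str(exc))

    # Only pair an indicator with sources that declare support for its kind.
    jobs = [(name, fn, kind, value)
            for kind, value in indicators
            for name, kinds, fn in sources
            if kind in kinds]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda job: lookup(*job), jobs))
```

Because every job returns a record either way, a caller can still build a report from whatever subset of lookups succeeded.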
Verdicts are normalized to MALICIOUS / SUSPICIOUS / BENIGN / UNKNOWN and
aggregated worst-wins across sources. You can also pass enrichers explicitly,
restrict to certain kinds, or supply a cache:
from iocflow.enrich import enrich, VirusTotalEnricher, MemoryCache
report = enrich(
    entities,
    [VirusTotalEnricher("my-key")],
    kinds={"ip", "domain"},
    cache=MemoryCache(),
)
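Worst-wins aggregation is simple enough to show in a few lines. The sketch below uses a stand-in `Verdict` enum rather than the library's own type, and the relative order of UNKNOWN and BENIGN is an assumption made for the example (here a benign answer from one source outranks no answer at all).

```python
from enum import IntEnum
from typing import Iterable


class Verdict(IntEnum):
    # Ordered from least to most severe; the UNKNOWN < BENIGN order is assumed.
    UNKNOWN = 0
    BENIGN = 1
    SUSPICIOUS = 2
    MALICIOUS = 3


def worst_wins(verdicts: Iterable[Verdict]) -> Verdict:
    """Return the most severe verdict, or UNKNOWN when no source answered."""
    return max(verdicts, default=Verdict.UNKNOWN)


print(worst_wins([Verdict.BENIGN, Verdict.MALICIOUS, Verdict.UNKNOWN]).name)
```

The practical consequence is that a single source flagging an indicator is enough to surface it, which favors recall over precision.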
Bring your own source by implementing the Enricher protocol (name,
supports(kind), enrich(kind, value) -> EnrichmentRecord) — or subclass
HTTPEnricher to get session handling, rate-limiting, and error-wrapping for
free.
Layer 3 — AI commentary
Turn the enrichment report into an analyst-style assessment with an LLM. Install the extra and point it at any OpenAI-compatible endpoint (OpenAI, Azure, or a local server like vLLM / Ollama / LM Studio):
pip install "iocflow[ai]"
export IOCFLOW_LLM_API_KEY=... # omit for keyless local servers
export IOCFLOW_LLM_BASE_URL=http://localhost:11434/v1 # default: OpenAI
export IOCFLOW_LLM_MODEL=gpt-4o-mini
from iocflow import extract
from iocflow.enrich import enrich
from iocflow.ai import comment
entities = extract(report_text)
report = enrich(entities)
note = comment(report, entities=entities, text=report_text)
print(note.severity.value, "—", note.summary)
for finding in note.key_findings:
    print(" •", finding)
for action in note.recommendations:
    print(" →", action)
comment() returns a structured Commentary (severity, assessment,
key_findings, recommendations) and is hardened against flaky model output:
- The model is asked for JSON; if it answers with prose or fenced JSON, the text is parsed best-effort, falling back to using it as the narrative.
- If no model is configured, or a call fails, comment() returns a deterministic assessment built straight from the report, so it always returns a usable result and never raises. The LLM is the primary path; the fallback guarantees the pipeline keeps working without one.
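The best-effort parsing in the first bullet can be approximated as follows. This is a sketch of the general technique only: `parse_model_reply` is a hypothetical helper, and using "assessment" as the key for the narrative fallback is an assumption based on the field names listed above.

```python
import json
import re
from typing import Any, Dict

_TICKS = "`" * 3  # built at runtime to avoid a literal fence in this example
_FENCED = re.compile(_TICKS + r"(?:json)?\s*(.*?)" + _TICKS,
                     re.DOTALL | re.IGNORECASE)


def parse_model_reply(reply: str) -> Dict[str, Any]:
    # Try progressively looser readings of the reply.
    candidates = [reply]
    fenced = _FENCED.search(reply)
    if fenced:
        candidates.append(fenced.group(1))
    start, end = reply.find("{"), reply.rfind("}")
    if 0 <= start < end:
        candidates.append(reply[start:end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if isinstance(parsed, dict):
            return parsed
    # Nothing parsed as a JSON object: keep the prose as the narrative.
    return {"assessment": reply.strip()}
```

Whatever the model sends back, the caller receives a dictionary, so downstream code never has to branch on the reply format.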
Bring any model by implementing the CommentaryModel protocol (name +
complete(system, user, *, json=False) -> str).
Layer 4 — suggested hunts
Turn the indicators into ready-to-run hunt queries for the platforms a SOC actually uses. The deterministic core runs offline — no network, no API keys:
pip install "iocflow[hunt]" # only the optional LLM path needs the extra
from iocflow import extract
from iocflow.enrich import enrich
from iocflow.hunt import suggest
entities = extract(report_text)
report = enrich(entities)
plan = suggest(report) # CrowdStrike CQL, Cortex XQL, Sigma
print(plan.summary())
# 9 hunts across 3 dialects
for hunt in plan.for_dialect("sigma"):
    print(f"# {hunt.title} [{hunt.severity.value}]")
    print(hunt.query)
For each indicator kind it renders one sweep query per dialect — CrowdStrike
CQL (in(RemoteAddressIP4, values=[...])), Cortex XQL
(dataset = xdr_data | filter ...), and a complete Sigma rule (with a
stable, content-derived id). Values are escaped and de-duplicated; each dialect
renders only the indicator kinds it has a real field for, and benign-verdict
indicators are skipped by default (include_benign=True to keep them). Restrict
output with dialects=["sigma"].
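As a rough illustration of "escaped and de-duplicated", the sketch below renders the CQL in(...) form quoted above from a list of values. It is not the library's renderer: `render_cql_in` is a hypothetical name, and the backslash-style escaping is an assumption for the example rather than a statement about CrowdStrike's quoting rules.

```python
from typing import Iterable


def render_cql_in(field: str, values: Iterable[str]) -> str:
    # dict.fromkeys de-duplicates while preserving first-seen order.
    unique = list(dict.fromkeys(values))
    quoted = ", ".join(
        '"' + v.replace("\\", "\\\\").replace('"', '\\"') + '"'
        for v in unique
    )
    return f"in({field}, values=[{quoted}])"


print(render_cql_in("RemoteAddressIP4", ["185.220.101.5", "185.220.101.5"]))
```

Keeping the order stable means the same report always renders the same query text, which makes hunts easy to diff between runs.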
With a model configured (the same IOCFLOW_LLM_* env as Layer 3), suggest()
also proposes behavioral hunts — TTP- and anomaly-based ideas that go beyond
literal IOC matching:
plan = suggest(report, entities=entities, commentary=note)
behavioral = [h for h in plan.hunts if h.source == "llm"]
The LLM is strictly additive: with no model, or on any model error, you still
get the full deterministic plan — suggest() never raises. Add a query language
by implementing the Dialect protocol (key, label, supports, render).
Layer 5 — response / blocking
Take the indicators the report flagged malicious and block them at the control points you operate. Blocking is dry-run by default — you must explicitly opt into live changes:
pip install "iocflow[block]"
from iocflow import extract
from iocflow.enrich import enrich
from iocflow.block import block, unblock
entities = extract(report_text)
report = enrich(entities)
plan = block(report) # DRY RUN — shows exactly what would be blocked
print(plan.summary())
# DRY RUN: 1 skipped, 6 dry_run
result = block(report, dry_run=False) # actually push the blocks
unblock(report, dry_run=False) # reverse them
Targets, each acting only on the kinds it can enforce:
- Palo Alto: PanEdlFeed maintains typed ip/domain/url External Dynamic List files your firewall pulls (decoupled, non-destructive), and PanOsBlocker registers IP tags live via the User-ID API for a Dynamic Address Group deny policy.
- Zscaler ZIA: ZscalerBlocker adds URLs/domains to the denylist and activates the change.
- CrowdStrike Falcon: CrowdStrikeBlocker creates custom IOCs (md5/sha256/domain/ip) with a prevent action via the IOC Management API.
- Abnormal Security: AbnormalBlocker blocks email senders (experimental).
Safety is the point of this layer and it's authoritative:
- Dry-run by default. Nothing changes unless you pass dry_run=False.
- An allowlist guard vetoes benign and internal indicators (public resolvers, private/internal IPs, well-known domains) before any target is called, even if a report mislabeled one as malicious. You cannot accidentally block 8.8.8.8.
- Malicious-only by default (min_verdict="suspicious" to widen), keyless targets are skipped, and a failing target becomes a FAILED result rather than crashing the batch. Every result carries the exact payload sent, so a dry run is a full audit.
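The first two safeguards compose naturally: the guard runs first, and the dry-run flag only decides what happens to indicators that survive it. Here is a minimal sketch for IPs, assuming a tiny hard-coded allowlist; `BENIGN_IPS`, `BlockResult`, and `block_ips` are hypothetical names, not part of iocflow.

```python
import ipaddress
from dataclasses import dataclass
from typing import Callable, Iterable, List

# Assumed allowlist of public resolvers, for illustration only.
BENIGN_IPS = {"8.8.8.8", "8.8.4.4", "1.1.1.1"}


@dataclass
class BlockResult:
    value: str
    status: str  # "skipped", "dry_run", or "blocked"
    reason: str = ""


def is_protected(ip: str) -> bool:
    # Non-global covers private, loopback, link-local and reserved ranges.
    return ip in BENIGN_IPS or not ipaddress.ip_address(ip).is_global


def block_ips(ips: Iterable[str], push: Callable[[str], None],
              dry_run: bool = True) -> List[BlockResult]:
    results = []
    for ip in ips:
        if is_protected(ip):
            # The guard vetoes before any target is called.
            results.append(BlockResult(ip, "skipped", "allowlist guard"))
        elif dry_run:
            results.append(BlockResult(ip, "dry_run"))
        else:
            push(ip)
            results.append(BlockResult(ip, "blocked"))
    return results
```

Note that the guard is evaluated regardless of dry_run, so a dry run reports exactly the same skips a live run would produce.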
Set credentials via the environment (IOCFLOW_PANOS_*, IOCFLOW_ZSCALER_*,
IOCFLOW_FALCON_*, IOCFLOW_PAN_EDL_PATH, IOCFLOW_ABNORMAL_API_TOKEN) and
default_blockers() builds every configured target, or pass blockers explicitly.
Bring your own control point by implementing the Blocker protocol
(name, supports, block, unblock).
Layer 6 — the agentic capstone
Hand a report to a small multi-agent team and let it run the whole lifecycle: a supervisor routes to specialist agents (extractor → enricher → hunter → responder) that use Layers 1–5 as tools. The LLM applies judgment; the deterministic layers do the exact work and are the fallback.
(Run it yourself: examples/demo_investigate.py.)
pip install "iocflow[agent]" # Python 3.10+ (LangGraph / LangChain)
from iocflow.agent import investigate
case = investigate(report_text) # safe: nothing is blocked by default
print(case.summary())
print(case.commentary.severity.value, "—", case.commentary.summary)
for line in case.trace: # the agents' reasoning trace
    print(" •", line)
The model is any LangChain chat model; default_agent_model() builds a
FailoverChatModel (primary→secondary, via
langchain-failover) from the
same IOCFLOW_LLM_* env. With no model configured, the graph runs the layers in
a fixed deterministic order — so it always produces a Case.
Blocking is human-in-the-loop, with three-layer authority. The responder
agent proposes blocks, an ApprovalGate lets a human authorize them, and the
Layer 5 allowlist guard vetoes benign/internal indicators underneath — the LLM
is never the sole authority for a destructive action. The default is
DenyAllGate (an unattended run blocks nothing); pass an approving gate to act:
from iocflow.agent import investigate, CLIApprovalGate
case = investigate(report_text, gate=CLIApprovalGate()) # prompts before blocking
AutoApproveGate (dev/CI) and CLIApprovalGate (plan-level or per-action) ship
in the box, and so does a real chat gate — SlackApprovalGate posts the
proposed blocks to a channel and waits for an allowlisted approver to react,
defaulting to deny on timeout (no inbound webhook server required):
from iocflow.agent import investigate
from iocflow.agent.chat_gate import SlackApprovalGate
# SLACK_BOT_TOKEN + SLACK_APPROVAL_CHANNEL from the env; only these users count
gate = SlackApprovalGate(approvers=["U_ANALYST"], timeout=600)
case = investigate(report_text, gate=gate) # ✅ to authorize, ❌ or no reply = denied
ChatApprovalGate + a two-method ChatTransport (post, reactions) make the
same flow portable to Webex, Teams, or anything else — implement the
ApprovalGate protocol to wire any channel you like. The threat-intel sources
(enrichers=) and block targets (blockers=) are equally pluggable, so the
agent runs fully offline in tests. The lifecycle is also exposed as LangChain
tools (IOCFLOW_TOOLS) for your own agents.
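The chat-approval flow (post, poll reactions, count only allowlisted approvers, deny on timeout) can be sketched against a two-method transport. Everything below is a hypothetical stand-in: the `Transport` method signatures, the reaction dictionary shape, and the "approve"/"deny" labels are assumptions for illustration, not the actual ChatTransport interface.

```python
import time
from typing import Callable, Dict, Iterable, List, Protocol


class Transport(Protocol):
    # Assumed signatures; the real protocol may differ.
    def post(self, text: str) -> str: ...
    def reactions(self, message_id: str) -> List[Dict[str, str]]: ...


def wait_for_approval(transport: Transport, text: str,
                      approvers: Iterable[str], timeout: float = 600.0,
                      poll_interval: float = 2.0,
                      sleep: Callable[[float], None] = time.sleep,
                      clock: Callable[[], float] = time.monotonic) -> bool:
    allowed = set(approvers)
    message_id = transport.post(text)
    deadline = clock() + timeout
    while True:
        for reaction in transport.reactions(message_id):
            if reaction["user"] not in allowed:
                continue  # reactions from anyone else are ignored
            if reaction["emoji"] == "approve":
                return True
            if reaction["emoji"] == "deny":
                return False
        if clock() >= deadline:
            return False  # no decision in time: default to deny
        sleep(poll_interval)
```

Polling for reactions is what removes the need for an inbound webhook server, at the cost of a decision latency bounded by poll_interval.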
Sources — trigger the lifecycle automatically
Everything above starts from text you hand in. Sources answer the other half:
where does that text come from? A Source polls a feed and yields Trigger
work items; a Poller de-duplicates them against a SeenStore and runs a
handler — by default the deterministic extract → enrich → comment → suggest
lifecycle. It's the same shape as a real critical-advisory poller, as a library.
pip install "iocflow[sources]"
from iocflow.sources import Poller, SqliteSeenStore, GitHubAdvisorySource
poller = Poller(
    [GitHubAdvisorySource(severities=["critical"])],
    store=SqliteSeenStore("advisories.sqlite"), # durable: survives restarts
)
for result in poller.run_once(): # call from cron / a systemd timer
    print(result.output.summary())
Reference sources ship for GitHub Security Advisories, any RSS/Atom feed
(vendor advisories, threat blogs), and a watched directory of files;
default_sources() builds them from the environment. Scheduling stays yours —
the library offers run_once() and a simple run_forever(interval), so it drops
behind your own cron or systemd timer.
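The de-duplication step is the part that makes repeated run_once() calls safe. A minimal sketch of the idea with the standard sqlite3 module is shown here; `SeenStore`, `mark_if_new`, and this free-standing `run_once` are hypothetical names for illustration and do not mirror iocflow's classes.

```python
import sqlite3
from typing import Callable, Iterable, List, Tuple


class SeenStore:
    def __init__(self, path: str = ":memory:") -> None:
        # Pass a file path instead of ":memory:" to survive restarts.
        self._db = sqlite3.connect(path)
        self._db.execute(
            "CREATE TABLE IF NOT EXISTS seen (id TEXT PRIMARY KEY)")
        self._db.commit()

    def mark_if_new(self, trigger_id: str) -> bool:
        """Record the id; return True only the first time it is seen."""
        cur = self._db.execute(
            "INSERT OR IGNORE INTO seen (id) VALUES (?)", (trigger_id,))
        self._db.commit()
        return cur.rowcount == 1


def run_once(triggers: Iterable[Tuple[str, str]], store: SeenStore,
             handler: Callable[[str], object]) -> List[object]:
    # Each trigger is (stable id, text); only unseen ids reach the handler.
    return [handler(text)
            for trigger_id, text in triggers
            if store.mark_if_new(trigger_id)]
```

This version marks an item as seen before handling it, so a handler failure is not retried on the next poll; marking after a successful handler call is the alternative if at-least-once processing matters more.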
Crucially, a poller never blocks anything: the default handler only analyzes
and (with the agent layer) proposes. To close the loop, hand the trigger to
investigate() with an approval gate — feed → investigate → propose → a human
approves in Slack — so automation does the toil and a person still holds the
trigger on anything destructive. See
examples/poller_advisories.py.
STIX interop — the threat-intel lingua franca
iocflow speaks STIX 2.1 both ways, so it drops into an existing TIP / TAXII pipeline rather than living on an island.
pip install "iocflow[stix]"
from iocflow.stix import from_stix, to_stix
entities = from_stix(bundle) # STIX bundle/objects/JSON → extracted indicators
out = to_stix(enrichment_report) # any iocflow result → a conformant STIX 2.1 bundle
from_stix walks observable objects and indicator patterns and is resilient to
the messy bundles real feeds emit (a bad object is skipped, never fatal).
to_stix accepts entities, an EnrichmentReport (verdicts become
indicator_types / confidence), a Case, or plain (kind, value) pairs, and
gives every object a deterministic id (UUIDv5 over the indicator) so bundles
are reproducible and idempotent to re-ingest. Both are stdlib-only.
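Deterministic ids are what make re-ingestion idempotent, and the mechanism fits in a few lines. The sketch below is an assumption-laden illustration rather than to_stix itself: the namespace UUID, the `kind:value` name string, and the `indicator` helper are hypothetical, and the object omits other properties a complete STIX 2.1 indicator carries (such as timestamps).

```python
import uuid

# Hypothetical namespace for this example; any fixed UUID works as long as
# it never changes between runs.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "iocflow-example.invalid")

_PATTERNS = {
    "ip": "[ipv4-addr:value = '{v}']",
    "domain": "[domain-name:value = '{v}']",
    "url": "[url:value = '{v}']",
}


def indicator(kind: str, value: str) -> dict:
    # Same (kind, value) always hashes to the same UUIDv5.
    stable_id = uuid.uuid5(NAMESPACE, f"{kind}:{value}")
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return {
        "type": "indicator",
        "spec_version": "2.1",
        "id": f"indicator--{stable_id}",
        "pattern_type": "stix",
        "pattern": _PATTERNS[kind].format(v=escaped),
    }


print(indicator("ip", "185.220.101.5")["id"])
```

Because the id depends only on the indicator, exporting the same report twice yields objects a TIP can match and update instead of duplicating.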
A TAXII 2.1 collection is also an ingestion source — it plugs straight into the poller from the previous section:
from iocflow.stix import TaxiiSource
from iocflow.sources import Poller, SqliteSeenStore
poller = Poller(
    [TaxiiSource(api_root, collection_id, token="…")],
    store=SqliteSeenStore("taxii.sqlite"),
)
Where this is going
iocflow grows in independently-useful layers, each behind its own pip extra.
Layers 1–6 all ship today — extraction, enrichment, AI commentary, suggested
hunts, response/blocking, and the agentic capstone. The pipeline is a clean
hand-off chain of stable types: ExtractedEntities (L1) → enrich() →
EnrichmentReport (L2) → comment() → Commentary (L3) → suggest() →
HuntPlan (L4) → block() → BlockReport (L5) — and investigate() (L6)
orchestrates the whole chain as a multi-agent team with a human-in-the-loop gate.
Everything but the agent capstone runs on Python 3.9+; import iocflow stays
dependency-light (one dependency) and pulls in no layer you don't ask for.
License
MIT