ciaops is a Python library that collects the modules for processing data from the TI, ASM and DRP systems in one package. It simplifies work with the product APIs and lets you customize how data is searched for and retrieved.
ciaops
ciaops - Python library to communicate with Company Products (TI, DRP, ASM) via API.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Installation
Dependencies: requests, urllib3.
ciaops is available on PyPI:
pip install ciaops
Alternatively, install from a wheel (WHL) archive downloaded from the Portal. Replace X.X.X with the current library version:
pip install ./ciaops-X.X.X-py3-none-any.whl
Usage
Initialization
Initialize a poller with your credentials. TLS certificate verification is enabled by default.
Call set_verify() only when you need to override the default: pass False to disable verification (e.g. local testing behind a corporate proxy), or pass a path to a custom CA bundle.
from ciaops import TIPoller, DRPPoller, ASMPoller
# Threat Intelligence
ti = TIPoller(username='example@example.corp', api_key='API_KEY', api_url='TI_API_URL')
# Digital Risk Protection
drp = DRPPoller(username='example@example.corp', api_key='API_KEY', api_url='DRP_API_URL')
# Attack Surface Management
asm = ASMPoller(username='example@example.corp', api_key='API_KEY', api_url='ASM_API_URL')
# Override only when needed:
ti.set_verify(False) # disable TLS check (local testing only)
ti.set_verify('/path/to/ca-bundle') # custom CA bundle
Proxy setup (all pollers share the same interface):
ti.set_proxies(
    proxy_protocol='https',
    proxy_ip='10.0.0.1',
    proxy_port='3128',
    proxy_username='user', # optional
    proxy_password='secret', # optional
)
Use pollers as context managers to ensure the session is always closed:
with TIPoller(username='...', api_key='...', api_url='...') as ti:
    generator = ti.create_update_generator('apt/threat', sequpdate=16172928022293)
    for portion in generator:
        print(portion.parse_portion())
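The guarantee a context manager gives can be seen with a small stand-in class. The sketch below is not the ciaops implementation: DemoPoller is a hypothetical name used only to show that the exit hook runs, and the session is closed, even when the body raises.

```python
class DemoPoller:
    """Hypothetical stand-in for a poller; not part of ciaops."""

    def __init__(self):
        self.closed = False

    def close_session(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs on normal exit and on exceptions alike.
        self.close_session()
        return False  # do not swallow the exception


poller = DemoPoller()
try:
    with poller:
        raise RuntimeError("simulated failure while iterating")
except RuntimeError:
    pass

print(poller.closed)  # True
```

The same shape is what makes the with form shorter than an explicit try/finally around close_session().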
Tag your integration in the User-Agent header with set_product() — required by some on-premise deployments:
ti.set_product(
    product_type='SIEM',
    product_name='MySIEM',
    product_version='2.1',
    integration_name='ciaops-siem',
    integration_version='1.0',
)
Collection constants
Use typed constants instead of bare strings to avoid typos and get IDE autocomplete:
from ciaops.collections_meta import TICollections, DRPCollections, ASMCollections
# TI examples
TICollections.APT_THREAT # "apt/threat"
TICollections.COMPROMISED_ACCOUNT_GROUP # "compromised/account_group"
TICollections.MALWARE_CNC # "malware/cnc"
# DRP
DRPCollections.VIOLATION # "violation"
DRPCollections.COMPROMISED_DARKWEB # "compromised/darkweb"
# ASM
ASMCollections.ASSETS_UPDATED # "assets/updated"
ASMCollections.ISSUES_UPDATED # "issues/updated"
Look up the recommended TTL for any TI collection:
TICollections.get_ttl(TICollections.MALWARE_CNC) # 90 (days)
TICollections.get_ttl(TICollections.COMPROMISED_MESSENGER) # None (no expiry)
Use constants anywhere a collection name string is expected:
poller.set_keys(TICollections.APT_THREAT, keys)
generator = poller.create_update_generator(TICollections.MALWARE_CNC, sequpdate=...)
Collections mapping
The set_keys() method sets the keys to search for in the selected collection. Pass a Python dict mapping_keys = {key: value}, where:
key - the result name
value - a dot-notation string with the searchable keys
mapping_keys = {"result_name": "searchable_key_1.searchable_key_2"}
The parser looks up keys recursively in the API response, following the dot-notation path given in value.
To add your own literal data to the results, start the value with a star (*).
mapping_keys = {
    "network": "indicators.params.ip",
    "result_name": "*My_Value"
}
For set_keys() and set_iocs_keys() you can pass a full nested template to shape the output the way you want.
mapping_keys = {
    'network': {
        'ips': 'indicators.params.ip'
    },
    'url': 'indicators.params.url',
    'type': '*network'
}
poller.set_keys(collection_name="apt/threat", keys=mapping_keys)
poller.set_iocs_keys(collection_name="apt/threat", keys={"ips": "indicators.params.ip"})
Portions generator
Use create_update_generator() or create_search_generator() to create a generator that returns feeds in limited-size portions.
Update generator - goes through the feeds in ascending order. Iteration is based on the seqUpdate field.
Search generator - goes through the feeds in descending order. Iteration is based on the resultId field.
Note: the update generator works with all collections except compromised/breached and compromised/reaper.
Sequence update logic is not applied to these collections.
generator = poller.create_update_generator(
    collection_name='compromised/account_group',
    date_from='2021-01-30',
    date_to='2021-02-03',
    query='8.8.8.8',
    sequpdate=20000000,
    limit=200
)
Each portion yielded by the generator is a Parser object.
Use its methods and attributes to get the raw data (JSON) or the parsed portion (Python dictionary).
for portion in generator:
    parsed_json = portion.parse_portion(as_json=False)
    iocs = portion.get_iocs(as_json=False)
    sequpdate = portion.sequpdate
    count = portion.count
    raw_json = portion.raw_json
    raw_dict = portion.raw_dict
    new_parsed_json = portion.bulk_parse_portion(keys_list=[{"ips": "indicators.params.ip"}, {"url": 'indicators.params.url'}], as_json=False)
The sequpdate attribute of a portion gives you the last sequence update number (seqUpdate) in that portion, which you can save locally.
sequpdate = portion.sequpdate
The count attribute of a portion shows the number of feeds still left in the queue.
For the search generator, count returns the total number of feeds in the queue.
count = portion.count
The parse_portion() and get_iocs() methods of a portion use your mapping keys (IoC keys) to return parsed data.
You can override the mapping keys with the keys parameter of these methods.
parsed_json = portion.parse_portion(as_json=False)
iocs = portion.get_iocs(as_json=False, keys=mapping_override_keys)
You can also use the bulk_parse_portion() method to get several parsed dicts from every feed, one per mapping in keys_list.
new_parsed_json = portion.bulk_parse_portion(keys_list=[{"ips": "indicators.params.ip"}, {"url": 'indicators.params.url'}], as_json=False)
TIPoller extra methods
Available collections
Call get_available_collections() to discover which collections your API key can access before iterating.
collection_list = ti.get_available_collections()
seq_update_dict = ti.get_seq_update_dict(date='2020-12-12')
compromised_account_sequpdate = seq_update_dict.get('compromised/account_group')
# Check which collections have active hunting rules applied
hunting_collections = ti.get_hunting_rules_collections()
Find feed by ID
Returns a Parser object for a single feed by its ID.
feed = ti.search_feed_by_id(collection_name='apt/threat', feed_id='abc123')
parsed = feed.parse_portion()
Download file
Download a binary file embedded inside a threat report.
binary = ti.search_file_in_threats(
    collection_name='hi/threat',
    feed_id='feed_id',
    file_id='file_id_inside_feed',
)
Download PDF reports
# Download a PDF for an HI or APT threat
pdf_bytes = ti.download_threat_pdf(threat_id='abc123')
# Download an HI analytic report PDF (use file.name field from hi/analytic record)
pdf_bytes = ti.download_analytic_report_pdf(file_name='/23ae4ab7.../file/450ffbd4...')
with open('report.pdf', 'wb') as f:
    f.write(pdf_bytes)
IP scoring
Score one or more IPs against the TI database.
# Single IP
result = ti.scoring('8.8.8.8')
# → {"items": {"8.8.8.8": {"score": 7.5, ...}}}
# Multiple IPs
result = ti.scoring(['8.8.8.8', '1.1.1.1'])
MITRE ATT&CK
Fetch the full MITRE ATT&CK technique vocabulary or a ready-to-use ID→name map.
# Raw vocabulary (includes all AttackPattern details)
vocab = ti.get_mitre_techniques()
# Convenient ID → name dict
mitre_map = ti.get_mitre_attack_pattern_map()
# → {"T1059": "Command and Scripting Interpreter", "T1078": "Valid Accounts", ...}
technique_name = mitre_map.get("T1059")
Global search
Search across all TI collections by query string.
results = ti.global_search('8.8.8.8')
# → [{"apiPath": "suspicious_ip/scanner", "count": 14, ...}, ...]
DRPPoller methods
from ciaops import DRPPoller
drp = DRPPoller(username='...', api_key='...', api_url='DRP_URL')
Update generator - iterate violation feeds:
generator = drp.create_update_generator(
    collection_name='violation',
    sequpdate=1700000000000000,
    subtypes=[6], # 1=counterfeit 2=piracy 3=partner_policy 4=trademark 5=malware 6=phishing 7=fraud
    section=[1, 2], # 1=Web 2=Mobile 3=Marketplace 4=Social 5=Advertising 6=Messengers
    brands=['brand_id'],
    approve_states=['under_review'],
)
for portion in generator:
    data = portion.parse_portion()
Find feed by ID:
feed = drp.search_feed_by_id(feed_id='violation_id')
raw = feed.raw_dict
Change violation status (only when status=detected and approveState=under_review):
drp.change_status(feed_id='violation_id', status='approve') # or 'reject'
Brands and subscriptions:
brands = drp.get_brands()
# → [{"name": "Brand A", "id": "id1"}, ...]
subscriptions = drp.get_subscriptions()
# → ["scam", "phishing", ...]
Typo-squatting scan (iterates from the very beginning):
generator = drp.create_update_generator(
    collection_name='violation',
    use_typo_squatting=True,
)
seqUpdate by date:
seq_dict = drp.get_seq_update_dict(date='2024-01-15')
# → {"violation": 1705276800000000, ...}
ASMPoller methods
from ciaops import ASMPoller
asm = ASMPoller(username='...', api_key='...', api_url='ASM_URL')
List companies:
companies = asm.get_companies() # all companies
active = asm.get_companies(status='active')
# → [{"id": "uuid", "name": "Acme Corp"}, ...]
Update generator - uses POST requests with automatic rate limiting:
generator = asm.create_update_generator(
    collection_name='assets/updated', # or 'leaks/updated', 'issues/updated'
    company_id='company-uuid', # or list of UUIDs
    date_from='2024-01-01',
    date_to='2024-06-01', # optional
    count=500, # max 5000
    status=['new', 'confirmed'], # optional filter
    type=['domain', 'ip'], # optional filter (assets only)
)
for portion in generator:
    data = portion.parse_portion()
Dashboard scores:
scores = asm.get_dashboard_scores(company_id='company-uuid')
print(scores.current_score) # 7.4
print(scores.score_trend) # "improving" | "declining" | "stable"
print(scores.total_critical) # 3
print(scores.severity_summary) # {"critical": 3, "high": 12, ...}
print(scores.counters_summary) # {"new_assets": 5, "new_issues": 2, ...}
print(scores.lowest_scoring_category) # {"name": "Network Security", ...}
summary = scores.get_dashboard_summary() # full dict for periodic updates
raw = asm.get_dashboard_scores(company_id='uuid', as_raw=True) # plain dict
Issue management:
evidence = asm.get_issue_evidence(issue_id='issue-uuid')
asm.add_issue_comment(
    company_id='company-uuid',
    issue_id='issue-uuid',
    body='Investigating...',
)
asm.change_issue_status(
    issues_id=['issue-uuid-1', 'issue-uuid-2'],
    status='Under review', # Detected | Under review | Solved | Ignored | False positive
)
Asset management:
asm.add_assets(
    company_id='company-uuid',
    confirmed_domain=['example.corp'],
    confirmed_ip=['1.2.3.4'],
)
asm.remove_assets(
    company_id='company-uuid',
    excluded_domain=['old.group-ib.com'],
)
asm.change_asset_status(
    assets_ids=['asset-uuid'],
    status='confirmed', # new | false | confirmed
)
Close session
Always close the session in a try...finally block, or use the context manager:
import logging
from ciaops import TIPoller
from ciaops.exception import InputException, ConnectionException
logger = logging.getLogger(__name__)
poller = None  # stays None if the constructor itself raises
try:
    poller = TIPoller(username='example@group-ib.com', api_key='API_KEY', api_url='API_URL')
    # ... do work ...
except InputException as e:
    logger.error("Wrong input: %s", e)
except ConnectionException as e:
    logger.error("Connection error: %s", e)
finally:
    if poller is not None:
        poller.close_session()
Parsing
Common example of API response from Collection (received feeds):
api_response = [
{
'iocs': {
'network': [
{
'ip': [1, 2],
'url': 'url.com'
},
{
'ip': [3],
'url': ''
}
]
}
},
{
'iocs': {
'network': [
{
'ip': [4, 5],
'url': 'new_url.com'
}
]
}
}
]
Parse portion method
Your mapping dict for parse_portion() or bulk_parse_portion() methods:
mapping_keys = {
'network': {'ips': 'iocs.network.ip'},
'url': 'iocs.network.url',
'type': '*custom_network'
}
Result of parse_portion() output:
parsing_result = [
{
'network': {'ips': [[1, 2], [3]]},
'url': ['url.com', ''],
'type': 'custom_network'
},
{
'network': {'ips': [[4, 5]]},
'url': ['new_url.com'],
'type': 'custom_network'
}
]
Result of bulk_parse_portion() output:
parsing_result = [
[
{
'network': {'ips': [[1, 2], [3]]},
'url': ['url.com', ''],
'type': 'custom_network'}
],
[
{
'network': {'ips': [[4, 5]]},
'url': ['new_url.com'],
'type': 'custom_network'}
]
]
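The mapping behaviour shown above can be reproduced with a few lines of plain Python. This is a minimal sketch for illustration, not the library's parser: find_by_path and apply_template are hypothetical helper names, and only dot-paths, nested dicts and the star prefix are handled.

```python
def find_by_path(node, path):
    """Follow a dot-notation path; lists are traversed element by element."""
    for key in path.split("."):
        node = _step(node, key)
    return node


def _step(node, key):
    if isinstance(node, dict):
        return node.get(key)
    if isinstance(node, list):
        return [_step(item, key) for item in node]
    return None  # missing key or scalar: nothing to descend into


def apply_template(feed, template):
    """Resolve every value of a mapping template against one feed."""
    result = {}
    for name, value in template.items():
        if isinstance(value, dict):
            result[name] = apply_template(feed, value)
        elif value.startswith("*"):
            result[name] = value[1:]  # star prefix: literal value
        else:
            result[name] = find_by_path(feed, value)
    return result


api_response = [
    {"iocs": {"network": [{"ip": [1, 2], "url": "url.com"}, {"ip": [3], "url": ""}]}},
    {"iocs": {"network": [{"ip": [4, 5], "url": "new_url.com"}]}},
]
mapping_keys = {
    "network": {"ips": "iocs.network.ip"},
    "url": "iocs.network.url",
    "type": "*custom_network",
}

# One dict per feed, as in the parse_portion() result above.
parsed = [apply_template(feed, mapping_keys) for feed in api_response]
# One list of dicts per feed (one dict per template), as in the bulk result.
bulk = [[apply_template(feed, keys) for keys in [mapping_keys]] for feed in api_response]
print(parsed[0])
# {'network': {'ips': [[1, 2], [3]]}, 'url': ['url.com', ''], 'type': 'custom_network'}
```

Note that the nesting of the source lists is preserved at this stage, which is why ips comes back as a list of lists.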
Get IoCs method
Your mapping dict for get_iocs() method:
mapping_keys = {
'ips': 'iocs.network.ip',
'url': 'iocs.network.url'
}
Result of get_iocs() output:
parsing_result = {
'ips': [1, 2, 3, 4, 5],
'url': ['url.com', 'new_url.com']
}
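The difference from parse_portion() is that the values from all feeds end up in one flat list per key. A rough sketch of that flattening step follows; unpack is a hypothetical helper, and the noise values are the ones this README lists for unpack_iocs, so the real method may differ in detail.

```python
NOISE = ("", None, "0.0.0.0", "255.255.255.255")  # values the README lists as noise


def unpack(values):
    """Flatten nested lists, dropping noise values and duplicates (order kept)."""
    flat = []

    def walk(node):
        if isinstance(node, list):
            for item in node:
                walk(item)
        elif node not in NOISE and node not in flat:
            flat.append(node)

    walk(values)
    return flat


# Per-feed values as resolved from the sample API response above.
ips_per_feed = [[[1, 2], [3]], [[4, 5]]]
urls_per_feed = [["url.com", ""], ["new_url.com"]]

iocs = {"ips": unpack(ips_per_feed), "url": unpack(urls_per_feed)}
print(iocs)  # {'ips': [1, 2, 3, 4, 5], 'url': ['url.com', 'new_url.com']}
```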
Utilities
ParserHelper and Validator are standalone utilities available for use outside the generator flow — for example, when post-processing raw API responses or building custom pipelines on top of the library.
from ciaops.utils import ParserHelper, Validator
ParserHelper
find_by_template
Parse a single feed dict against a key-mapping template. Returns a dict with the resolved values.
feed = {
"id": "abc123",
"evaluation": {"severity": "high"},
"indicators": [{"params": {"ip": "1.2.3.4"}}, {"params": {"ip": "5.6.7.8"}}]
}
keys = {
"feed_id": "id",
"severity": "evaluation.severity",
"ips": "indicators.params.ip",
"source": "*Group-IB",
}
result = ParserHelper.find_by_template(feed, keys)
# {
# "feed_id": "abc123",
# "severity": "high",
# "ips": ["1.2.3.4", "5.6.7.8"],
# "source": "Group-IB",
# }
Supported value directives:
| Directive | Example | Result |
|---|---|---|
| Dot-path string | "evaluation.severity" | Value at that path |
| "*literal" (star prefix) | "*Group-IB" | The literal string "Group-IB" |
| "#field[N]" (hash prefix) | "#items[0]" | Element at index N of the list found at field |
| Nested dict | {"ips": "indicators.params.ip"} | Recursive template application |
| {"__nested_dot_path_to_list": "path", ...} | - | Maps the inner template over each item in the list at path |
| {"__concatenate": {"static": "https://portal/?id=", "dynamic": "id"}} | - | Concatenates a static prefix with a dynamic field value |
| {"__concatenate": {"collection": "apt/threat", "dynamic": "id"}} | - | Prefix auto-resolved from portal links for the given collection |
| {"__concatenate": {"parts": ["*https://portal/", "category", "*-", "id"]}} | - | Multi-part concatenation: * marks literals, bare strings are field paths |
Optional kwargs:
- use_join_to_end_list=True - joins list values into a single comma-separated string.
- except_keys=["field"] - excludes specific keys from the joining above.
find_element_by_key
Traverse any dict or list using a dot-notation path. Safe for nested lists and missing keys.
from ciaops.utils import find_element_by_key
find_element_by_key({"a": {"b": 1}}, "a.b")
# → 1
find_element_by_key({"items": [{"ip": "1.2.3.4"}, {"ip": "5.6.7.8"}]}, "items.ip")
# → ["1.2.3.4", "5.6.7.8"]
find_element_by_key({"a": None}, "a.b")
# → None
unpack_iocs
Recursively flattens a nested list of IoC values into a single deduplicated list. Filters out noise values ("", None, "0.0.0.0", "255.255.255.255").
raw = [["1.2.3.4", "5.6.7.8"], ["1.2.3.4", None, "0.0.0.0"]]
ParserHelper.unpack_iocs(raw)
# → ["1.2.3.4", "5.6.7.8"]
Validator
Validator guards against invalid inputs before they reach the API.
from ciaops.utils import Validator
validate_collection_name(collection_name, method=None) — raises InputException for unknown, deprecated, or removed collection names. When method="update" also rejects search-only collections.
Validator.validate_collection_name("apt/threat", method="update") # OK
Validator.validate_collection_name("attacks/phishing") # raises InputException: deprecated, use attacks/phishing_group
Validator.validate_collection_name("bp/phishing") # raises InputException: removed
validate_date_format(date, formats) — raises InputException if the date string does not match any of the provided format strings.
Validator.validate_date_format("2024-01-15", ("%Y-%m-%d",)) # OK
Validator.validate_date_format("15/01/2024", ("%Y-%m-%d",)) # raises InputException
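A check of this kind can be written with the standard library alone. The sketch below is an assumption about the general approach, not the Validator source: matches_any_format is a hypothetical name, and it returns a boolean instead of raising InputException.

```python
from datetime import datetime


def matches_any_format(date, formats):
    """Return True if the date string parses with at least one format."""
    for fmt in formats:
        try:
            datetime.strptime(date, fmt)
            return True
        except ValueError:
            continue
    return False


print(matches_any_format("2024-01-15", ("%Y-%m-%d",)))  # True
print(matches_any_format("15/01/2024", ("%Y-%m-%d",)))  # False
```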
validate_ips_argument(ips) — normalizes and validates the ips argument for the scoring endpoint. Accepts a single IP string or a list of IP strings; raises InvalidIpsParameter on invalid input. Returns a normalized list.
Validator.validate_ips_argument("8.8.8.8") # → ["8.8.8.8"]
Validator.validate_ips_argument(["8.8.8.8", "1.1.1.1"]) # → ["8.8.8.8", "1.1.1.1"]
Validator.validate_ips_argument("8.8.8.8,1.1.1.1") # raises InvalidIpsParameter
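The same normalize-then-validate idea can be sketched with the standard ipaddress module. normalize_ips is a hypothetical helper and raises ValueError rather than the library's InvalidIpsParameter; whether Validator uses ipaddress internally is not stated in this README.

```python
import ipaddress


def normalize_ips(ips):
    """Accept one IP string or a list of them; return a validated list."""
    candidates = [ips] if isinstance(ips, str) else list(ips)
    for candidate in candidates:
        ipaddress.ip_address(candidate)  # raises ValueError on invalid input
    return candidates


print(normalize_ips("8.8.8.8"))               # ['8.8.8.8']
print(normalize_ips(["8.8.8.8", "1.1.1.1"]))  # ['8.8.8.8', '1.1.1.1']
```

A comma-separated string such as "8.8.8.8,1.1.1.1" is rejected here as well, because it is treated as a single (invalid) address.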
Portal Links
PORTAL_LINKS and generate_portal_link map collection records to their Group-IB Portal URLs.
from ciaops import PORTAL_LINKS, generate_portal_link
Simple collections (single ID in URL)
Most collections use a plain prefix + record ID pattern:
link = generate_portal_link('apt/threat', record_id='abc123')
# → "https://tap.group-ib.com/ta/last-threats?threat=abc123"
link = generate_portal_link('malware/config', record_id='def456')
# → "https://tap.group-ib.com/malware/configs?id=def456"
link = generate_portal_link('compromised/account_group', record_id='ghi789')
# → "https://tap.group-ib.com/cd/accounts?id=ghi789"
Returns None when record_id is empty or the collection has no portal mapping.
Multi-part URL templates
Some collections require multiple fields from the feed record (e.g. compromised/messenger, compromised/discord). Pass all required field values via the fields dict:
link = generate_portal_link(
'compromised/messenger',
fields={'chatStat.id': '1234', 'id': '5678'},
)
# → "https://tap.group-ib.com/ta/im?chatId=1234&msg=5678"
link = generate_portal_link(
'compromised/discord',
fields={'channel.id': 'ch99', 'id': 'msg42'},
)
# → "https://tap.group-ib.com/ta/im?collection=discord&chatId=ch99&msg=msg42"
Returns None if any required field is missing or empty.
Custom prefix override
link = generate_portal_link('my/collection', record_id='001', url_prefix='https://tap.group-ib.com/feed?id=')
# → "https://tap.group-ib.com/feed?id=001"
Embedding portal links in parsed output
Use the __concatenate directive in your mapping template so that ParserHelper resolves the URL automatically during parsing:
keys = {
'id': 'id',
'title': 'title',
'portal_url': {'__concatenate': {'collection': 'apt/threat', 'dynamic': 'id'}},
}
result = ParserHelper.find_by_template(feed, keys)
# result['portal_url'] → "https://tap.group-ib.com/ta/last-threats?threat=<id>"
Inspecting the full URL map
from ciaops import PORTAL_LINKS
for collection, template in PORTAL_LINKS.items():
    print(collection, '->', template)
Adapter utilities
ConfigParser and FileHandler are used by file-config based adapters such as the MISP adapter. They are not required for standard TI/DRP/ASM polling.
from ciaops.adapters.misp_utils import ConfigParser, FileHandler
Note: ConfigParser and FileHandler are also re-exported from ciaops.utils for backward compatibility, but the canonical import is from ciaops.adapters.misp_utils.
ConfigParser
Parses YAML and JSON config files used by MISP-style adapters.
cp = ConfigParser()
# Extract credentials from a YAML config dict as a dynamic Enum
creds = cp.get_creds(yaml_config) # reads yaml_config["creds"]
creds = cp.get_creds(yaml_config, key="auth") # custom key
creds.USERNAME.value # "user@example.corp"
creds.API_KEY.value # "abc123"
creds.API_URL.value # "https://..."
# Get only enabled / disabled collections from YAML config
enabled = ConfigParser.get_enabled_collections(yaml_config) # ["apt/threat", ...]
disabled = ConfigParser.get_disabled_collections(yaml_config)
# Read a single collection's default_date
date = ConfigParser.get_collection_default_date(yaml_config, "apt/threat")
FileHandler
A Borg-singleton file handler for reading and writing YAML and JSON config files. All instances share the same internal state, providing safe concurrent access via an in_progress flag.
fh = FileHandler()
# Check file existence / emptiness
fh.is_exist("/path/to/config.yml") # True / False
fh.is_empty("/path/to/config.yml") # True / False
# Read configs
yaml_config = fh.read_yaml_config("/path/to/config.yml")
json_config = fh.read_json_config("/path/to/mapping.json")
# Persist updated collection state back to YAML
fh.save_collection_info(
config="/path/to/config.yml",
collection="apt/threat",
seqUpdate=16172928022293,
default_date="2024-01-15",
)
# Overwrite an entire config file
fh.save_data_to_yaml_config(data, "/path/to/config.yml")
fh.save_data_to_json_config(data, "/path/to/mapping.json")
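For readers unfamiliar with the Borg pattern mentioned above, here is a generic sketch of it. DemoHandler is a hypothetical class, not the FileHandler source; it only shows how separate instances can share one attribute dictionary, including an in_progress flag.

```python
class Borg:
    """Every instance shares one attribute dictionary."""

    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state


class DemoHandler(Borg):
    """Hypothetical handler; only the shared in_progress flag is modelled."""

    def __init__(self):
        super().__init__()
        self.__dict__.setdefault("in_progress", False)


first = DemoHandler()
second = DemoHandler()
first.in_progress = True

print(first is second)     # False: two distinct objects
print(second.in_progress)  # True: state is shared
```

Unlike a classic singleton, identity is not shared, only state, so code that compares instances with is should not assume they are the same object.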
Examples
Full version of program
import logging
from ciaops import TIPoller
from ciaops.exception import InputException, ConnectionException, ParserException
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
...
poller = None  # stays None if the constructor itself raises
try:
    poller = TIPoller(username=username, api_key=api_key, api_url=api_url)
    poller.set_proxies(proxy_protocol=PROXY_PROTOCOL,
                       proxy_port=PROXY_PORT,
                       proxy_ip=PROXY_ADDRESS,
                       proxy_password=PROXY_PASSWORD,
                       proxy_username=PROXY_USERNAME)
    poller.set_verify(True)
    for collection, keys in keys_config.items():
        poller.set_keys(collection, keys)
    for collection, state in update_generator_config.items():
        if state.get("sequpdate"):
            generator = poller.create_update_generator(collection_name=collection, sequpdate=state.get("sequpdate"))
        elif state.get("date_from"):
            sequpdate = poller.get_seq_update_dict(date=state.get('date_from'), collection_name=collection).get(collection)
            generator = poller.create_update_generator(collection_name=collection, sequpdate=sequpdate)
        else:
            continue
        for portion in generator:
            parsed_portion = portion.parse_portion()
            save_portion(parsed_portion)
            # Remember the position so the next run resumes from here
            update_generator_config[collection]["sequpdate"] = portion.sequpdate
except InputException as e:
    logging.exception("Wrong input: {0}".format(e))
except ConnectionException as e:
    logging.exception("Something wrong with connection: {0}".format(e))
except ParserException as e:
    logging.exception("Exception occurred during parsing: {0}".format(e))
finally:
    if poller is not None:
        poller.close_session()
API logic
To iterate over the portions received from the API, follow one of these iteration logics:
- Result ID iteration - based on the resultId parameter retrieved from the previous response. Uses the common collection endpoint (apt/threat) appended to the base URL: /api/v2/apt/threat.
- Sequence update iteration - based on the seqUpdate parameter retrieved from the previous response. Uses the /updated endpoint after the collection name (/apt/threat): /api/v2/apt/threat/updated.
To search for IPs, domains, hashes, emails, etc., follow this logic:
- Search logic - first, call the /api/v2/search endpoint with any q parameter: /api/v2/search?q=8.8.8.8. The response lists the collections that contain the search value (8.8.8.8). Then use Sequence update iteration to retrieve all events.
To get the latest updates for the events of each collection, follow this logic:
- Sequence update logic - first, call the /api/v2/sequence_list endpoint with the optional date and collection parameters: /api/v2/sequence_list?date=2022-01-01&collection=apt/threat. The response contains the seqUpdate number to use in the next request to the collection's /updated endpoint. Then use Sequence update iteration to retrieve all events.
Sequence update logic
Most collections on the Threat Intelligence portal have an /updated endpoint.
This endpoint uses update logic based on the seqUpdate field, which comes from the API JSON response.
The seqUpdate value is the time since the Epoch converted to a large number (microseconds), using the following formula:
UTC timestamp * 1000 * 1000.
Note: Don't rely on this formula. As the amount of data grows, it may change.
For that purpose the /api/v2/sequence_list endpoint was created.
Use this endpoint to get the required seqUpdate number.
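As a worked example of the formula only, the sketch below converts a UTC date to microseconds since the Epoch. date_to_sequpdate is a hypothetical helper, and, as the note above says, the formula may change, so production code should get the value from the sequence_list endpoint (or get_seq_update_dict()) instead.

```python
from datetime import datetime, timezone


def date_to_sequpdate(date_string):
    """Convert a YYYY-MM-DD UTC date to microseconds since the Epoch."""
    moment = datetime.strptime(date_string, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(moment.timestamp()) * 1000 * 1000


print(date_to_sequpdate("2024-01-15"))  # 1705276800000000
```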
API response
Each row in our database has its own unique sequence update number, so all events can be retrieved one by one.
To check this, explore the JSON output and look at each item in the "items" field.
Each item contains a seqUpdate field, and the seqUpdate of the last item is also placed at the top level of the JSON output.
Use it to get the next portion of feeds.
Each collection has its own updated route, such as /api/v2/apt/threat/updated; the output below is an example.
{
"count": 1761,
"items": [
{"id": "fake286ca753feed3476649438e4e4488"...},
{"id": "fake51d29357b22b80564a1d2f9fc8751"...},
{
"author": null,
"companyId": [],
"id": "fake4f16300296d20ef9b909dc0d354fb",
......,
"indicators": [
{
"dateFirstSeen": null,
"dateLastSeen": null,
"deleted": false,
"description": null,
"domain": "example.corp",
"id": "fakebe483bb82759fbee7038235e0f52d0",
.....
}
],
"indicatorsIds": [
"fakebe483bb82759fbee7038235e0f52d0"
],
"isPublished": true,
"isTailored": false,
"labels": [],
"langs": [
"en"
],
"malwareList": [],
......,
"seqUpdate": 16172928022293
},
],
"seqUpdate": 16172928022293
}
Iteration steps
To iterate over the /api/v2/apt/threat/updated endpoint, take the seqUpdate value
("seqUpdate": 16172928022293) from the top level of the JSON response
to the previous request, or get it from the /sequence_list endpoint.
curl -X 'GET' 'https://<base URL>/api/v2/sequence_list'
Pass the gathered seqUpdate as a query parameter in the next request.
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?seqUpdate=16172928022293'
In the received JSON output check the "count": 1751. ->
Gather seqUpdate from last feed or at top level ->
Put it in next request ->
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?seqUpdate=16172928536227'
In the received JSON output, check the "count": 1741 ->
Gather seqUpdate from last feed or at top level ->
Repeat till the end.
Stop the iteration
The stop condition in this logic is the "count" value or the length of the "items" list.
For the apt/threat collection in the example above, the default limit is 10;
other collections usually have a limit of 100. The limit depends on the size of the feeds, so that the JSON output is not overloaded.
For example, with a limit of 100 you typically receive a portion of 100 feeds in the first iteration ->
then perhaps a portion of 23 feeds -> then a portion of 0 feeds -> the end.
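The iteration and its stop condition can be sketched as a small generator. Everything here is illustrative: iterate_updates and fake_fetch are hypothetical names, and fake_fetch is an in-memory stand-in for a GET request to a collection's /updated endpoint, so no network access is involved.

```python
def iterate_updates(fetch_page, seq_update):
    """Yield portions until the endpoint returns an empty "items" list."""
    while True:
        page = fetch_page(seq_update)
        if not page["items"]:
            return  # a portion of 0 feeds: the end
        yield page["items"]
        seq_update = page["seqUpdate"]  # top-level value drives the next request


# In-memory stand-in for GET /api/v2/<collection>/updated?seqUpdate=...
FEEDS = [{"id": f"feed-{n}", "seqUpdate": n} for n in range(1, 24)]


def fake_fetch(seq_update, limit=10):
    remaining = [feed for feed in FEEDS if feed["seqUpdate"] > seq_update]
    items = remaining[:limit]
    return {
        "count": len(remaining),
        "items": items,
        "seqUpdate": items[-1]["seqUpdate"] if items else seq_update,
    }


sizes = [len(portion) for portion in iterate_updates(fake_fetch, seq_update=0)]
print(sizes)  # [10, 10, 3]
```

Saving the last seqUpdate after each portion lets a later run resume from the same position instead of starting over.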
Search logic
Search logic is used to find records in the Threat Intelligence database that are related to a search value.
Global search
To find events related to an IP, domain, hash, email, etc., send a request to the /api/v2/search endpoint
with any q parameter (/api/v2/search?q=8.8.8.8).
It returns a list of the collections that contain the search value.
As the next step, use Sequence update iteration over all items in each collection.
To avoid unrelated results, specify the type of the search value in the q parameter, for example /api/v2/search?q=ip:8.8.8.8.
The same can be done for domain, email, hash, etc. (/api/v2/search?q=domain:example.corp, /api/v2/search?q=email:example@example.corp).
[
{
"apiPath": "suspicious_ip/open_proxy",
"label": "Suspicious IP :: Open Proxy",
"link": "https://<base-url>/api/v2/suspicious_ip/open_proxy?q=ip:8.8.8.8",
"count": 14,
"time": 0.304644684,
"detailedLinks": null
},
{
"apiPath": "attacks/ddos",
"label": "Attack :: DDoS",
"link": "https://<base-url>/api/v2/attacks/ddos?q=ip:8.8.8.8",
"count": 1490,
"time": 0.389418291,
"detailedLinks": null
},
{"apiPath": "attacks/deface"...},
{"apiPath": "malware/config"...},
{"apiPath": "suspicious_ip/scanner"...}
]
Iteration steps
On the first search step we receive information that collection attacks/ddos contains 1490 items ("count": 1490).
Let's extract all of them. First we need to send request to this collection with the q parameter (?q=ip:8.8.8.8).
Then we retrieve "seqUpdate" field right at the top level of the JSON response and use it in the next request ("seqUpdate": 1673373011294).
{
"count": 1490,
"items": [
{
"body": null,
"cnc": {"cnc": "http://example.corp/drv/"...},
"company": null,
"companyId": null,
"dateBegin": null,
"dateEnd": null,
"dateReg": "2017-08-16T00:00:00+00:00",
"evaluation": {},
"favouriteForCompanies": [],
"headers": [],
"hideForCompanies": [],
"id": "examplec58903baddc84b8c51eaef1f904374025d",
"isFavourite": false,
...
}
],
...,
"seqUpdate": 1673373011294
}
So the next request should look like this /api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673373011294.
We can also set the limit parameter in the requests, like limit=500.
Explore the example below.
curl -X 'GET' 'https://<base URL>/api/v2/search?q=ip:8.8.8.8'
Add gathered seqUpdate in the next request, using endpoint params.
curl -X 'GET' 'https://<base URL>/api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673373011294'
In the received JSON output, check the "count": 1390 ->
Gather seqUpdate from the last feed or from the top level ->
Put it in the next request ->
curl -X 'GET' 'https://<base URL>/api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673375930599'
In the received JSON output, check the "count": 1290 ->
Gather seqUpdate from last feed or at top level ->
Repeat till the end.
Stop the iteration
The stop condition in this logic is the "count" value or the length of the "items" list.
For the attacks/ddos collection in the example above, the default limit is 100;
for other collections it may differ. The limit depends on the size of the feeds, so that the JSON output is not overloaded.
For example, you typically receive a portion of 100 feeds in the first iteration ->
then perhaps a portion of 23 feeds -> then a portion of 0 feeds -> the end.
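Building the follow-up URLs from a search response can be sketched with the standard library. updated_url is a hypothetical helper, the base URL is a placeholder, and the response is trimmed to the two fields used here; the endpoint shape follows the examples in this section.

```python
from urllib.parse import urlencode


def updated_url(base_url, api_path, query, seq_update=None, limit=None):
    """Build the /updated URL for one collection returned by the search."""
    params = {"q": query}
    if seq_update is not None:
        params["seqUpdate"] = seq_update
    if limit is not None:
        params["limit"] = limit
    # safe=":" keeps the type prefix readable (ip:8.8.8.8).
    return f"{base_url}/api/v2/{api_path}/updated?{urlencode(params, safe=':')}"


search_response = [
    {"apiPath": "suspicious_ip/open_proxy", "count": 14},
    {"apiPath": "attacks/ddos", "count": 1490},
]
BASE_URL = "https://example.invalid"  # placeholder, not a real endpoint

# First request per collection that reported at least one hit.
urls = [
    updated_url(BASE_URL, hit["apiPath"], "ip:8.8.8.8")
    for hit in search_response
    if hit["count"] > 0
]
# Follow-up request using the seqUpdate taken from the previous response.
next_url = updated_url(BASE_URL, "attacks/ddos", "ip:8.8.8.8", seq_update=1673373011294)
print(next_url)
```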
Records limits
The default limit is 100 records per request. Because feeds differ in size, different collections have different limits.
To change the number of records in a response, add the limit parameter (for example limit=500) to the request.
The limits for each collection can be found in the Portal documentation.
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?limit=500&seqUpdate=16172928022293'
Recommended TTL
TTL (Time To Live) is the maximum length of time an indicator or dataset (package) can exist. It is measured in days: during this period the platform guarantees that the data represents a valid, active IoC. Once the TTL expires, the record should be considered stale and removed or re-evaluated. None means no expiry: the data does not have a defined lifetime and should be retained indefinitely.
| Endpoint | Recommended TTL (days) |
|---|---|
| Threat Intelligence | |
| apt/threat_actor/updated | 360 |
| hi/threat_actor/updated | 360 |
| apt/threat/updated | 360 |
| hi/threat/updated | 360 |
| hi/open_threats/updated | None |
| hi/analytic/updated | None |
| Malware | |
| malware/config/updated | 30 |
| malware/malware/updated | None |
| malware/signature/updated | None |
| malware/yara/updated | None |
| malware/cnc/updated | 90 |
| Attacks | |
| attacks/phishing_kit/updated | 30 |
| attacks/phishing_group/updated | 30 |
| attacks/ddos/updated | 30 |
| attacks/deface/updated | 30 |
| Vulnerabilities | |
| osi/vulnerability/updated | 30 |
| Compromised | |
| compromised/messenger/updated | None |
| compromised/discord/updated | None |
| compromised/access/updated | 90 |
| compromised/account_group/updated | 90 |
| compromised/breached/updated | 90 |
| compromised/breacheddb/updated | 90 |
| compromised/reaper/updated | 90 |
| compromised/bank_card_group/updated | 90 |
| compromised/masked_card/updated | 90 |
| compromised/spd/updated | 90 |
| OSI | |
| osi/public_leak/updated | 30 |
| osi/git_repository/updated | 30 |
| Suspicious IP | |
| suspicious_ip/tor_node/updated | 30 |
| suspicious_ip/open_proxy/updated | 15 |
| suspicious_ip/socks_proxy/updated | 2 |
| suspicious_ip/vpn/updated | 30 |
| suspicious_ip/scanner/updated | 15 |
| IoC | |
| ioc/common/updated | 90 |
| ioc/primary/updated | 90 |
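Applying a TTL on the consumer side could look like the sketch below. is_expired is a hypothetical helper, and which timestamp of a record the TTL should be counted from is an assumption here (a generic last_seen value); the TTL itself can come from the table above or from TICollections.get_ttl().

```python
from datetime import datetime, timedelta, timezone


def is_expired(last_seen, ttl_days, now):
    """Return True once ttl_days have passed since last_seen; None never expires."""
    if ttl_days is None:
        return False
    return now - last_seen > timedelta(days=ttl_days)


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
seen = datetime(2024, 1, 15, tzinfo=timezone.utc)

print(is_expired(seen, 90, now))    # True: 138 days have passed
print(is_expired(seen, 360, now))   # False
print(is_expired(seen, None, now))  # False: no expiry
```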
Troubleshooting
401 response code
This code is returned if you send no credentials. Make sure that you send the Authorization header and that you use Basic auth.
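For requests made outside the library, a Basic Authorization header can be built as sketched below. basic_auth_header is a hypothetical helper, and using the API key in the password position is an assumption based on the username/api_key pair the pollers accept; check the Portal documentation for the authoritative scheme.

```python
import base64


def basic_auth_header(username, api_key):
    """Build the value of an HTTP Basic Authorization header."""
    token = base64.b64encode(f"{username}:{api_key}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


headers = {"Authorization": basic_auth_header("example@example.corp", "API_KEY")}
```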
403 response code
There are several possible reasons for it:
- IP limitation. Make sure that your requests come from an allowed IP address. You can find above how to set up your private IP list.
- API key issue. Make sure that your API key is active and valid. Try regenerating it as described above.
- No access to the feed. Make sure that you have access to the requested feed. You can find the available feeds on the Profile page -> Security and Access.
504 response code or timeout
Try setting a smaller limit when requesting the API.
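One way to automate this advice is to halve the limit after each timeout. The sketch below is only an illustration of that idea: fetch_with_shrinking_limit and flaky_fetch are hypothetical names, and flaky_fetch simulates an endpoint that times out for large portions.

```python
def fetch_with_shrinking_limit(fetch, limit, min_limit=10):
    """Retry fetch(limit) with a halved limit after each timeout."""
    while True:
        try:
            return fetch(limit)
        except TimeoutError:
            if limit <= min_limit:
                raise  # already at the smallest allowed portion
            limit = max(min_limit, limit // 2)


def flaky_fetch(limit):
    # Simulated endpoint: anything above 100 records times out.
    if limit > 100:
        raise TimeoutError(f"limit={limit} took too long")
    return limit


print(fetch_with_shrinking_limit(flaky_fetch, limit=500))  # 62
```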
FAQ
Have a question? Ask in an SD ticket on our Portal or write to integration@group-ib.com.