Package provides poller utils for TI, DRP, ASM products

PyF6_utils

PyF6_utils – Python library to communicate with Products (TI, DRP, ASM) via API, based on PyF6 lib.

Installation

Lib deps: pyf6, pyaml, requests, urllib3, dataclasses.

PyF6_utils lib is available on PyPI:

pip install pyf6_utils

Or use a Portal WHL archive. Replace X.X.X with the current lib version:

pip install ./pyf6_utils-X.X.X-py3-none-any.whl
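
To confirm which version ended up installed, a minimal check using only the standard library could look like the following. The helper name `installed_version` is illustrative and is not part of PyF6_utils.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Prints the version string, or None when the package is not installed.
    print(installed_version("pyf6_utils"))
```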

Adapter

Define Config class

The following variables define the User-Agent mask, which carries product metadata:

PRODUCT_TYPE
PRODUCT_NAME
PRODUCT_VERSION
INTEGRATION
INTEGRATION_VERSION
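
How these five values are combined into the header is not spelled out here. As an illustration only, one plausible layout is sketched below; the function `build_user_agent` and the slash-separated format are assumptions, not the format PyF6 actually sends.

```python
def build_user_agent(product_type, product_name, product_version,
                     integration, integration_version):
    """Join product metadata into one header value (hypothetical layout)."""
    return "{}/{}/{}/{}/{}".format(
        product_type, product_name, product_version,
        integration, integration_version,
    )


# Example values taken from the Config class shown in this README.
headers = {
    "User-Agent": build_user_agent(
        "SCRIPT", "FireWall", "unknown", "ADCB_FireWall", "1.0.0"
    )
}
```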

Two Config variables point to the configuration files. CONFIG_YML holds the settings for each collection. CONFIG_JSON holds the mapping used to extract the necessary fields.

CONFIG_YML
CONFIG_JSON

The names of the other variables are arbitrary. The following Python code shows an example:

import os
import time

from pyf6 import TIAdapter, Logger
from pyf6_utils import FileHandler, ProxyConfigurator

from somewhere import SIEMCommunicator
from somewhere.exceptions import SomeSIEMapiError


class Config(object):
    # Set up product metadata
    PRODUCT_TYPE = "SCRIPT"
    PRODUCT_NAME = "FireWall"
    PRODUCT_VERSION = "unknown"
    INTEGRATION = "ADCB_FireWall"
    INTEGRATION_VERSION = "1.0.0"

    # Set project root dir
    ROOT_DIR = os.path.abspath(os.path.dirname(__file__))

    # Set basedirs
    DOCS_DIR = os.path.join(ROOT_DIR, "docs")
    LOGS_DIR = os.path.join(ROOT_DIR, "log")

    # Set up logging
    ROOT_LOGGING_LEVEL = 'DEBUG'
    LOGGING_FORMAT = '%(asctime)s [%(name)s: %(filename)s.%(lineno)s] [%(levelname)s] %(message)s'

    # Set up logs files
    LOGS_SESSION_FILENAME = 'session_ti.log'
    LOGS_INFO_FILENAME = 'info_ti.log'
    LOGS_WARNING_FILENAME = 'warning_ti.log'

    # Set up config filename
    _config_name_yml = 'endpoints_config.yaml'
    _config_name_json = 'mapping.json'

    # Set up configs
    CONFIG_YML = os.path.join(DOCS_DIR, "configs", _config_name_yml)
    CONFIG_JSON = os.path.join(DOCS_DIR, "configs", _config_name_json)


# Root logger initialization
Logger.init_root_logger(
    logs_dir=Config.LOGS_DIR,
    logging_format=Config.LOGGING_FORMAT,
    logging_level=Config.ROOT_LOGGING_LEVEL,
    session_filename=Config.LOGS_SESSION_FILENAME,
    info_filename=Config.LOGS_INFO_FILENAME,
    warning_filename=Config.LOGS_WARNING_FILENAME
)

# Logger instance to use
logger = Logger.init_logger(name=__name__)

cfg = Config
fh = FileHandler()
pc = ProxyConfigurator()

# Define configs
mapping_config = fh.read_json_config(config=Config.CONFIG_JSON)
endpoints_config = fh.read_yaml_config(config=Config.CONFIG_YML)

# Define creds
ti_creds = {
    "creds": {
        **endpoints_config.get("ti_client")
    }
}

some_siem_creds = {
    **endpoints_config.get("some_siem_creds")
}

proxy = {
    **endpoints_config.get("proxy")
}

# Proxy initialization
proxies = pc.get_proxies(**proxy)

# Adapter API initialization
ti_adapter = TIAdapter(
    ti_creds_dict=ti_creds,
    proxies=proxies,
    config_obj=cfg
)

# Some SIEM initialization
siem_communicator = SIEMCommunicator(**some_siem_creds)

Create config files

To match the Config class settings above, this example keeps the files in the docs/configs folder.

docs/configs/mapping.json

{
    "apt/threat": {
        "ip": "indicators.params.ipv4",
        "md5": "indicators.params.hashes.md5",
        "sha1": "indicators.params.hashes.sha1",
        "sha256": "indicators.params.hashes.sha256",
        "url": "indicators.params.url",
        "domain": "indicators.params.domain"
    },
    "attacks/ddos": {
        "target_ip": "target.ipv4.ip",
        "ip": "cnc.ipv4.ip"
    }
}
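
Each mapping value is a dotted path into a feed record. To make that idea concrete, the sketch below resolves such paths against nested dictionaries and lists. It is an illustration of what the mapping expresses, not the library's implementation; `resolve_path` and the sample record are hypothetical.

```python
def resolve_path(record, dotted_path):
    """Collect every value reachable via a dotted path in nested dicts/lists."""
    current = [record]
    for key in dotted_path.split("."):
        next_level = []
        for item in current:
            # Lists are expanded so the same key applies to each element.
            candidates = item if isinstance(item, list) else [item]
            for candidate in candidates:
                if isinstance(candidate, dict) and key in candidate:
                    next_level.append(candidate[key])
        current = next_level
    # Flatten trailing lists of values and drop missing entries (None).
    values = []
    for item in current:
        values.extend(item if isinstance(item, list) else [item])
    return [v for v in values if v is not None]


# Hypothetical feed record, shaped only to match the mapping paths above.
sample = {
    "indicators": [
        {"params": {"ipv4": ["198.51.100.7"], "domain": "example.org"}},
        {"params": {"ipv4": ["203.0.113.9"], "domain": None}},
    ]
}
mapping = {"ip": "indicators.params.ipv4", "domain": "indicators.params.domain"}
iocs = {name: resolve_path(sample, path) for name, path in mapping.items()}
```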

docs/configs/endpoints_config.yaml

collections:
  apt/threat:
    default_date: null
    description: A collection of Indicators and MITRE ATT&CK matrix. It contains HASH
      sums of malicious files that were generated by hackers, IP addresses, domains,
      CVE and the group's activities, motives, and goals to understand what tools
      and tactics they use according to the MITRE ATT&CK matrix.
    dtl: null
    enable: false
    seqUpdate: null
  attacks/ddos:
    default_date: null
    description: An attack that creates a load on the server and is executed simultaneously
      from a large number of computers (often a network of infected zombie computers
      is used) in order to create an artificial increase in requests to a resource
      and thereby disable it.
    dtl: null
    enable: false
    seqUpdate: null
some_siem_creds:
  base_ip: null
  password: null
  username: null
proxy:
  proxy_ip: null
  proxy_password: null
  proxy_port: null
  proxy_protocol: null
  proxy_username: null
ti_client:
  api_key: null
  username: null

Create generator

As a last step, create the generators and iterate over the portions each one yields. Extract the feeds from each portion and process them as needed.

# Create generators based on configuration files
generators_list = ti_adapter.create_generators(sleep_amount=1)

# Iterate over the (collection, generator) pairs
for collection, generator in generators_list:
    time.sleep(3)
    if not generator:
        logger.warning("No generator for collection: {}".format(collection))
        continue

    endpoints_config = fh.read_yaml_config(config=Config.CONFIG_YML)
    if not endpoints_config["collections"][collection]["enable"]:
        logger.warning("User disabled collection: {}. Skipping.".format(collection))
        continue

    try:
        for portion in generator:
            # scan_blocks = portion.parse_portion(as_json=False)
            keys = mapping_config.get(collection, {})
            data = portion.get_iocs(keys)
            # data = portion.get_iocs(
            #     keys,
            #     filter_map=("objective", ["Card harvest", "Login harvest", "Malware drop", "PII harvest"])
            # )

            tag_name = collection.replace("/", "_")
            allowed_list = ["ip", "url"]

            # ADD ANY SIEM COMMUNICATOR LOGIC HERE

            prepared_data = {"seqUpdate": portion.sequpdate}
            fh.save_collection_info(
                config=Config.CONFIG_YML,
                collection=collection,
                **prepared_data
            )
    except SomeSIEMapiError as e:
        logger.exception("Error occurred during connection to SomeSIEM")
    except Exception as e:
        logger.exception("Generator is empty. Raising notification.")
        time.sleep(60)
        exit(0)
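
The loop defines `allowed_list` but leaves its use to the SIEM-specific step. One way that step might begin is sketched below, assuming `data` is a dictionary that maps each mapping key to a list of values. That shape and the helper `select_indicators` are assumptions made for illustration.

```python
def select_indicators(data, allowed_keys):
    """Keep only allowed indicator types, dropping empties and duplicates."""
    selected = {}
    for key in allowed_keys:
        seen = set()
        unique = []
        # A missing key or a None value is treated as an empty list.
        for value in data.get(key) or []:
            if value and value not in seen:
                seen.add(value)
                unique.append(value)
        if unique:
            selected[key] = unique
    return selected


# Hypothetical result of portion.get_iocs(keys).
data = {"ip": ["198.51.100.7", "198.51.100.7", None], "md5": ["abc"], "url": []}
to_send = select_indicators(data, ["ip", "url"])
```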

Pack the package

pip install wheel
python setup.py sdist
python setup.py bdist_wheel
python setup.py build
python setup.py install
python setup.py develop

Troubleshooting

301-302 Redirected

Make sure your public IP address is added to the Portal trusted IP list. If the address is not on the list, this error may occur.

400 Bad Request

Invalid request format. Check the JSON response to find the error and correct the request.

401 Unauthorized

Authorization failed. This error occurs when the request is missing credentials, or they were entered incorrectly. Make sure you are using Basic Authentication and have entered the correct username and API key. Double-check that the credentials are correct and are sent in the request header.
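
For reference, a Basic Authentication header is the Base64 encoding of `username:api_key`. The sketch below builds one by hand so it can be compared with what a client actually sends; the helper name `basic_auth_header` is illustrative.

```python
import base64


def basic_auth_header(username, api_key):
    """Build an HTTP Basic Authentication header from a username and API key."""
    token = "{}:{}".format(username, api_key).encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(token).decode("ascii")}
```

When using `requests`, passing `auth=(username, api_key)` produces an equivalent header.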

403 Forbidden

There are several possible causes:

  • IP address restrictions: Make sure the request is coming from an allowed IP address. You can change the list of private IP addresses in your Personal Account settings.
  • API key issue: Make sure your API key is active. If necessary, create a new API key according to the instructions.
  • No access to feeds: Make sure you have access to the requested feeds. You can view the list of available feeds in your Portal Personal Account → Security and Access.

404 Not Found

The requested data is not available. Make sure the URL is entered correctly and the requested resource exists.

429 Too Many Requests

The number of requests has exceeded the limit. Reduce the request rate or decrease the number of requests per second in the request limit setting.
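
If custom requests are made outside the adapter, one common way to reduce the request rate is exponential backoff. The sketch below is a generic pattern, not something provided by PyF6_utils; `call_with_backoff` and the `(status_code, body)` return shape of `request_fn` are assumptions.

```python
import time


def call_with_backoff(request_fn, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call request_fn until it returns a status other than 429.

    request_fn must return a (status_code, body) tuple. The delay doubles
    after every 429 response: base_delay, 2 * base_delay, 4 * base_delay, ...
    """
    for attempt in range(max_attempts):
        status, body = request_fn()
        if status != 429:
            return status, body
        # Do not sleep after the final attempt; just report the last result.
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return status, body
```

The `sleep` parameter exists so the waiting behavior can be replaced in tests.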

500 Internal Server Error

Server-side error. Please wait and try again in a few minutes.

504 Gateway Timeout

The response time limit has been exceeded. Try reducing the API request limit parameter to reduce the server load and avoid this error.

FAQ

Have a question? Open an SD ticket on our Portal.
