Library for PolySwarm Microengine Utility Package
Project description
microengine-utils
Utility package for PolySwarm Engines
Supports Python 3.6 and greater.
Installation
From PyPI:
pip install microengine-utils
From source:
python3 setup.py install
OR
pip3 install .
If you get an error about a missing package named `wheel`, that means your version of pip or setuptools is too old. You need pip >= 19.0 and setuptools >= 40.8.0. To update pip, run `pip install -U pip`. To update setuptools, run `pip install -U setuptools`.
Usage
Here is an example of how to use the `datadog` metrics utility in an Engine:
import asgiref.sync as asgiref_sync
import logging
import os
import platform
import polyswarm_myengine
from polyswarmartifact.schema.verdict import Verdict
from polyswarmclient.abstractscanner import AbstractScanner, ScanResult
from microengine_utils.constants import SCAN_VERDICT, SCAN_FAIL, SCAN_TIME
from microengine_utils.datadog import configure_metrics
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Placeholder Datadog credentials.
# NOTE(review): Scanner.__init__ reads the DATADOG_API_KEY / DATADOG_APP_KEY
# environment variables rather than these constants -- confirm whether these
# placeholders are still needed.
DATADOG_API_KEY = 'my_api_key'
DATADOG_APP_KEY = 'my_app_key'
# Configure Datadog metric keys for use in the application
ENGINE_NAME = 'myengine'
SCANNER_TYPE = 'file'
OS_TYPE = 'windows'
# Set the environment name from POLY_WORK; "local" is the fallback used for testing
POLY_WORK = os.getenv('POLY_WORK', 'local')
# Set the hostname from HOSTNAME; "localhost" is the fallback used for testing
SOURCE = os.getenv("HOSTNAME", "localhost")
class Scanner(AbstractScanner):
    """Example scanner that records scan timing and verdict metrics in Datadog.

    The private helpers used by ``scan`` (``_get_my_engine_version``,
    ``_create_temp_file``, ``_run_system_cmd``, ``_get_full_command`` and
    ``_process_output``) are engine specific and are not shown in this example.
    """

    def __init__(self):
        """Read the Datadog keys from the environment and build the collector."""
        # NOTE(review): AbstractScanner.__init__ is not invoked here -- confirm
        # whether the base class requires it.
        self.datadog_api_key = os.getenv('DATADOG_API_KEY', None)
        self.datadog_app_key = os.getenv('DATADOG_APP_KEY', None)
        self.metrics_collector = configure_metrics(self.datadog_api_key,
                                                   self.datadog_app_key,
                                                   ENGINE_NAME,
                                                   OS_TYPE,
                                                   POLY_WORK,
                                                   SOURCE)

    async def scan(self, guid, artifact_type, content, metadata, chain):
        """Scan ``content`` with the engine and report the outcome to Datadog.

        Args:
            guid: Not used by this example.
            artifact_type: Not used by this example.
            content: Artifact content, written to a temporary file for scanning.
            metadata: Incoming metadata; ignored and replaced by a fresh
                ``Verdict`` describing this scanner.
            chain: Not used by this example.

        Returns:
            A ``ScanResult`` carrying only metadata when the scan command exits
            non-zero; otherwise a ``ScanResult`` with ``bit=True``, the verdict,
            a fixed confidence of 0.8 and the metadata.
        """
        version = await Scanner._get_my_engine_version()
        metadata = Verdict().set_malware_family('')\
            .set_scanner(operating_system=platform.system(),
                         architecture=platform.machine(),
                         vendor_version=version,
                         version=polyswarm_myengine.__version__)
        artifact_name = await asgiref_sync.sync_to_async(self._create_temp_file)(content)

        with self.metrics_collector.timer(SCAN_TIME):
            try:
                exit_code, scan_output = await Scanner._run_system_cmd(Scanner._get_full_command(artifact_name))
                logger.info("myengine scan result: %s", scan_output)
            finally:
                # Always remove the temporary artifact, even if the scan raised.
                await asgiref_sync.sync_to_async(os.unlink)(artifact_name)

        if exit_code != 0:
            # The engine failed: count the failure and return without a verdict.
            self.metrics_collector.increment(SCAN_FAIL)
            return ScanResult(metadata=metadata.json())

        infected_bool, malware_family = Scanner._process_output(scan_output)
        metadata.set_malware_family(malware_family)
        confidence = 0.8

        if infected_bool:
            verdict_tags = ['verdict:malicious',
                            f'malware_family:{metadata.malware_family}',
                            'type:file']
        else:
            verdict_tags = ['verdict:benign', 'type:file']
        self.metrics_collector.increment(SCAN_VERDICT, tags=verdict_tags)

        return ScanResult(bit=True, verdict=infected_bool, confidence=confidence, metadata=metadata.json())
Here is an example of using the `malwarerepoclient` utility in Engine unit tests:
import asyncio
import pytest
import sys
from microengine_utils.malwarerepoclient import DummyMalwareRepoClient
from polyswarm_myengine import Scanner
from polyswarmartifact import ArtifactType
@pytest.fixture()
def event_loop():
    """Provide a fresh asyncio event loop for each test and close it afterwards.

    A new loop is created per test rather than reusing the loop returned by
    ``asyncio.get_event_loop()``: closing that shared loop at teardown would
    hand an already-closed loop to the next test. On Windows a
    ``ProactorEventLoop`` is used.
    """
    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    yield loop
    loop.close()
@pytest.mark.asyncio
async def test_scan_random_malicious_and_not():
    """Scan one random malicious and one random benign file and check each verdict."""
    scanner = Scanner()
    for expect_malicious in [True, False]:
        # Only the file content is needed; the repo metadata is discarded.
        _, mal_content = DummyMalwareRepoClient().get_random_file(malicious_filter=expect_malicious)
        result = await scanner.scan("nocare", ArtifactType.FILE, mal_content, None, "home")
        assert result.verdict == expect_malicious, \
            f"expected verdict {expect_malicious}, got {result.verdict}"
Testing
git clone https://github.com/polyswarm/microengine-utils.git
cd microengine-utils
pip3 install -r requirements.txt
pip3 install .
pytest -s -v
Questions? Problems?
File a ticket or email us at info@polyswarm.io.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Close
Hashes for microengine_utils-1.2.2-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | dbad18b676b19d5b2f7255aa3f216b835692703875efa8bd9b6523162b4b2334
MD5 | 9144ab1e847889f4fd03bb4c49829f94
BLAKE2b-256 | b46f76d48f59c688166e3880a4a10e57eca7a6e14c46434233e7ccdcf40a0189