

Project description


Rosetta

Rosetta is a Python package that can be used to fake security logs and alerts for testing different detection and response use cases. It provides the following functions:

  • Generate bad and random observables/indicators, including IP addresses, URLs, file hashes, CVEs and more.
  • Fake log messages in different formats like CEF, LEEF and JSON.
  • Convert one log format into another, for example from CEF to LEEF.
  • Send the fake log message to different log management and analytics tools.
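To make the format-conversion idea concrete, here is a minimal standard-library sketch of turning a CEF line into a LEEF 1.0 line. It is not Rosetta's Converter and may not match its output; the function name `cef_to_leef` and the sample line are hypothetical, and the sketch assumes a well-formed CEF header (seven pipe-separated fields), extension values without spaces, and no escaped pipes. It follows the published LEEF 1.0 layout, where attributes are tab-separated, and carries the CEF severity over as the `sev` attribute while dropping the CEF event name.

```python
def cef_to_leef(cef_line):
    """Convert one simplified CEF line into a LEEF 1.0 line (illustrative only)."""
    if not cef_line.startswith("CEF:"):
        raise ValueError("not a CEF line")

    # CEF header: version|vendor|product|device version|signature id|name|severity|extension
    fields = cef_line[len("CEF:"):].split("|", 7)
    if len(fields) != 8:
        raise ValueError("expected 7 header fields plus an extension")
    _cef_version, vendor, product, version, event_id, _name, severity, extension = fields

    # Simplification: extension values are assumed to contain no spaces.
    attributes = dict(pair.split("=", 1) for pair in extension.split())
    attributes.setdefault("sev", severity)

    # LEEF 1.0 header: LEEF:1.0|vendor|product|version|event id|, then tab-separated attributes.
    header = f"LEEF:1.0|{vendor}|{product}|{version}|{event_id}|"
    return header + "\t".join(f"{key}={value}" for key, value in attributes.items())


# Hypothetical sample line, not produced by Rosetta.
sample = "CEF:0|Security|IDS|1.0|100|Alert|10|src=192.168.0.1 dst=192.168.0.2 act=blocked"
leef_line = cef_to_leef(sample)
print(leef_line)
```

A production converter would also need to handle escaped characters and values containing spaces, which is part of what a dedicated library takes care of.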

Installation

  • You can install Rosetta via pip:
      pip install rosetta-ce
  • Or you can install it from the source code:
      git clone https://github.com/ayman-m/rosetta.git
      cd rosetta
      python setup.py install
  • Once installed, you can import the library in your Python code like this:
      from rosetta import Observables, Events

Usage

Here are some examples of how to use Rosetta:

from rosetta import Converter, ConverterToEnum, ConverterFromEnum, Events, ObservableType, ObservableKnown, \
    Observables, Sender

# Example usage of the Converter class to convert a CEF log into a LEEF log.
converted_log = Converter.convert(from_type=ConverterFromEnum.CEF, to_type=ConverterToEnum.LEEF,
                                  data="cef_log=CEF:0|Security|Intrusion Detection System|1.0|Alert|10|src=192.168.0.1 dst=192.168.0.2 act=blocked")
print(converted_log)  # {'message': 'converted', 'data': 'LEEF=1.0!Vendor=Security!Product=Intrusion Detection System!Version=1.0!EventID=Alert!Name=10!src=192.168.0.1!dst=192.168.0.2!act=blocked'}

# Example usage of the Observables class to generate bad IP indicators.
bad_ip = Observables.generator(count=2, observable_type=ObservableType.IP, known=ObservableKnown.BAD)
print(bad_ip)  # ['ip1', 'ip2']

# Example usage of the Observables class to generate good IP indicators.
good_ip = Observables.generator(count=2, observable_type=ObservableType.IP, known=ObservableKnown.GOOD)
print(good_ip)  # ['ip1', 'ip2']

# Example usage of the Observables class to generate bad URL indicators.
bad_url = Observables.generator(count=2, observable_type=ObservableType.URL, known=ObservableKnown.BAD)
print(bad_url)  # ['url1', 'url2']

# Example usage of the Observables class to generate good URL indicators.
good_url = Observables.generator(count=2, observable_type=ObservableType.URL, known=ObservableKnown.GOOD)
print(good_url)  # ['url1', 'url2']

# Example usage of the Observables class to generate bad Hash indicators.
bad_hash = Observables.generator(count=2, observable_type=ObservableType.SHA256, known=ObservableKnown.BAD)
print(bad_hash)  # ['hash1', 'hash2']

# Example usage of the Observables class to generate good Hash indicators.
good_hash = Observables.generator(count=2, observable_type=ObservableType.SHA256, known=ObservableKnown.GOOD)
print(good_hash)  # ['hash1', 'hash2']

# Example usage of the Observables class to generate CVE indicators.
cve = Observables.generator(count=2, observable_type=ObservableType.CVE)
print(cve)  # Example: ['CVE-2023-2136', 'CVE-2023-29582']

# Example usage of the Observables class to generate random Terms.
terms = Observables.generator(count=2, observable_type=ObservableType.TERMS)
print(terms)  # Example: ['Create or Modify System Process', 'Stage Capabilities: Drive-by Target']


# Create an Observables instance holding your own values to be used in the fake security events.
src_ip, dst_ip, src_host, dst_host = ["192.168.10.10", "192.168.10.20"], ["1.1.1.1", "1.1.1.2"], ["abc"], ["xyz", "wlv"]
url, port = ["https://example.org", "https://wikipedia.com"], ["555", "666"]
protocol, app = ["ftp", "dns", "ssl"], ["explorer.exe", "chrome.exe"]
user = ["ayman", "mahmoud"]
file_name, file_hash = ["test.zip", "image.ps"], ["719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2"]
cmd, process = ["sudo restart", "systemctl stop firewalld"], ["bind", "crond"]
severity = ["high", "critical"]
sensor = ["fw", "edr"]
action = ["block", "allow"]
observables_list = Observables(src_ip=src_ip, dst_ip=dst_ip, src_host=src_host, dst_host=dst_host, url=url, port=port,
                               protocol=protocol, app=app, user=user, file_name=file_name, file_hash=file_hash, cmd=cmd,
                               process=process, severity=severity, sensor=sensor, action=action)

# Example usage of the Events class to generate generic SYSLOG events.
generic_syslog_with_random_observables = Events.syslog(count=1)
print(generic_syslog_with_random_observables)  # ['Jan 20 16:04:53 db-88.zuniga.net sudo[34675]: ryansandy : COMMAND ; iptables -F']
generic_syslog_with_my_observables = Events.syslog(count=1, observables=observables_list)
print(generic_syslog_with_my_observables)  # ['Apr 07 10:21:43 abc crond[17458]: ayman : COMMAND ; sudo restart']


# Example usage of the Events class to generate CEF events.
generic_cef_with_my_observables = Events.cef(count=1, observables=observables_list)
print(generic_cef_with_my_observables)  # ['CEF:0|Novak LLC|Firewall|1.0.6|3019ab69-2d0e-4b3f-a240-4e8c93042dc3|Firewall allow dns traffic from abc:33504 to 1.1.1.1:666|5|src=abc spt=33504 dst=1.1.1.1 url=https://example.org dpt=666 proto=dns act=allow']


# Example usage of the Events class to generate LEEF events.
leef_with_my_observables = Events.leef(count=1, observables=observables_list)
print(leef_with_my_observables)  # ["LEEF:1.0|Leef|Payment Portal|1.0|210.12.108.86|abc|9a:1e:9d:00:4c:ba|3b:a0:4b:24:f7:59|src=192.168.10.10 dst=abc spt=61549 dpt=443 request=https://example.com/search.php?q=<script>alert('xss')</script> method=Web-GET proto=HTTP/1.1 status=500 hash=719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2request_size=6173 response_size=8611 user_agent=Mozilla/5.0 (iPhone; CPU iPhone OS 9_3_5 like Mac OS X) AppleWebKit/536.1 (KHTML, like Gecko) FxiOS/12.1s4879.0 Mobile/00Y135 Safari/536.1"]

# Example usage of the Events class to generate Windows events.
winevent_with_my_observables = Events.winevent(count=1, observables=observables_list)
print(winevent_with_my_observables)  # ['<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Microsoft-Windows-Security-Auditing" Guid="5fc4a88c-97b0-4061-adc3-052159c10ef4"/><EventID>4648</EventID><Version>0</Version><Level>0</Level><Task>13824</Task><Opcode>0</Opcode><Keywords>0x8020000000000000</Keywords><TimeCreated SystemTime="2023-04-07T18:45:17"/><EventRecordID>575</EventRecordID><Correlation/><Execution ProcessID="1071" ThreadID="5317" Channel="Security"/><Computer>abc</Computer><Security UserID="S-1-2915"/><EventData><Data Name="SubjectUserSid">S-1-2915</Data><Data Name="SubjectUserName">mahmoud</Data><Data Name="SubjectDomainName">johnson.org</Data><Data Name="SubjectLogonId">S-1-2915</Data><Data Name="NewProcessId">3371</Data><Data Name="ProcessId">1071</Data><Data Name="CommandLine">sudo restart</Data><Data Name="TargetUserSid">S-1-2915</Data><Data Name="TargetUserName">mahmoud</Data><Data Name="TargetDomainName">johnson.org</Data><Data Name="TargetLogonId">S-1-2915</Data><Data Name="LogonType">3</Data></EventData></Event>']

# Example usage of the Events class to generate JSON events.
json_with_my_observables = Events.json(count=1, observables=observables_list)
print(json_with_my_observables) # [{'event_type': 'vulnerability_discovered', 'timestamp': '2023-02-12T16:28:46', 'severity': 'high', 'host': 'abc', 'file_hash': '719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2', 'cve': ['CVE-3941-1955']}]

# Example usage of the Events class to generate incidents that bundle several event types.
incident_with_my_observables = Events.incidents(count=1, fields="id,type,duration,analyst,description,events", observables=observables_list)
print(incident_with_my_observables) # [{'id': 1, 'duration': 2, 'type': 'Lateral Movement', 'analyst': 'Elizabeth', 'description': 'Software Discovery Forge Web Credentials: SAML Tokens Escape to Host System Binary Proxy Execution: Control Panel Hide Artifacts: Process Argument Spoofing Office Application Startup: Add-ins Compromise Infrastructure: Botnet.', 'events': [{'event': 'Apr 09 19:39:57 abc bind[56294]: ayman : COMMAND ; systemctl stop firewalld'}, {'event': 'CEF:0|Todd, Guzman and Morales|Firewall|1.0.4|afe3d30f-cff4-4084-a7a3-7de9ea21d0e9|Firewall block dns traffic from abc:26806 to 1.1.1.1:555|10|src=abc spt=26806 dst=1.1.1.1 url=https://example.org dpt=555 proto=dns act=block'}, {'event': 'LEEF:1.0|Leef|Payment Portal|1.0|19.90.247.108|abc|d4:27:4c:a7:40:50|2a:3f:f3:37:81:eb|src=192.168.10.20 dst=abc spt=47335 dpt=443 request=https://example.com/index.php method=Web-GET proto=HTTP/1.1 status=500 hash=719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2request_size=3640 response_size=4766 user_agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_1) AppleWebKit/533.0 (KHTML, like Gecko) Chrome/47.0.819.0 Safari/533.0'}, {'event': '<Event xmlns="http://schemas.microsoft.com/win/2004/08/events/event"><System><Provider Name="Microsoft-Windows-Security-Auditing" Guid="67eb0bb0-ab24-43ce-b7f1-6d6a6bb0ac27"/><EventID>4672</EventID><Version>0</Version><Level>0</Level><Task>12544</Task><Opcode>0</Opcode><Keywords>0x8020000000000000</Keywords><TimeCreated SystemTime="2023-01-15T04:07:58"/><EventRecordID>38</EventRecordID><Correlation/><Execution ProcessID="7182" ThreadID="7703" Channel="Security"/><Computer>abc</Computer><Security UserID="S-1-7181"/><EventData><Data Name="SubjectUserSid">S-1-7181</Data><Data Name="SubjectUserName">mahmoud</Data><Data Name="SubjectDomainName">johnson.net</Data><Data Name="SubjectLogonId">9638</Data><Data Name="PrivilegeList">Through moment tonight.</Data></EventData></Event>'}, {'event': {'event_type': 
# 'vulnerability_discovered', 'timestamp': '2023-01-18T23:49:45', 'severity': 'critical', 'host': 'abc', 'file_hash': '719283fd5600eb631c23b290530e4dac9029bae72f15299711edbc800e8e02b2', 'cve': ['CVE-2023-29067']}}]}]

# Example usage of the Sender class to send fake events to a log analysis tool.
worker = Sender(data_type="SYSLOG", destination="udp:127.0.0.1:514", observables=observables_list, count=5, interval=2)
worker.start()

# Starting worker: worker_2023-04-26 17:50:15.671101
# Worker: worker_2023-04-26 17:50:15.671101 sending log message to 127.0.0.1 
# Worker: worker_2023-04-26 17:50:15.671101 sending log message to 127.0.0.1 
# Worker: worker_2023-04-26 17:50:15.671101 sending log message to 127.0.0.1 
# Worker: worker_2023-04-26 17:50:15.671101 sending log message to 127.0.0.1 
# Worker: worker_2023-04-26 17:50:15.671101 sending log message to 127.0.0.1 
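When no log platform is at hand, a throwaway UDP listener can stand in for the destination so you can see what arrives. The sketch below uses only the standard library; the helper names `open_listener` and `receive_datagrams` are hypothetical and not part of Rosetta. It lets the operating system pick a free port and sends itself one datagram (the sample syslog line shown earlier) to demonstrate the round trip. Pointing the Sender at it would mean binding a fixed port and using that port in the destination string, on the assumption that the Sender accepts ports other than 514 in the same `udp:host:port` form.

```python
import socket


def open_listener(host="127.0.0.1", port=0):
    """Bind a UDP socket; port 0 lets the OS choose a free port."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    return sock


def receive_datagrams(sock, count, timeout=5.0):
    """Read `count` datagrams and return them decoded as text."""
    sock.settimeout(timeout)
    messages = []
    for _ in range(count):
        data, _sender = sock.recvfrom(65535)
        messages.append(data.decode("utf-8", errors="replace"))
    return messages


# Self-contained round trip: send one sample message to the listener.
listener = open_listener()
host, port = listener.getsockname()
sample = "Apr 07 10:21:43 abc crond[17458]: ayman : COMMAND ; sudo restart"
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
    client.sendto(sample.encode("utf-8"), (host, port))

received = receive_datagrams(listener, count=1)
listener.close()
print(received)
```

UDP gives no delivery guarantee, so a listener like this is only suitable for local smoke tests.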

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rosetta-ce-1.5.3.tar.gz (23.0 kB)

Uploaded Source

Built Distributions

rosetta_ce-1.5.3-py3.11.egg (48.4 kB)

Uploaded Source

rosetta_ce-1.5.3-py3-none-any.whl (20.1 kB)

Uploaded Python 3

File details

Details for the file rosetta-ce-1.5.3.tar.gz.

File metadata

  • Download URL: rosetta-ce-1.5.3.tar.gz
  • Upload date:
  • Size: 23.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.2

File hashes

Hashes for rosetta-ce-1.5.3.tar.gz
Algorithm    Hash digest
SHA256       9c03b338c1b654415d93887b0f3616feb0594549aac75cbb48647c12b811b16d
MD5          e576864226d6d2637f1f583f92fd300a
BLAKE2b-256  e8eb84dd069d4c5e4140b884b2a18fe82151d940d4b3115a20145aec43517ce1
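
A downloaded file can be checked against a listed SHA256 digest with a few lines of standard-library Python. This is a minimal sketch; the function names `sha256_of_file` and `matches_digest` are hypothetical, and the file is read in chunks so large archives do not need to fit in memory.

```python
import hashlib


def sha256_of_file(path, chunk_size=65536):
    """Return the hex SHA256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read fixed-size chunks until read() returns an empty bytes object.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_digest(path, expected_hex):
    """Compare a file's SHA256 digest with an expected hex string."""
    return sha256_of_file(path) == expected_hex.strip().lower()
```

For example, calling `matches_digest` with the path of a downloaded rosetta-ce-1.5.3.tar.gz and the SHA256 value listed for that file should return True only if the download is intact.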


File details

Details for the file rosetta_ce-1.5.3-py3.11.egg.

File metadata

  • Download URL: rosetta_ce-1.5.3-py3.11.egg
  • Upload date:
  • Size: 48.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.2

File hashes

Hashes for rosetta_ce-1.5.3-py3.11.egg
Algorithm    Hash digest
SHA256       30ee4d96a49b1afbd6baef6340c1206d2bc5bcd20136bf11960118c953131d2d
MD5          3f7b760316ae889d8ccf5af1c19b9954
BLAKE2b-256  8a4ac7850dabfd52012a6da2b26e119252b914355bad401fea12d53ab291bf53


File details

Details for the file rosetta_ce-1.5.3-py3-none-any.whl.

File metadata

  • Download URL: rosetta_ce-1.5.3-py3-none-any.whl
  • Upload date:
  • Size: 20.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.2

File hashes

Hashes for rosetta_ce-1.5.3-py3-none-any.whl
Algorithm    Hash digest
SHA256       da672da7852d6b793f92b209044c245190f9494546729ef9f9dcab4b7414f1f5
MD5          905716fe3e3b6e4aba9907768daf3f85
BLAKE2b-256  792cd579c901b1dbd520e55768383cd141520752563db2d9c11a8bf2cd8e472a

