Data Online Generator
Project description
Data Online Generator (doge_datagen)
Description
DataOnlineGenerator can be used to simulate user, system or other actor behaviour bases on probabilistic model. It is a state machine which is traversed by multiple subjects automatically based on probability defined for each possible transition.
Each Subject instance can hold additional attributes that can be modified during transition with use of action_callback. Action callback can also be used to make transition fail and to remain in current state.
In transition definition event_sinks can be passed which will be called on successful transition to log, generate event or do other actions.
State machine works in ticks which length and number is defined in constructor. In each tick DataOnlineGenerate evaluates each subject and makes a transition based on provided probabilities. Sum of probabilities of doing a transition from given state have to be less or equal to 100. If probabilities of all transitions are less than 100 remaining value is treated as probability of staying in the same state in given tick.
Installation
Package is available in PyPI https://pypi.org/project/doge-datagen/
pip install -U doge-datagen
Usage
DataOnlineGenerator
Please refer to examples for full reference.
Let's consider a user of a banking application that might take or consider taking a loan. Such user will have some account and loan balance. Will be able to receive an income and spend his money. He will be also able to open his banking application, open a loan screen and possible take a loan or exit application.
We can model this behaviour as states and transitions:
And define it as code:
datagen = DataOnlineGenerator(['offline', 'online', 'loan_screen'], 'offline', UserFactory(), 10, 60000, 1000)
datagen.add_transition('income', 'offline', 'offline', 0.01,
action_callback=income_callback, event_sinks=[balance_sink])
datagen.add_transition('spending', 'offline', 'offline', 0.1,
action_callback=spending_callback, event_sinks=[trx_sink, balance_sink])
datagen.add_transition('login', 'offline', 'online', 0.1, event_sinks=[clickstream_sink])
datagen.add_transition('logout', 'online', 'offline', 70, event_sinks=[])
datagen.add_transition('open_loan_screen ', 'online', 'loan_screen', 30, event_sinks=[clickstream_sink])
datagen.add_transition('close_loan_screen', 'loan_screen', 'online', 40, event_sinks=[clickstream_sink])
datagen.add_transition('take_loan', 'loan_screen', 'online', 10,
action_callback=take_loan_callback, event_sinks=[clickstream_sink, loan_sink, balance_sink])
Except defining state machine, we also provide factory that will be called to generate Subjects, in above example called Users. We also provide tick length (1 min) and number of ticks (1000).
Probability of login into application 0.1 [%] can be interpreted as on average 1/1000 of all users in each minute will log in or that specific User will log in to an app on average once in 1000 minutes (a little more than once a day).
Sink factories
Printing sink
Simple sink that can be used to print transition results on screen. It requires a format function that converts transition details into a string.
def format_function(timestamp: int, user: Subject, transition: Transition) -> str:
return '[{}] User id: {}, balance: {}, loan_balance: {} made a transition {} from {} to {}'\
.format(timestamp,
user.user_id,
user.balance,
user.loan_balance,
transition.trigger,
transition.from_state,
transition.to_state)
sink = PrintingSink(format_function)
Kafka sink
Kafka sink allows emitting events into Kafka topics. By default, it uses String Serializers which can be overridden by
providing different serializers to create
method. It requires key_function
and value_function
to be provided
which converts transition details into a format that is serializable by provided serializers.
def key_function(subject: Subject, transition: Transition) -> str:
return str(subject.user_id)
def value_function(timestamp: int, subject: Subject, transition: Transition) -> str:
value = {
'timestamp': timestamp,
'user': {
'user_id': subject.user_id,
'balance': subject.balance,
'loan_balance': subject.loan_balance
},
'event': transition.trigger
}
return json.dumps(value)
factory = KafkaSinkFactory(['localhost:9092'], 'doge-kafka-example')
sink = factory.create('test_topic', key_function, value_function)
Kafka Avro sink
KafkaAvroSinkFactory
is a bit of convenience factory that wraps around regular KafkaSinkFactory
that hides away creation
details of classes needed to push Avro events into Kafka. It requires functions that convert transition details into a
format suitable for AvroSerializer
(typically it will be a structure of nested dicts, with sometimes tuples or type
hints in case of type unions in schema.
Fast avro documentation)
and avro schemas for key and value to be provided.
def key_function(subject: Subject, transition: Transition) -> Dict[str, Any]:
return {'key': str(subject.user_id)}
def value_function(timestamp: int, subject: Subject, transition: Transition) -> Dict[str, Any]:
value = {
'timestamp': timestamp,
'user': {
'userId': str(subject.user_id),
'balance': str(subject.balance),
'loanBalance': str(subject.loan_balance)
},
'event': transition.trigger
}
return value
def get_schema(schema_path):
with open(schema_path) as f:
return f.read()
key_schema = get_schema('./avro/Key.avsc')
event_schema = get_schema('./avro/Event.avsc')
factory = KafkaAvroSinkFactory(['localhost:9092'], 'http://localhost:8081', 'doge-kafka-example')
sink = factory.create('test_avro_topic', key_function, key_schema, value_function, event_schema)
DB sink
DbSinkFactory
uses SQLAlchemy core. It requires DB Url in acceptable format and a function that is able to convert
transition details into a flat dict of values.
def row_mapper_function(timestamp: int, subject: Subject, transition: Transition) -> Dict[str, Any]:
row = {
'timestamp': timestamp,
'user_id': subject.user_id,
'balance': subject.balance,
'loan_balance': subject.loan_balance,
'event': transition.trigger
}
return row
factory = DbSinkFactory('postgresql://postgres:postgres@localhost:5432/postgres')
sink = factory.create('events', row_mapper_function)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file doge_datagen-1.0.4.tar.gz
.
File metadata
- Download URL: doge_datagen-1.0.4.tar.gz
- Upload date:
- Size: 19.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.0 CPython/3.9.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | a3d49f73a6655422549d21661413a938f5c39ee02d356a1ea29b8179900894f5 |
|
MD5 | 47733ebc0cff9f5d97cbb55147a4cd53 |
|
BLAKE2b-256 | 365eee2502f562e0abab2abd9e5f32478bdf628c49b2e66ad319a4db2674692e |