A set of class tools to run the Funguana Pipeline. Many irrelelavent parts
Project description
Fungua Universe
This is a set of libraries to allow for the use of online live streaming systems within machine learning. They're largely separate from the services they're meant to support.
Installation
To install this code on your system you need to ensure you have pip
installed on your system.
Run The Command
pip install funguauniverse
I'll need to heavily document this code. I'll go through a quick overview of what's included. The RL blog I crafted here, explains some of the uses of it.
For example, see how the background daemon works.
import threading
from crayons import yellow
from funguauniverse import PNode
from stochastic_agent.backtester.components.events import CheckNext
from stochastic_agent.backtester.components.parallel.policy import log_reward
from stochastic_agent.backtester.components.parallel.technical import reward_calculation
class RewardWorker(PNode):
def __init__(self, event_queue, *args, **kwargs):
super().__init__(event_queue, *args, **kwargs)
# This is watching all futures to check if they're done
self.future_list = []
self.future_watcher = threading.Thread(target=self.signal_watch, daemon=True).start()
def process(self):
event = self.get_action_from_queue()
if event is None:
return
episode = event.__dict__["episode"]
# print(yellow(episode))
# Get from episode data
# Check for the changes using the exact same ideas from before.
future = reward_calculation(episode, {})
self.future_list.append(future)
def create_next_event(self, episode=None):
return CheckNext(episode=episode)
def act_on_results(self, reward_obs):
episode = reward_obs['episode']
reward = reward_obs['reward']
print(yellow(f"Reward: {reward}"))
log_reward(episode, reward)
# Do something about the action here
# Create a new event
next_event = self.create_next_event(episode)
self.event_queue.put(next_event)
def signal_watch(self):
while True:
# print(self.future_list)
for index, future in enumerate(self.future_list):
if future.done():
final = self.future_list.pop(index)
# print(final)
self.act_on_results(final.result())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
funguauniverse-0.4.3.tar.gz
(14.1 kB
view hashes)
Built Distribution
Close
Hashes for funguauniverse-0.4.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 680785018404848b439199b5a33a86200767cd0332b873421a05cb1261f073c5 |
|
MD5 | cc61117dade9a873ee9647c7e839c710 |
|
BLAKE2b-256 | 5a01c5f9f9e6ad2826eec6805cf38350a6413b9aca2ce21aca445dac16275f9b |