A scripting language for managing large volumes of I/O-heavy workloads, such as the API calls in an ETL or ELT pipeline, or any program that needs Python and/or SQL.
Buelon
Installation
pip install buelon
That's it!
This installs the CLI command bue. Verify the installation by running bue --version or bue -v.
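If you want to run the same check from a Python script (for example in a deployment step), a minimal sketch could look like the following. It only relies on the `bue --version` command mentioned above; the helper name `cli_version` is hypothetical and not part of buelon.

```python
import shutil
import subprocess
from typing import Optional


def cli_version(executable: str = "bue") -> Optional[str]:
    """Return the output of `<executable> --version`, or None if it is not on PATH.

    `cli_version` is a hypothetical helper for illustration, not a buelon API.
    """
    path = shutil.which(executable)
    if path is None:
        return None
    completed = subprocess.run(
        [path, "--version"], capture_output=True, text=True, check=False
    )
    # Some CLIs print their version to stderr, so fall back to it.
    return (completed.stdout or completed.stderr).strip()


if __name__ == "__main__":
    version = cli_version()
    print(version if version is not None else "bue was not found on PATH")
```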
Note:
This package uses Cython, so you may need to install python3-dev using sudo apt-get install python3-dev [more commands and information].
If you would rather use this repository without Cython, you can git clone it instead: the package does not strictly depend on the Cython-compiled scripts, although they do provide a significant performance boost.
Quick Start
- Get example template:
bue example
(warning: this command will overwrite .env)
- Start Bucket server, Hub and 3 workers:
bue demo
- Upload script and wait for results:
python3 example.py
Production Start
Security: Make sure the bucket, hub and workers are reachable only from a private network (you will need a web server or something similar on the same private network to access this tool using bue upload).
- Run bucket server:
bue bucket -b 0.0.0.0:61535
- Run hub:
bue hub -b 0.0.0.0:65432 -k localhost:61535
- Run worker(s):
bue worker -b localhost:65432 -k localhost:61535
- Upload code:
bue upload -b localhost:65432 -f ./example.bue
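To show how the addresses in these commands fit together, here is a small sketch that builds the same command lines from two port numbers. The subcommands and the `-b` / `-k` flags are copied from the steps above; the function name, its parameters and the default of three workers are assumptions made for illustration.

```python
import shlex
from typing import List


def production_commands(
    bucket_port: int = 61535,
    hub_port: int = 65432,
    workers: int = 3,
    host: str = "localhost",
) -> List[List[str]]:
    """Build the bucket, hub and worker command lines shown above.

    Hypothetical helper: it only assembles argument lists, it does not start anything.
    """
    bucket_addr = f"{host}:{bucket_port}"
    hub_addr = f"{host}:{hub_port}"
    commands = [
        # The bucket and hub bind on all interfaces, as in the steps above.
        ["bue", "bucket", "-b", f"0.0.0.0:{bucket_port}"],
        ["bue", "hub", "-b", f"0.0.0.0:{hub_port}", "-k", bucket_addr],
    ]
    # Every worker points at the hub (-b) and the bucket (-k).
    commands += [
        ["bue", "worker", "-b", hub_addr, "-k", bucket_addr] for _ in range(workers)
    ]
    return commands


if __name__ == "__main__":
    for command in production_commands():
        print(shlex.join(command))
```

Each printed line can be started in its own terminal or handed to a process supervisor. The hub and every worker must be given the same bucket address.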
Supported Languages
- Python
- SQLite3
- PostgreSQL
Learn by Example
(see below for example.py contents)
# IMPORTANT: tabs are 4 spaces. white_space == "    "
# Setting scopes lets you give new jobs (which may still have errors) a lower scope
# so that they do not slow down your servers,
# and/or lets you run heavy processes on large machines
# and small processes on small machines
$ production-small
!0
# define a job called `accounts`
accounts:
    python  # <-- select the language to be run. Only python, sqlite3 and postgres are currently available
    accounts  # select the function (for python) or table (for sql) name that will be used
    example.py  # either provide a file or write code directly using the "`" char (see below example)
request:
    python
    request_report
    example.py
status:
    python
    $ testing-small  # <-- "scope" for a single step. A lower scope is given less priority than higher scopes. See PIPE_WORKER_SCOPES in the `.env` file generated by `bue example`
    get_status
    example.py
download:
    python
    !9  # <-- "priority": higher numbers are more important and run first within their scope.
    get_report
    example.py
manipulate_data:
    sqlite3
    some_table  # *vvvv* see below for writing code directly *vvvv*
    `
SELECT
    *,
    CASE
        WHEN sales = 0
        THEN 0.0
        ELSE spend / sales
    END AS acos
FROM some_table
`
## this one's just to show postgres as well
#manipulate_data_again:
#    postgres
#    another_table
#    `
#select
#    *,
#    case
#        when spend = 0
#        then 0.0
#        else sales / spend
#    end AS roas
#from another_table
#`
py_transform:
    python
    $ testing-heavy
    transform_data
    example.py
upload:
    python
    upload_to_db
    example.py
# these are pipes: they tell the server what order to run the steps in
# and also transfer the returned data between steps
# each step is run individually and could run on a different computer each time
accounts_pipe = | accounts  # single pipes currently need a `|` before or behind the value
api_pipe = request | status | download | manipulate_data | py_transform | upload
# currently there are only two syntaxes for "running" pipes.
# either by itself:
#     pipe()
#
# or in a loop:
#     for value in pipe1():
#         pipe2(value)
# # Another Example:
# v = pipe()  # <-- single call
# pipe2(v)
# right now you cannot pass arguments to the pipe being used in the for loop.
# in this case `accounts_pipe()` cannot be `accounts_pipe(some_value)`
for account in accounts_pipe():
    api_pipe(account)
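The script comments describe two ordering rules: a lower scope gets less priority than a higher one, and within a scope higher priority numbers run first. The sketch below illustrates only those two rules in plain Python. It is not buelon's scheduler: the `Job` class, the `run_order` function and the ordered `worker_scopes` list are assumptions for illustration, and each step is treated as an independent queued job, ignoring pipe order.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class Job:
    """Hypothetical stand-in for a queued step; not a buelon class."""
    name: str
    scope: str
    priority: int


def run_order(jobs: Sequence[Job], worker_scopes: Sequence[str]) -> List[str]:
    """Order jobs by scope (earlier in worker_scopes first), then by priority (higher first).

    Jobs whose scope is not listed for this worker are skipped.
    """
    rank = {scope: index for index, scope in enumerate(worker_scopes)}
    eligible = [job for job in jobs if job.scope in rank]
    ordered = sorted(eligible, key=lambda job: (rank[job.scope], -job.priority))
    return [job.name for job in ordered]


# Scopes and priorities taken from the example script above.
jobs = [
    Job("request", "production-small", 0),
    Job("status", "testing-small", 0),
    Job("download", "production-small", 9),
    Job("py_transform", "testing-heavy", 0),
]

# A small worker that accepts only the two "small" scopes (assumed configuration).
order = run_order(jobs, ["production-small", "testing-small"])
print(order)  # ['download', 'request', 'status']
```

Here `download` runs first because of its `!9` priority, `status` waits behind the `production-small` jobs, and `py_transform` is left for a worker that accepts the `testing-heavy` scope.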
example.py
import time
import random
import uuid
import logging
from typing import List, Dict, Union
from buelon.core.step import Result, StepStatus
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def accounts(*args) -> List[Dict[str, Union[int, str]]]:
    """Returns a list of sample account dictionaries.

    Returns:
        List[Dict[str, Union[int, str]]]: A list of dictionaries containing account information.
    """
    account_list = [
        {'id': 0, 'name': 'Account 1'},
        {'id': 2, 'name': 'Account 2'},
        {'id': 3, 'name': 'Account 4'},
    ]
    logger.info(f"Retrieved {len(account_list)} accounts")
    return account_list
def request_report(config: Dict[str, Union[int, str]]) -> Dict[str, Union[Dict, uuid.UUID, float]]:
    """Simulates a report request for a given account.

    Args:
        config (Dict[str, Union[int, str]]): A dictionary containing account information.

    Returns:
        Dict[str, Union[Dict, uuid.UUID, float]]: A dictionary with account data and request details.
    """
    account_id = config['id']
    request = {
        'report_id': uuid.uuid4(),
        'time': time.time(),
        'account_id': account_id
    }
    logger.info(f"Requested report for account ID: {account_id}, Report ID: {request['report_id']}")
    return {
        'account': config,
        'request': request
    }
def get_status(config: Dict[str, Union[Dict, uuid.UUID, float]]) -> Union[Dict, Result]:
    """Checks the status of a report request.

    Args:
        config (Dict[str, Union[Dict, uuid.UUID, float]]): A dictionary containing request information.

    Returns:
        Union[Dict, Result]: Either the input config if successful, or a Result object if pending.
    """
    requested_time = config['request']['time']
    account_id = config['account']['id']
    # Simulate a report that becomes ready 10 to 15 seconds after it was requested
    status = 'success' if requested_time + random.randint(10, 15) < time.time() else 'pending'
    if status == 'pending':
        logger.info(f"Report status for account ID {account_id} is pending")
        return Result(status=StepStatus.pending)
    logger.info(f"Report status for account ID {account_id} is success")
    return config
def get_report(config: Dict[str, Union[Dict, uuid.UUID, float]]) -> Union[Dict, Result]:
    """Retrieves a report or simulates an error.

    Args:
        config (Dict[str, Union[Dict, uuid.UUID, float]]): A dictionary containing request configuration.

    Returns:
        Union[Dict, Result]: Either a dictionary with report data or a Result object for reset.

    Raises:
        ValueError: If an unexpected error occurs.
    """
    account_id = config['account']['id']
    # Simulate an occasional timeout from the API
    if random.randint(0, 10) == 0:
        report_data = {'status': 'error', 'msg': 'timeout error'}
    else:
        report_data = [
            {'sales': i * 10, 'spend': i % 10, 'clicks': i * 13}
            for i in range(random.randint(25, 100))
        ]
    if not isinstance(report_data, list):
        if isinstance(report_data, dict):
            if (report_data.get('status') == 'error'
                    and report_data.get('msg') == 'timeout error'):
                logger.warning(f"Timeout error for account ID {account_id}. Resetting.")
                return Result(status=StepStatus.reset)
        error_msg = f'Unexpected error: {report_data}'
        logger.error(f"Error getting report for account ID {account_id}: {error_msg}")
        raise ValueError(error_msg)
    logger.info(f"Successfully retrieved report for account ID {account_id} with {len(report_data)} rows")
    return {
        'config': config,
        'table_data': report_data
    }
def transform_data(data: Dict[str, Union[Dict, List[Dict]]]) -> None:
    """Transforms the report data by adding account information to each row.

    Args:
        data (Dict[str, Union[Dict, List[Dict]]]): A dictionary containing config and table data.
    """
    config = data['config']
    table_data = data['table_data']
    account_name = config['account']['name']
    for row in table_data:
        row['account'] = account_name
    logger.info(f"Transformed {len(table_data)} rows of data for account: {account_name}")
def upload_to_db(data: Dict[str, Union[Dict, List[Dict]]]) -> None:
    """Handles table upload to database.

    Args:
        data (Dict[str, Union[Dict, List[Dict]]]): A dictionary containing table data to be uploaded.
    """
    table_data = data['table_data']
    account_name = data['config']['account']['name']
    # Implementation for database upload
    logger.info(f"Uploaded {len(table_data)} rows to the database for account: {account_name}")
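The script's pipes pass each step's return value to the next step, and the `for` loop runs the second pipe once per value returned by the first. The sketch below mimics that data flow in a single Python process. It is not how buelon executes pipes (steps there run individually, possibly on different machines). `make_pipe` and the stand-in step functions are hypothetical and much simpler than the functions in example.py.

```python
from functools import reduce
from typing import Any, Callable, Dict, List

Step = Callable[[Any], Any]


def make_pipe(*steps: Step) -> Step:
    """Chain steps so each one receives the previous step's return value (illustrative only)."""
    def run(value: Any = None) -> Any:
        return reduce(lambda result, step: step(result), steps, value)
    return run


# Stand-in steps; simplified versions of the kind of work done in example.py.
def list_accounts(_: Any = None) -> List[Dict[str, Any]]:
    return [{"id": 1, "name": "Account 1"}, {"id": 2, "name": "Account 2"}]


def fetch_rows(account: Dict[str, Any]) -> Dict[str, Any]:
    rows = [{"sales": 10, "spend": 2}, {"sales": 0, "spend": 3}]
    return {"account": account, "table_data": rows}


def add_acos(data: Dict[str, Any]) -> Dict[str, Any]:
    # Same rule as the SQL step: 0.0 when there are no sales, else spend / sales.
    for row in data["table_data"]:
        row["acos"] = 0.0 if row["sales"] == 0 else row["spend"] / row["sales"]
    return data


def tag_rows(data: Dict[str, Any]) -> Dict[str, Any]:
    for row in data["table_data"]:
        row["account"] = data["account"]["name"]
    return data


accounts_pipe = make_pipe(list_accounts)
api_pipe = make_pipe(fetch_rows, add_acos, tag_rows)

# Mirrors `for account in accounts_pipe(): api_pipe(account)` from the script.
results = [api_pipe(account) for account in accounts_pipe()]
for result in results:
    print(result["account"]["name"], result["table_data"])
```

Running a chain like this locally can be a quick way to check that each step accepts what the previous step returns before uploading a script.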
Known Defects
Currently, error handling for the scripting language itself is limited. When a script is run it is built into Python, so it then relies on Python's error handling, which is very good. Because the language is still simple, this is not marked as a high priority.
Future Plans
If this project sees some love, or I just find more free time, I'd like to support more languages, even compiled languages such as rust, go and c++, allowing teams that write in different languages to work on the same program.
Better error handling for bue scripts.
Possibly build in rust, once more mature, for better performance.
License
- MIT License