This is the basic library for xkit.
Project description
This is the basic library for embedded projects.
History
- V0.6.1
- Modified SLIP decode
- Added HDLC encode/decode
- V0.5.0
- Improved UDPServer local address allocation method
- V0.3.0
- Added SLIP wrapper
- Supports UDP and multicast
- Supports ANSI Escape Codes
Install
pip install genlib
ANSI Escape Codes
- Foreground color settings
- ANSIEC.FG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
- ANSIEC.FG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
- ANSIEC.FG.rgb(r, g, b)
- Background color settings
- ANSIEC.BG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
- ANSIEC.BG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
- ANSIEC.BG.rgb(r, g, b)
- Styles
- ANSIEC.OP.<RESET | BOLD | UNDER_LINE | REVERSE>
- Screen
- ANSIEC.OP.<CLEAR | CLEAR_LINE>
- Cursor Navigation
- ANSIEC.OP.<TOP>
- ANSIEC.OP.<up(n) | down(n) | right(n) | left(n) | next_line(n) | prev_line(n) | to(row, column)>
from genlib.ansiec import ANSIEC
import time, sys, random
def loading(count):
    """Animate `count` randomly coloured progress bars until all reach 100.

    Every 10 ms one randomly chosen unfinished bar advances by one step and
    the whole stack of bars is redrawn in place using ANSI cursor movement.
    """
    levels = [0] * count
    # Emit one line per bar so the cursor can later move back up over them.
    sys.stdout.write("\n" * count)
    palette = [
        ANSIEC.FG.rgb(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        for _ in range(count)
    ]

    while any(level < 100 for level in levels):
        time.sleep(0.01)
        pending = [slot for slot, level in enumerate(levels) if level < 100]
        levels[random.choice(pending)] += 1

        # Move the cursor left and `count` rows up before repainting the bars.
        sys.stdout.write(ANSIEC.OP.left(27))
        sys.stdout.write(ANSIEC.OP.up(count))
        for colour, level in zip(palette, levels):
            sys.stdout.write(colour)
            filled = level // 4  # bar is 25 cells wide, so one cell per 4 steps
            print(f"[{'#'* filled}{' ' * (25 - filled)}] {ANSIEC.OP.RESET}{filled*4}%")
            sys.stdout.write(ANSIEC.OP.RESET)
def main():
    """Run the progress-bar demo with four bars."""
    loading(count=4)


if __name__ == "__main__":
    main()
Data Boundary
SLIP encode/decode
from genlib import slip
def test_encode_decode(slip):
    """Round-trip a text payload through `slip.encode` and `slip.decode`.

    Prints the encoded packet, then the first decoded packet together with
    whether it compares equal to the original payload.
    """
    payload = "0.9992676 0.02764893 0.02612305 0.0"
    encoded = slip.encode(payload)
    print(encoded)
    decoded_packets = slip.decode(encoded)
    first = decoded_packets[0]
    print(first, first == payload)
def test_stream_decode(slip):
    """Pass SLIP data to `slip.decode` chunk by chunk and print each packet.

    Several chunks stop in the middle of a packet and the next chunk carries
    the rest; presumably this exercises decoding of packets split across
    calls (confirm against the `slip` implementation).
    """
    chunks = [
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
        b'\xc00.99',
        b'92676 0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 ',
        b'0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.',
        b'02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.02764893 0.0',
        b'2612305 0.0\xc0',
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
    ]

    for chunk in chunks:
        for packet in slip.decode(chunk):
            print(packet)
if __name__ == "__main__":
    # Both demo functions declare a required `slip` parameter; calling them
    # without an argument raises TypeError, so pass the imported module.
    test_encode_decode(slip)
    test_stream_decode(slip)
HDLC encode/decode
import json
from genlib import hdlc
messages = ("Hello World", [10, 20], [3.14], [1000])

# Serialise every message to JSON bytes, then encode each one with hdlc.
payloads = [json.dumps(item).encode() for item in messages]
send_frames = [hdlc.encode(payload) for payload in payloads]

# Simulated receive buffers:
#   1. frame 1 with its first three bytes missing,
#   2. frame 2 followed by the first two bytes of frame 3,
#   3. the remainder of frame 3 followed by frame 4.
recv_frames = (
    send_frames[0][3:],
    send_frames[1] + send_frames[2][:2],
    send_frames[2][2:] + send_frames[3],
)

# Decode every buffer, in order, before reporting any result.
decoded = [hdlc.decode(frame) for frame in recv_frames]

for recovered in decoded:
    if recovered:
        for msg in recovered:
            print(json.loads(msg))
    else:
        print("fail")
Multicast
Multicast recv
import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastReceiver
MAX_RECV_LINE = 15
recv_data_list = []
def on_async_chatting_recv(receiver, message):
    """Record a received message and repaint the message area.

    A deep copy of `message` is appended to `recv_data_list`, the oldest
    entry is dropped once more than MAX_RECV_LINE are held, and every stored
    entry is printed on its own screen row starting at row 1.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    for row, entry in enumerate(recv_data_list[:MAX_RECV_LINE], start=1):
        line_start = ANSIEC.OP.to(row, 1)
        # Clear the row first, then write the entry over it.
        print(line_start + ANSIEC.OP.CLEAR_LINE, end='')
        print(line_start + ANSIEC.FG.BRIGHT_YELLOW + f">>> RECEIVE {entry.remote}: {entry.payload}")

    # Move to the last message row and switch to the prompt colour.
    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)
def main():
    """Run the interactive multicast receiver.

    Registers `on_async_chatting_recv` as the receive callback, starts the
    receiver loop, then reads lines from the user and sends each non-empty
    line with `receiver.sendTo` until Ctrl-C is pressed.
    """
    receiver = MulticastReceiver()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably makes the receiver unpickle
    # incoming datagrams; pickle is unsafe on data from untrusted peers, so
    # confirm this is only used on a trusted network.
    receiver.onRecv(on_async_chatting_recv, unpickling=True)
    receiver.loopStart()

    try:
        while True:
            try:
                data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{receiver.remote}: ")
                if data:
                    receiver.sendTo(data)  # Multiple unicast
            except KeyboardInterrupt:
                break
    finally:
        # Stop the loop on every exit path (e.g. EOFError raised by input()
        # when stdin is closed), not only after Ctrl-C.
        receiver.loopStop()


if __name__ == "__main__":
    main()
Multicast send
import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastSender
MAX_RECV_LINE = 15
recv_data_list = []
def on_async_chatting_recv(sender, message):  # Multiple unicast
    """Record a received message and repaint the message area.

    A deep copy of `message` is appended to `recv_data_list`, the oldest
    entry is dropped once more than MAX_RECV_LINE are held, and every stored
    entry is printed on its own screen row starting at row 1.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    for row, entry in enumerate(recv_data_list[:MAX_RECV_LINE], start=1):
        line_start = ANSIEC.OP.to(row, 1)
        # Clear the row first, then write the entry over it.
        print(line_start + ANSIEC.OP.CLEAR_LINE, end='')
        print(line_start + ANSIEC.FG.BRIGHT_RED + f">>> RECEIVE {entry.remote}: {entry.payload}")

    # Move to the last message row and switch to the prompt colour.
    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)
def main():
    """Run the interactive multicast sender.

    Registers `on_async_chatting_recv` as the receive callback, starts the
    sender loop, then reads lines from the user and sends each non-empty
    line with `sender.sendTo` until Ctrl-C is pressed.
    """
    sender = MulticastSender()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably makes the sender unpickle
    # incoming datagrams; pickle is unsafe on data from untrusted peers, so
    # confirm this is only used on a trusted network.
    sender.onRecv(on_async_chatting_recv, unpickling=True)
    sender.loopStart()
    # The screen is cleared a second time once the loop has been started.
    print(ANSIEC.OP.CLEAR, end='')

    try:
        while True:
            try:
                data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{sender.group}: ")
                if data:
                    sender.sendTo(data)  # Only multicast
            except KeyboardInterrupt:
                break
    finally:
        # Stop the loop on every exit path (e.g. EOFError raised by input()
        # when stdin is closed), not only after Ctrl-C.
        sender.loopStop()


if __name__ == "__main__":
    main()
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
genlib-0.6.1.tar.gz
(11.8 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
genlib-0.6.1-py3-none-any.whl
(10.4 kB
view details)
File details
Details for the file genlib-0.6.1.tar.gz.
File metadata
- Download URL: genlib-0.6.1.tar.gz
- Upload date:
- Size: 11.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | 52801cbcea1df62566edc354ad4640ba2662b0941a496c4d07dc43556ec4d891 | |
| MD5 | f668f5aae8c8f5a00fed0aafe4fe2644 | |
| BLAKE2b-256 | 633a9805da7fed185bfd39a7bda6f63e29b4376a7472389a0e15bdc8528b3a25 | |
File details
Details for the file genlib-0.6.1-py3-none-any.whl.
File metadata
- Download URL: genlib-0.6.1-py3-none-any.whl
- Upload date:
- Size: 10.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 | 03499b9666df82f55e3f92b8e907ef2e671bef5d56a387782142c91558134506 | |
| MD5 | dc41e3d6aa8c2ec362a5f7a6346d496c | |
| BLAKE2b-256 | f4dc2918a93574f2831a07363b87d8b24898fb734f03172b4c6a9226e1218794 | |