Skip to main content

This is the basic library for xkit.

Project description

This is the basic library for embedded projects.

History

  • V0.6.1
    • modify SLIP decode
    • add HDLC encode/decode
  • V0.5.0
    • Improved UDPServer local address allocation method
  • V0.3.0
    • SLIP wrapper
    • Supports UDP and multicast
    • Supports ANSI Escape Codes

Install

pip install genlib

ANSI Escape Codes

  • Foreground color settings
    • ANSIEC.FG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
    • ANSIEC.FG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
    • ANSIEC.FG.rgb(r, g, b)
  • Background color settings
    • ANSIEC.BG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
    • ANSIEC.BG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
    • ANSIEC.BG.rgb(r, g, b)
  • Styles
    • ANSIEC.OP.<RESET | BOLD | UNDER_LINE | REVERSE>
  • Screen
    • ANSIEC.OP.<CLEAR | CLEAR_LINE>
  • Cursor Navigation
    • ANSIEC.OP.<TOP>
    • ANSIEC.OP.<up(n) | down(n) | right(n) | left(n) | next_line(n) | prev_line(n) | to(row, column)>
from genlib.ansiec import ANSIEC
import time, sys, random

def loading(count):
    """Animate ``count`` stacked progress bars on stdout until all reach 100.

    Each bar is given a random RGB foreground colour. On every step the
    function sleeps 10 ms, advances one randomly chosen unfinished bar by
    one, and repaints the whole stack in place using ANSI cursor movement.
    The printed percentage is derived from the filled cells, so it moves in
    steps of 4.

    Args:
        count: Number of progress bars to draw.
    """
    bar_cells = 25  # bar width in characters; one cell per 4 progress units
    levels = [0] * count
    # Reserve one terminal row per bar; each repaint moves back up over them.
    sys.stdout.write("\n" * count)
    palette = [
        ANSIEC.FG.rgb(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        for _ in range(count)
    ]

    while True:
        pending = [slot for slot, level in enumerate(levels) if level < 100]
        if not pending:
            break
        time.sleep(0.01)
        levels[random.choice(pending)] += 1

        # Return the cursor to the first bar's row before repainting.
        sys.stdout.write(ANSIEC.OP.left(27))
        sys.stdout.write(ANSIEC.OP.up(count))
        for tint, level in zip(palette, levels):
            filled = level // 4
            bar = "#" * filled + " " * (bar_cells - filled)
            sys.stdout.write(tint)
            print(f"[{bar}] {ANSIEC.OP.RESET}{filled * 4}%")
            sys.stdout.write(ANSIEC.OP.RESET)

def main():
    """Run the progress-bar demo with four bars."""
    loading(count=4)


# Only start the demo when this file is executed as a script.
if __name__ == "__main__":
    main()

Data Boundary

SLIP encode/decode

from genlib import slip

def test_encode_decode(slip):
    """Round-trip one text payload through ``slip.encode`` and ``slip.decode``.

    Prints the encoded packet, then the first decoded packet followed by
    whether it compares equal to the original payload.

    Args:
        slip: Object providing ``encode(data)`` and ``decode(packet)``; the
            result of ``decode`` is indexed with ``[0]``.
    """
    payload = "0.9992676 0.02764893 0.02612305 0.0"
    encoded = slip.encode(payload)
    print(encoded)

    decoded_packets = slip.decode(encoded)
    first_packet = decoded_packets[0]
    round_trip_ok = first_packet == payload
    print(first_packet, round_trip_ok)

def test_stream_decode(slip):
    """Feed framed data to ``slip.decode`` chunk by chunk and print each packet.

    The chunks carry the same ``\\xc0``-delimited frame six times: whole at
    the start and end, and in between four times split in two at different
    offsets.

    Args:
        slip: Object providing ``decode(chunk)`` that returns an iterable of
            packets for each chunk it is given.
    """
    chunks = [
        # whole frame
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
        # the same frame split at four different offsets
        b'\xc00.99',
        b'92676 0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 ',
        b'0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.',
        b'02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.02764893 0.0',
        b'2612305 0.0\xc0',
        # whole frame again
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
    ]

    for chunk in chunks:
        for packet in slip.decode(chunk):
            print(packet)
    
if __name__ == "__main__":
    # Both helpers take the SLIP codec as their only (required) argument;
    # calling them with no arguments raises TypeError, so pass the imported
    # ``slip`` module explicitly.
    test_encode_decode(slip)
    test_stream_decode(slip)

HDLC encode/decode

import json
from genlib import hdlc

# Sample payloads: one string and three short lists.
messages = ("Hello World", [10, 20], [3.14], [1000])

# Serialise every payload to JSON text and encode it to bytes.
message_1, message_2, message_3, message_4 = (
    json.dumps(payload).encode() for payload in messages
)

# Pass each serialised payload through hdlc.encode to get the frames to send.
send_frame_1, send_frame_2, send_frame_3, send_frame_4 = (
    hdlc.encode(body)
    for body in (message_1, message_2, message_3, message_4)
)

# Build received chunks whose boundaries do not line up with the frames.
recv_frame_1 = send_frame_1[3:]                 # frame 1 without its first 3 bytes
recv_frame_2 = send_frame_2 + send_frame_3[:2]  # frame 2 plus the first 2 bytes of frame 3
recv_frame_3 = send_frame_3[2:] + send_frame_4  # remainder of frame 3 plus frame 4

# Decode the chunks in arrival order before reporting anything.
recv_message_1, recv_message_2, recv_message_3 = (
    hdlc.decode(chunk)
    for chunk in (recv_frame_1, recv_frame_2, recv_frame_3)
)

# Report each decode result: "fail" when it is empty, otherwise every
# decoded message parsed back from JSON.
for decoded in (recv_message_1, recv_message_2, recv_message_3):
    if decoded:
        for msg in decoded:
            print(json.loads(msg))
    else:
        print("fail")

Multicast

Multicast recv

import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastReceiver
         

# Maximum number of received messages kept (and redrawn) by the receive callback.
MAX_RECV_LINE = 15
# Received messages, oldest first; the receive callback appends to the end
# and drops index 0 once the list grows past MAX_RECV_LINE.
recv_data_list = []

def on_async_chatting_recv(receiver, message):
    """Record an incoming message and repaint the message history.

    A deep copy of ``message`` is appended to ``recv_data_list``; when the
    list grows past ``MAX_RECV_LINE`` entries the oldest one is removed.
    Every kept message is then printed on its own terminal row, starting at
    row 1, in bright yellow.

    Args:
        receiver: Not used by this callback.
        message: Received item; its ``remote`` and ``payload`` attributes
            are shown.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    # Slicing copies the list, so the repaint works on a snapshot.
    visible = recv_data_list[:MAX_RECV_LINE]
    for row, entry in enumerate(visible, start=1):
        print(ANSIEC.OP.to(row, 1) + ANSIEC.OP.CLEAR_LINE, end='')
        print(ANSIEC.OP.to(row, 1) + ANSIEC.FG.BRIGHT_YELLOW + f">>> RECEIVE {entry.remote}: {entry.payload}")

    # Move to row MAX_RECV_LINE and switch to the prompt colour; the newline
    # emitted by print() then advances to the following row.
    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)

def main():
    """Run the interactive chat on the multicast-receiver side.

    Creates a ``MulticastReceiver``, registers ``on_async_chatting_recv`` as
    its receive callback, starts its loop, and then reads lines from stdin,
    sending every non-empty line with ``receiver.sendTo``. The session ends
    on Ctrl-C or end of input, and ``receiver.loopStop()`` is called on every
    exit path.
    """
    receiver = MulticastReceiver()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably makes the receiver unpickle
    # incoming datagrams. pickle is unsafe on untrusted input, so confirm
    # that only trusted peers can reach this socket.
    receiver.onRecv(on_async_chatting_recv, unpickling=True)
    receiver.loopStart()

    try:
        while True:
            try:
                data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{receiver.remote}: ")
                if data:
                    receiver.sendTo(data) #Multiple unicast
            except (KeyboardInterrupt, EOFError):
                # Ctrl-C, or end of input (Ctrl-D / closed stdin), ends the
                # session instead of escaping with a traceback.
                break
    finally:
        # Always stop the loop started above, even if input() or sendTo()
        # raised something unexpected.
        receiver.loopStop()


if __name__ == "__main__":
    main()

Multicast send

import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastSender


# Maximum number of received messages kept (and redrawn) by the receive callback.
MAX_RECV_LINE = 15
# Received messages, oldest first; the receive callback appends to the end
# and drops index 0 once the list grows past MAX_RECV_LINE.
recv_data_list = []

def on_async_chatting_recv(sender, message): #Multiple Unicast
    """Record an incoming message and repaint the message history.

    A deep copy of ``message`` is appended to ``recv_data_list``; when the
    list grows past ``MAX_RECV_LINE`` entries the oldest one is removed.
    Every kept message is then printed on its own terminal row, starting at
    row 1, in bright red.

    Args:
        sender: Not used by this callback.
        message: Received item; its ``remote`` and ``payload`` attributes
            are shown.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    # Slicing copies the list, so the repaint works on a snapshot.
    visible = recv_data_list[:MAX_RECV_LINE]
    for row, entry in enumerate(visible, start=1):
        print(ANSIEC.OP.to(row, 1) + ANSIEC.OP.CLEAR_LINE, end='')
        print(ANSIEC.OP.to(row, 1) + ANSIEC.FG.BRIGHT_RED + f">>> RECEIVE {entry.remote}: {entry.payload}")

    # Move to row MAX_RECV_LINE and switch to the prompt colour; the newline
    # emitted by print() then advances to the following row.
    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)

def main():
    """Run the interactive chat on the multicast-sender side.

    Creates a ``MulticastSender``, registers ``on_async_chatting_recv`` as
    its receive callback, starts its loop, and then reads lines from stdin,
    sending every non-empty line with ``sender.sendTo``. The session ends on
    Ctrl-C or end of input, and ``sender.loopStop()`` is called on every exit
    path.
    """
    sender = MulticastSender()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably makes the sender unpickle
    # incoming datagrams. pickle is unsafe on untrusted input, so confirm
    # that only trusted peers can reach this socket.
    sender.onRecv(on_async_chatting_recv, unpickling=True)
    sender.loopStart()

    # Second clear kept from the original; NOTE(review): possibly redundant
    # with the one above — confirm whether loopStart() prints anything.
    print(ANSIEC.OP.CLEAR, end='')

    try:
        while True:
            try:
                data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{sender.group}: ")
                if data:
                    sender.sendTo(data) #Only Multicast
            except (KeyboardInterrupt, EOFError):
                # Ctrl-C, or end of input (Ctrl-D / closed stdin), ends the
                # session instead of escaping with a traceback.
                break
    finally:
        # Always stop the loop started above, even if input() or sendTo()
        # raised something unexpected.
        sender.loopStop()


if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

genlib-0.6.1.tar.gz (11.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

genlib-0.6.1-py3-none-any.whl (10.4 kB view details)

Uploaded Python 3

File details

Details for the file genlib-0.6.1.tar.gz.

File metadata

  • Download URL: genlib-0.6.1.tar.gz
  • Upload date:
  • Size: 11.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for genlib-0.6.1.tar.gz
Algorithm Hash digest
SHA256 52801cbcea1df62566edc354ad4640ba2662b0941a496c4d07dc43556ec4d891
MD5 f668f5aae8c8f5a00fed0aafe4fe2644
BLAKE2b-256 633a9805da7fed185bfd39a7bda6f63e29b4376a7472389a0e15bdc8528b3a25

See more details on using hashes here.

File details

Details for the file genlib-0.6.1-py3-none-any.whl.

File metadata

  • Download URL: genlib-0.6.1-py3-none-any.whl
  • Upload date:
  • Size: 10.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for genlib-0.6.1-py3-none-any.whl
Algorithm Hash digest
SHA256 03499b9666df82f55e3f92b8e907ef2e671bef5d56a387782142c91558134506
MD5 dc41e3d6aa8c2ec362a5f7a6346d496c
BLAKE2b-256 f4dc2918a93574f2831a07363b87d8b24898fb734f03172b4c6a9226e1218794

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page