Skip to main content

This is the basic library for xkit.

Project description

This is the basic library for xkit.

History

  • V0.5.0
    • Supports InetCAN
    • Improved UDPServer local address allocation method
  • V0.3.0
    • SLIP wrapper
    • Supports UDP and multicast
    • Supports ANSI Escape Codes

Install

pip install genlib

ANSI Escape Codes

  • Foreground color settings
    • ANSIEC.FG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
    • ANSIEC.FG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
    • ANSIEC.FG.rgb(r, g, b)
  • Background color settings
    • ANSIEC.BG.<BLACK | RED | GREEN | YELLOW | BLUE | MAGENTA | CYAN | WHITE>
    • ANSIEC.BG.<BRIGHT_BLACK | BRIGHT_RED | BRIGHT_GREEN | BRIGHT_YELLOW | BRIGHT_BLUE | BRIGHT_MAGENTA | BRIGHT_CYAN | BRIGHT_WHITE>
    • ANSIEC.BG.rgb(r, g, b)
  • Styles
    • ANSIEC.OP.<RESET | BOLD | UNDER_LINE | REVERSE>
  • Screen
    • ANSIEC.OP.<CLEAR | CLEAR_LINE>
  • Cursor Navigation
    • ANSIEC.OP.<TOP>
    • ANSIEC.OP.<up(n) | down(n) | right(n) | left(n) | next_line(n) | prev_line(n) | to(row, column)>
from genlib.ansiec import ANSIEC
import time, sys, random

def loading(count):
    """Animate ``count`` randomly coloured progress bars until all reach 100.

    Reserves ``count`` terminal lines, then on every tick (10 ms) advances one
    randomly chosen unfinished bar by one step and redraws every bar in place.
    Each bar is 25 cells wide, so one cell stands for four progress steps and
    the printed percentage moves in multiples of four.

    Args:
        count: Number of progress bars to draw.
    """
    percents = [0] * count
    # Emit the blank lines that the bars will later be drawn over.
    sys.stdout.write("\n" * count)
    palette = [
        ANSIEC.FG.rgb(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
        for _ in range(count)
    ]

    while True:
        pending = [slot for slot, percent in enumerate(percents) if percent < 100]
        if not pending:
            break

        time.sleep(0.01)
        percents[random.choice(pending)] += 1

        # Move the cursor back to the first bar before redrawing all of them.
        sys.stdout.write(ANSIEC.OP.left(27))
        sys.stdout.write(ANSIEC.OP.up(count))
        for tint, percent in zip(palette, percents):
            filled = percent // 4
            bar = '#' * filled + ' ' * (25 - filled)
            sys.stdout.write(tint)
            print(f"[{bar}] {ANSIEC.OP.RESET}{filled*4}%")
            sys.stdout.write(ANSIEC.OP.RESET)

def main():
    """Run the progress-bar demo with four bars."""
    loading(count=4)


if __name__ == "__main__":
    main()

Serial Line Internet Protocol

from genlib.slip import Slip

def test_encode_decode(slip):
    """Round-trip one text payload through ``slip.encode`` and ``slip.decode``.

    Prints the encoded packet, then the first decoded packet together with
    whether it compares equal to the original payload.

    Args:
        slip: Object providing ``encode(data)`` and ``decode(packet)``; the
            result of ``decode`` is indexed with ``[0]``.
    """
    payload = "0.9992676 0.02764893 0.02612305 0.0"

    encoded = slip.encode(payload)
    print(encoded)

    first_decoded = slip.decode(encoded)[0]
    round_trip_ok = first_decoded == payload
    print(first_decoded, round_trip_ok)

def test_stream_decode(slip):
    """Feed a sequence of byte chunks to ``slip.decode`` and print every packet.

    The chunks carry the same 0xC0-delimited frame several times; some frames
    arrive whole and others are split across two consecutive chunks at
    different offsets.

    Args:
        slip: Object whose ``decode(chunk)`` returns an iterable of packets
            (presumably buffering incomplete frames between calls).
    """
    chunks = (
        # Whole frame in one chunk.
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
        # Frames split in two at various positions.
        b'\xc00.99',
        b'92676 0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 ',
        b'0.02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.',
        b'02764893 0.02612305 0.0\xc0',
        b'\xc00.9992676 0.02764893 0.0',
        b'2612305 0.0\xc0',
        # Whole frame again.
        b'\xc00.9992676 0.02764893 0.02612305 0.0\xc0',
    )

    for chunk in chunks:
        for packet in slip.decode(chunk):
            print(packet)

def main():
    """Create one ``Slip`` instance and run both demos against it, in order."""
    slip = Slip()
    for demo in (test_encode_decode, test_stream_decode):
        demo(slip)


if __name__ == "__main__":
    main()

Multicast

Multicast recv

import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastReceiver
         

# Number of terminal rows used to show received messages; also the maximum
# number of messages kept in recv_data_list. The input prompt is drawn on the
# row right below (MAX_RECV_LINE + 1).
MAX_RECV_LINE = 15
# Received messages, oldest first; appended to and trimmed by
# on_async_chatting_recv.
recv_data_list = []

def on_async_chatting_recv(receiver, message):
    """Store a received message and redraw the message area.

    A deep copy of ``message`` is appended to ``recv_data_list``; when the
    list grows past ``MAX_RECV_LINE`` the oldest entry is dropped. Every kept
    message is then redrawn on its own row, starting at row 1, in bright
    yellow. Finally the cursor is moved to row ``MAX_RECV_LINE`` and the
    foreground is switched to bright blue.

    Args:
        receiver: Passed in by the caller; not used here.
        message: Received message; its ``remote`` and ``payload`` attributes
            are shown.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    for row, entry in enumerate(recv_data_list[:MAX_RECV_LINE], start=1):
        # Wipe the row first so a shorter message leaves no stale characters.
        print(ANSIEC.OP.to(row, 1) + ANSIEC.OP.CLEAR_LINE, end='')
        text = f">>> RECEIVE {entry.remote}: {entry.payload}"
        print(ANSIEC.OP.to(row, 1) + ANSIEC.FG.BRIGHT_YELLOW + text)

    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)

def main():
    """Run the interactive multicast-receiver chat demo.

    Clears the screen, registers ``on_async_chatting_recv`` as the receive
    callback and starts the receiver loop. It then keeps prompting on the row
    below the message area and passes every non-empty line to
    ``receiver.sendTo``. Ctrl-C or end of input ends the prompt loop, and the
    receiver loop is stopped on every exit path.
    """
    receiver = MulticastReceiver()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably unpickles payloads that arrive
    # from the network. pickle is unsafe on untrusted data, so this should only
    # be used on a trusted network — confirm against genlib.udp.
    receiver.onRecv(on_async_chatting_recv, unpickling=True)
    receiver.loopStart()

    try:
        while True:
            data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{receiver.remote}: ")
            if data:
                receiver.sendTo(data)  # Multiple unicast
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or closed stdin: the user is done, leave the prompt loop.
        pass
    finally:
        # Always stop the receiver loop, even if input() or sendTo() raised
        # something unexpected.
        receiver.loopStop()


if __name__ == "__main__":
    main()

Multicast send

import copy
from genlib.ansiec import ANSIEC
from genlib.udp import MulticastSender


# Number of terminal rows used to show received messages; also the maximum
# number of messages kept in recv_data_list. The input prompt is drawn on the
# row right below (MAX_RECV_LINE + 1).
MAX_RECV_LINE = 15
# Received messages, oldest first; appended to and trimmed by
# on_async_chatting_recv.
recv_data_list = []

def on_async_chatting_recv(sender, message): #Multiple Unicast
    """Store a received message and redraw the message area.

    A deep copy of ``message`` is appended to ``recv_data_list``; when the
    list grows past ``MAX_RECV_LINE`` the oldest entry is dropped. Every kept
    message is then redrawn on its own row, starting at row 1, in bright red.
    Finally the cursor is moved to row ``MAX_RECV_LINE`` and the foreground is
    switched to bright blue.

    Args:
        sender: Passed in by the caller; not used here.
        message: Received message; its ``remote`` and ``payload`` attributes
            are shown.
    """
    recv_data_list.append(copy.deepcopy(message))
    if len(recv_data_list) > MAX_RECV_LINE:
        del recv_data_list[0]

    for row, entry in enumerate(recv_data_list[:MAX_RECV_LINE], start=1):
        # Wipe the row first so a shorter message leaves no stale characters.
        print(ANSIEC.OP.to(row, 1) + ANSIEC.OP.CLEAR_LINE, end='')
        text = f">>> RECEIVE {entry.remote}: {entry.payload}"
        print(ANSIEC.OP.to(row, 1) + ANSIEC.FG.BRIGHT_RED + text)

    print(ANSIEC.OP.to(MAX_RECV_LINE, 1) + ANSIEC.FG.BRIGHT_BLUE)

def main():
    """Run the interactive multicast-sender chat demo.

    Clears the screen, registers ``on_async_chatting_recv`` as the receive
    callback and starts the sender loop. It then keeps prompting on the row
    below the message area and passes every non-empty line to
    ``sender.sendTo``. Ctrl-C or end of input ends the prompt loop, and the
    sender loop is stopped on every exit path.
    """
    sender = MulticastSender()
    print(ANSIEC.OP.CLEAR, end='')
    # NOTE(review): unpickling=True presumably unpickles payloads that arrive
    # from the network. pickle is unsafe on untrusted data, so this should only
    # be used on a trusted network — confirm against genlib.udp.
    sender.onRecv(on_async_chatting_recv, unpickling=True)
    sender.loopStart()

    # NOTE(review): this repeats the clear done before loopStart() and may
    # erase a message the callback drew in between — confirm it is needed.
    print(ANSIEC.OP.CLEAR, end='')

    try:
        while True:
            data = input(f"{ANSIEC.OP.to(MAX_RECV_LINE+1, 1) + ANSIEC.FG.BRIGHT_BLUE + ANSIEC.OP.CLEAR_LINE}{sender.group}: ")
            if data:
                sender.sendTo(data)  # Only Multicast
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C or closed stdin: the user is done, leave the prompt loop.
        pass
    finally:
        # Always stop the sender loop, even if input() or sendTo() raised
        # something unexpected.
        sender.loopStop()


if __name__ == "__main__":
    main()

InetCAN (Client)

The ICANServer must be running on the device (SerBot2, etc.).

import struct

from genlib.ican import InetCAN

# Client object used by the rest of this example; the argument is the address
# of the ICANServer (loopback here).
ican = InetCAN("127.0.0.1") # ICANServer address

def onUltrasonic(data):
    """Print every value in ``data`` on its own line.

    Each value is formatted with no decimals and right-aligned in a field of
    width 3, and the output is flushed after every line.

    Args:
        data: Iterable of numbers.
    """
    for reading in data:
        print(f"{reading: >3.0f}", flush=True)

# NOTE(review): `us` is not defined anywhere in this example. It presumably
# stands for the ultrasonic sensor whose CAN id should pass the filter; define
# it before running, otherwise this line raises NameError.
ican.setFilter(us.id)
ican.write(0x012, [0x07])

# Read frames forever and print each one as (id, three signed 16-bit values).
while True:
    # Named `can_id` rather than `id` so the builtin id() is not shadowed.
    can_id, data = ican.read()
    # Repack the six payload bytes and reinterpret them as three native-order
    # signed shorts; a payload that is not exactly six bytes raises struct.error.
    data = struct.unpack("hhh", struct.pack("BBBBBB", *data))
    print(can_id, data)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

genlib-0.6.0.tar.gz (10.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

genlib-0.6.0-py3-none-any.whl (8.8 kB view details)

Uploaded Python 3

File details

Details for the file genlib-0.6.0.tar.gz.

File metadata

  • Download URL: genlib-0.6.0.tar.gz
  • Upload date:
  • Size: 10.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for genlib-0.6.0.tar.gz
Algorithm Hash digest
SHA256 75536167825c8251e6b293094274fafb00f503750354d36c8d223f857f5ea52d
MD5 9ed436c6e1713f8a4cd3b2899f54f13a
BLAKE2b-256 d67b0ba477a133c54b96c17f971fac31b5d36ea1eae7f3896e2620e93e008c11

See more details on using hashes here.

File details

Details for the file genlib-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: genlib-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 8.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.1

File hashes

Hashes for genlib-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6d3718311da755fe63f7b0441390a73acec08402479f1fc8dada39ddaff3e4e5
MD5 b5b852ba16637c17aef37735204179e2
BLAKE2b-256 47c9ebf2902ebf468899eb7a9cb25c6592ff66d56f45a3027da25fa70fe4e89b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page