Skip to main content

Interprocess real time message/audio/video buffers

Project description

memcom

Interprocess real time message/audio/video buffers


Table of contents

 


Install

$ pip3 install memcom

 


Examples

Example that creates a test clip

import os
import time
import asyncio
import memcom

def on_error(ctx, e):
    """Print an error report in the form ``<name> --- <error>``.

    Passed as the ``on_error`` callback to the memcom objects created
    in ``run()``.

    Args:
        ctx: Object exposing ``getName()``; its name prefixes the report.
        e: The error value to print.
    """
    source_name = ctx.getName()
    print(source_name, '---', e)

async def run():
    """Record a test clip to ``./out/testvid.avi``.

    Creates a video share and an audio share, attaches one test-video
    source per rectangle, a file recorder, a frame eraser and a clock,
    then lets everything run for ``t * div`` seconds. While running, a new
    rectangle (and a matching test-video source) is added every
    ``4 * div`` seconds until ``m`` rectangles exist. All components are
    closed when the run ends, including when the run loop raises.

    Raises:
        Exception: If a share, test-video source, recorder or eraser
            fails to be created; the message is that object's
            ``getError()`` value.
    """

    print(memcom.__info__)

    # Video file name
    os.makedirs('./out', exist_ok=True)
    fname = './out/testvid.avi'

    # Clip length in seconds
    t = 30

    # Time divider / Clip will be t / div seconds in length
    # Using a higher divider gives more time to process
    # If your CPU can't keep up, increase this value
    div = 1

    #---------------------------------------------------------------
    # Video parameters
    w = 800
    h = 600
    vfps = 15

    print('Create video share')
    vid = memcom.mcVideo()
    if not vid.create(bufs=2*vfps, width=w, height=h, fps=vfps, cleanup=True):
        raise Exception(vid.getError())

    vsname = vid.getName()
    print(f'Video share created: {vsname}')


    #---------------------------------------------------------------
    # Audio Parameters
    ch = 2
    bps = 16
    asr = 48000
    afps = 50

    print('Create audio share')
    aud = memcom.mcAudio()
    if not aud.create(bufs=2*afps, ch=ch, bps=bps, bitrate=asr, fps=afps, cleanup=True):
        raise Exception(aud.getError())

    asname = aud.getName()
    print(f'Audio share created: {asname}')


    #---------------------------------------------------------------
    # Create initial rectangles

    # Initial rects
    n = 0

    # Target number of rects / one is added every 4 * div seconds
    # while running, until this count is reached
    m = 4

    tv = []
    # Start with a single rect covering the whole frame
    rects = [{'x': 0, 'y': 0, 'w': w, 'h': h}]
    for _ in range(0, n-1):
        memcom.addRect(rects)

    ri = 0
    for r in rects:
        ri += 1
        v = memcom.mcTestVid(on_error=on_error,
                             opts={'name': f'Rect{ri}', 'roi':r,
                                   'video':vsname, 'vbias':-0.25, 'vwin':0.25,
                                   'audio':asname, 'abias':-0.25, 'awin':0.25})
        if not v.create():
            raise Exception(v.getError())
        tv.append(v)


    #---------------------------------------------------------------
    # Create file recorder
    print('Create recorder')
    rec = memcom.mcRecord(on_error=on_error,
                          opts={'name': 'Recorder',
                                'video':vsname, 'vbias':-0.5, 'vwin':0.25,
                                'audio':asname, 'abias':-0.5, 'awin':0.25})
    if not rec.create(fname):
    # if not rec.create(fname, opts={'roi':{'x':int(w/4), 'y':int(h/4), 'w':int(w/2), 'h':int(h/2)}}):
        raise Exception(rec.getError())


    #---------------------------------------------------------------
    # Create frame eraser / clears frames before reuse
    print('Create Eraser')
    erase = memcom.mcBlank(on_error=on_error,
                           opts={'name': 'Eraser',
                                 'video':vsname, 'vbias':-0.75, 'vwin':0.25,
                                 'audio':asname, 'abias':-0.75, 'awin':0.25})
    if not erase.create():
        # Report the eraser's own error, not the recorder's
        raise Exception(erase.getError())


    #---------------------------------------------------------------
    # Create the clock
    print('Create clock')
    clk = memcom.mcClock(on_error=on_error,
                         opts={'name': 'Clock', 'div':div,
                               'video':vsname, 'vfps':vfps, 'vbias':0, 'vwin':0.25,
                               'audio':asname, 'afps':afps, 'abias':0, 'awin':0.25})
    # NOTE(review): unlike the other components, the result of
    # clk.create() is not checked -- confirm whether it can fail.
    clk.create()


    #---------------------------------------------------------------
    # Run everything for t * div seconds
    d = 0   # seconds elapsed within the current div-sized tick
    c = 0   # ticks elapsed since the last rect was added
    t = t * div
    try:
        while 0 < t:
            # Yield to the event loop instead of blocking it
            await asyncio.sleep(1)
            t -= 1
            d += 1
            if d >= div:
                d = 0
                c += 1
                if c >= 4:
                    c = 0
                    if len(rects) < m:
                        print('Add rect')
                        r = memcom.addRect(rects)
                        # One source exists per rect so far, so the next
                        # name continues the Rect1, Rect2, ... sequence
                        ri = len(tv) + 1
                        v = memcom.mcTestVid(on_error=on_error,
                                             opts={'name': f'Rect{ri}', 'roi':r,
                                                   'video':vsname, 'vbias':-0.25, 'vwin':0.25,
                                                   'audio':asname, 'abias':-0.25, 'awin':0.25})
                        if not v.create():
                            raise Exception(v.getError())
                        tv.append(v)
    finally:
        # Close everything, even if the run loop raised
        clk.close()
        erase.close()
        rec.close()
        for v in tv:
            v.close()
        vid.close()
        # NOTE(review): the audio share `aud` is never closed -- confirm
        # whether mcAudio needs an explicit close() like mcVideo.


if __name__ == '__main__':
    # Run the example on a dedicated event loop and release the loop's
    # resources once the run finishes or fails.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run())
    finally:
        loop.close()

 


References

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See tutorial on generating distribution archives.

Built Distribution

memcom-0.1.1-py3-none-any.whl (24.1 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page