
pysz

Python interface for writing and reading svb-zstd compressed data

Installation

Dependencies

  • Python >= 3.8
  • NumPy >= 1.24.2
  • pandas >= 2.0.0
  • zstandard >= 0.20.0

Install using pip

# First, install a customized version of pystreamvbyte
pip install git+https://github.com/kevinzjy/pystreamvbyte.git

# Install pysz now
pip install pysz

Install from source

# First, install a customized version of pystreamvbyte
pip install git+https://github.com/kevinzjy/pystreamvbyte.git

# Install pysz from source
git clone --recursive https://github.com/kevinzjy/pysz
cd pysz
pip install .
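
Either way, a quick import check confirms the installation (pysz.api is the module used throughout the examples below):

python -c "import pysz.api"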

Usage

Create & Write SZ file

Supported data types:

  • str
  • np.int16 / np.int32
  • np.uint16 / np.uint32
  • np.float16 / np.float32 / np.float64 / np.float128

Example of creating an SZ file

import numpy as np
from pysz.api import CompressedFile

header = [('version', '0.0.1'), ('date', '2023-04-03')]
attr = [('ID', str), ('Offset', np.int32), ('Raw_unit', np.float32)]
datasets = [('Raw', np.uint32), ('Fastq', str), ('Move', np.uint16)]

# Create new SZ file
sz = CompressedFile(
    "/tmp/test_sz", mode="w",
    header=header, attributes=attr, datasets=datasets,
    overwrite=True, n_threads=8
)

# Save data in single-read mode 
for i in range(10000):
    sz.put(
        f"read_{i}", # ID
        0, # Offset
        np.random.rand(), # Raw_unit
        np.random.randint(70, 150, 4000), # Raw
        ''.join(np.random.choice(['A', 'T', 'C', 'G'], 450)), # Fastq
        np.random.randint(0, 2, 4000), # Move (randint's upper bound is exclusive)
    )

# Save data in chunk mode
for i in range(100):
    chunk = []
    for j in range(100):
        chunk.append((
            f"read_{i}_{j}",
            0,
            np.random.rand(),
            np.random.randint(70, 150, 4000),
            ''.join(np.random.choice(['A', 'T', 'C', 'G'], 450)),
            np.random.randint(0, 2, 4000),
        ))   
    sz.put_chunk(chunk)

# Remember to close the file so that everything is flushed successfully
sz.close()
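
After closing, the file can be reopened in read mode as a quick sanity check (a minimal sketch that uses the read API described below):

from pysz.api import CompressedFile

sz = CompressedFile("/tmp/test_sz", mode="r")
print(f"Wrote {sz.idx.shape[0]} reads")  # expect 20000: 10000 single reads + 100 chunks of 100

# Spot-check the first read
read = sz.get(0)
print(read.ID, len(read.Raw), len(read.Fastq))
sz.close()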

Example of creating an SZ file with multiprocessing

Note: to create an SZ file with multiprocessing, pass CompressedFile.q_in (a multiprocessing.Manager().Queue() instance) as a parameter to the parallelized function. Then call queue.put((False, read)) to save a single read, or queue.put((True, chunk)) to save a whole chunk.

import numpy as np
from pysz.api import CompressedFile
from multiprocessing import Pool

def writer(queue, chunk_id):
    # Write in single-read mode
    for i in range(10):
        read = (
            f"read_{chunk_id}_{i}",
            0,
            np.random.rand(),
            np.random.randint(70, 150, 4000),
            ''.join(np.random.choice(['A', 'T', 'C', 'G'], 450)),
            np.random.randint(0, 2, 4000),
        )
        queue.put((False, read))

    # Write in chunk mode
    chunk = []
    for i in range(10):
        chunk.append((
            f"chunk_{chunk_id}_{i}",
            0,
            np.random.rand(),
            np.random.randint(70, 150, 4000),
            ''.join(np.random.choice(['A', 'T', 'C', 'G'], 450)),
            np.random.randint(0, 2, 4000),
        ))
    queue.put((True, chunk))


header = [('version', '0.0.1'), ('date', '2023-04-03')]
attr = [('ID', str), ('Offset', np.int32), ('Raw_unit', np.float32)]
datasets = [('Raw', np.uint32), ('Fastq', str), ('Move', np.uint16)]

# Create new SZ file
sz = CompressedFile(
    "/tmp/test_sz", mode="w",
    header=header, attributes=attr, datasets=datasets,
    overwrite=True, n_threads=8
)

# Use pool for multiprocessing
pool = Pool(8)
jobs = []
for i in range(10):
    jobs.append(pool.apply_async(writer, (sz.q_in, i, )))
pool.close()
pool.join()

# Remember to close the file so that everything is flushed successfully
sz.close()
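
Exceptions raised inside pool workers are silent by default. The error_callback helper from pysz.utils (also used in the reading example below) can be attached to surface them; this is a minor variation on the loop above:

from pysz.utils import error_callback

jobs.append(pool.apply_async(writer, (sz.q_in, i), error_callback=error_callback))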

Examples of reading an SZ file

import numpy as np
from pysz.api import CompressedFile
sz = CompressedFile(
    "/tmp/test_sz", mode="r", allow_multiprocessing=True,
)

# Get index information
print(sz.idx.head())

# Get total read number
print(f"Loaded {sz.idx.shape[0]} reads")

# Get the first read
read = sz.get(0)
print(read.ID, read.Offset, read.Raw_unit, read.Raw, read.Fastq, read.Move)

# Get first 10 reads
reads = sz.get(np.arange(10))

# Filter some reads
idx = sz.idx.index[sz.idx['ID'].isin(['read_0', 'read_1'])]
reads = sz.get(idx)
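
For streaming through a large file without multiprocessing, the same calls combine into a simple sequential loop over fixed-size slices of the index (a minimal sketch based on the calls above; the chunk size of 500 is arbitrary, and it assumes sz.get returns a list of reads when given multiple indices, as the examples above suggest):

chunk_size = 500
for start in range(0, sz.idx.shape[0], chunk_size):
    batch = sz.idx.index[start:start + chunk_size]
    for read in sz.get(batch):
        pass  # process each read here, e.g. scale read.Raw by read.Raw_unit

sz.close()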

Example of reading an SZ file chunk by chunk with multiprocessing

Note: to read an SZ file with multiprocessing, pass the chunked index as a parameter, and initialize a CompressedFile instance with allow_multiprocessing=True separately in each process.

import numpy as np
from pysz.api import CompressedFile
from pysz.utils import error_callback
from multiprocessing import Pool

def grouper(iterable, n, fillvalue=None):
    """
    Collect data into fixed-length chunks or blocks
    grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    from itertools import zip_longest
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

def read_from_sz(file_name, idx):
    """
    Read from SZ inside each process
    """
    sz = CompressedFile(file_name, mode='r', allow_multiprocessing=True)
    reads = sz.get(idx)
    sz.close()
    return reads
    
# Parallelization parameters
n_threads = 16
chunk_size = 500

# Load the read index from the SZ file
file_name = '/tmp/test_sz'
sz = CompressedFile(file_name, mode='r')
index = sz.idx
sz.close()

# Submit chunked read jobs to the pool
pool = Pool(n_threads)
jobs = []
for x in grouper(index.index, chunk_size):
    chunk = [i for i in x if i is not None]
    jobs.append(pool.apply_async(read_from_sz, (file_name, chunk, ), error_callback=error_callback))
pool.close()

# Get output
for job in jobs:
    ret = job.get()
    # Process your data here
pool.join()

Please refer to tests/test_api.py for detailed usage.

TODO

  • Add zfp support for lossy float compression
  • Refactor the naive multiprocessing approach for writing SZ files
