
This is the Python wrapper of the Feathr online transformation service.

There are two main classes in this package:

  • PiperService is the service class. It starts an HTTP service that handles transformation requests. It does not support HTTPS or authentication, so you may need to set up a gateway or proxy to handle security. The PiperService class has a start method that starts the HTTP service in blocking mode, and a start_async method that starts the service in an async context.

  • Piper is the transformation engine. It can be used to transform data directly, mainly for development and testing purposes. The Piper class has a process method that transforms data in blocking mode, and a process_async method that transforms data in an async context.

Both classes support UDFs and UDLFs written in Python.

NOTE: Because of the GIL, pure Python code cannot run concurrently. Using Python UDFs can therefore slow down the transformation service, especially under heavy load.

Value Types

All values passed to the pipeline must be of one of the following types:

  • None
  • Simple types: bool, int, float, str
  • Date/time: represented as datetime.datetime.
  • List: List of supported types.
  • Map: Map of supported types. Keys must be strings, and values can be of any supported type.

All values returned by the pipeline will also be of the above types.

NOTE: Python integers are unbounded, but an exception will be thrown if a value exceeds the range of a 64-bit signed integer.
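
The rules above can be checked before data is handed to the pipeline. The following is a minimal sketch, not part of feathrpiper: the helper name is_supported_value is hypothetical, and it assumes date/time values are datetime.datetime instances and that the integer limit is the usual signed 64-bit range.

```python
import datetime
from typing import Any

# Signed 64-bit integer bounds (assumed to be the range the NOTE refers to).
INT64_MIN = -(2 ** 63)
INT64_MAX = 2 ** 63 - 1


def is_supported_value(value: Any) -> bool:
    """Hypothetical helper: True if `value` fits the value types listed above."""
    if value is None or isinstance(value, (bool, float, str)):
        return True
    if isinstance(value, int):
        # Python ints are unbounded, so the range has to be checked explicitly.
        return INT64_MIN <= value <= INT64_MAX
    if isinstance(value, datetime.datetime):
        return True
    if isinstance(value, list):
        return all(is_supported_value(item) for item in value)
    if isinstance(value, dict):
        # Map keys must be strings; values may be any supported type.
        return all(
            isinstance(key, str) and is_supported_value(item)
            for key, item in value.items()
        )
    return False
```

Running such a check in tests or at the service boundary can surface unsupported values, for example tuples or integer dict keys, before they reach the pipeline.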

User Defined Function (UDF) in Python

A UDF is implemented as an ordinary Python function, and it must be registered with the service before it can be used in the pipeline.

  • A UDF can only accept positional arguments; keyword arguments are not supported.
  • A UDF must be callable in the way the DSL script uses it, e.g. a UDF with 2 fixed arguments and 1 optional argument can be invoked as udf(1, 2) or udf(1, 2, 3), but not as udf(1, 2, 3, 4) or udf(1).
  • A UDF may raise any exception. In that case its result is recorded as an error, and this error is propagated to the caller.
  • Every function and operator that takes an error as input returns that error.
  • At the final output stage of the pipeline, the error value is converted to None, and the error is recorded in a separate error list.
  • A UDF never sees an error as input: if any of the arguments is an error, the invocation is bypassed before the UDF is called.
  • The execution order is non-deterministic, so a UDF must not make any assumptions about the order in which it is called.
  • A UDF should not block. Blocking is not strictly forbidden, but it impacts performance significantly.
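
The error-handling rules in the list above can be pictured with a small pure-Python model. This is only a sketch for illustration and does not use the feathrpiper API: PipelineError, call_udf and finalize are hypothetical names, and the real engine may represent and report errors differently.

```python
from typing import Any, Callable, List, Tuple


class PipelineError:
    """Hypothetical stand-in for the pipeline's internal error value."""

    def __init__(self, message: str) -> None:
        self.message = message


def call_udf(func: Callable[..., Any], *args: Any) -> Any:
    """Model one UDF invocation with positional arguments only."""
    for arg in args:
        if isinstance(arg, PipelineError):
            # Bypass the UDF: the incoming error is propagated unchanged.
            return arg
    try:
        return func(*args)
    except Exception as exc:  # any exception becomes an error value
        return PipelineError(f"{type(exc).__name__}: {exc}")


def finalize(row: List[Any]) -> Tuple[List[Any], List[str]]:
    """Model the output stage: errors become None and are listed separately."""
    values = [None if isinstance(v, PipelineError) else v for v in row]
    errors = [v.message for v in row if isinstance(v, PipelineError)]
    return values, errors


def ratio(numerator, denominator, scale=1):
    """Example UDF: two fixed positional arguments and one optional one."""
    return numerator / denominator * scale


ok = call_udf(ratio, 6, 3)            # invoked with 2 arguments
scaled = call_udf(ratio, 6, 3, 10)    # invoked with 3 arguments
failed = call_udf(ratio, 1, 0)        # ZeroDivisionError becomes an error value
skipped = call_udf(ratio, failed, 2)  # ratio is never called for this one
values, errors = finalize([ok, scaled, failed, skipped])
```

In this model, values ends up as [2.0, 20.0, None, None] and errors holds the messages of the two error values, which mirrors the "None in the output plus a separate error list" behavior described above.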

User Defined Lookup Function (UDLF) in Python

A lookup usually fetches external data, for example from a database or a web service, so a lookup data source is implemented as a Python async function. It must be registered with the Piper or the PiperService before it can be used in the pipeline.

The lookup function is called with a single key and a list of requested field names. It should return a list of rows, where each row is a list aligned with the requested fields, or an empty list when the lookup fails. The key must be one of the supported simple types; a list or dict cannot be used as a key. Using None as the key yields None for all returned fields without actually calling the lookup function.

from typing import Any, List

async def my_fancy_lookup_function(key: Any, fields: List[str]) -> List[List[Any]]:
    ...
    return [
        [some_data[f] for f in fields],
        [some_other_data[f] for f in fields],
    ]
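
As a more concrete illustration of the contract described above, here is a minimal sketch of a lookup function backed by an in-memory dict. The USERS table and the name user_lookup are made up for the example, and filling unknown fields with None is an assumption rather than documented behavior.

```python
import asyncio
from typing import Any, Dict, List

# Hypothetical in-memory "table"; a real lookup would query a database or web service.
USERS: Dict[Any, List[Dict[str, Any]]] = {
    1: [{"name": "alice", "age": 30}],
    2: [{"name": "bob", "age": 25}, {"name": "bobby", "age": 26}],
}


async def user_lookup(key: Any, fields: List[str]) -> List[List[Any]]:
    """Return one row per match, each aligned with `fields`; [] if nothing matches."""
    await asyncio.sleep(0)  # stands in for awaiting real I/O
    matches = USERS.get(key, [])
    # Assumption: unknown fields are filled with None so every row keeps its shape.
    return [[record.get(f) for f in fields] for record in matches]


rows = asyncio.run(user_lookup(2, ["age", "name"]))
missing = asyncio.run(user_lookup(99, ["age", "name"]))
```

Here rows contains two rows in the requested field order, [[25, "bob"], [26, "bobby"]], and missing is an empty list because key 99 has no match.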

The lookup function must be added to the Piper or PiperService before it can be used in the pipeline:

piper = Piper(pipeline_def, {"lookup_name": my_fancy_lookup_function}, ...)

or

svc = PiperService(pipeline_def, {"lookup_name": my_fancy_lookup_function}, ...)

Then you can use the lookup data source in the pipeline in a lookup transformation:

pipeline_name(...)
| ...
| lookup field1, field2 from lookup_name on key
| ...
;

or a join transformation:

pipeline_name(...)
| ...
| join kind=left-inner field1, field2 from lookup_name on key
| ...
;

Once a user-defined lookup function is used, Piper and PiperService must be used in an async context; otherwise the async functions will never be executed and the program may hang forever. You also need to replace process with process_async, and start with start_async.

import asyncio

piper = Piper(pipeline_def, {"lookup_name": lookup_function})

async def test():
    await piper.process_async(...)

asyncio.run(test())

For more information about Python async programming, please refer to Python Asyncio.

NOTE:

  • Because lookup functions are asynchronous, it is recommended to implement them with asyncio-compatible libraries. Traditional blocking libraries may cause performance issues, e.g. use aiohttp or HTTPX instead of Requests.
  • This package only supports asyncio. Libraries based on Twisted or Gevent are not supported.
  • To look up data from a standard JSON-based HTTP API, you can use the built-in HTTP client instead of implementing your own lookup function. Register the lookup data source either as a JSON string or as a dict with the correct content. The detailed documentation is available here.
  • feathrpiper also has built-in support for SQL Server/Azure SQL, SQLite3, and Azure Cosmos DB.
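
When only a blocking client library is available, one common asyncio technique is to run the blocking call on a thread pool so that the event loop is not stalled. The sketch below uses only the standard library. blocking_fetch and lookup_via_thread are hypothetical names, and whether this approach performs well with feathrpiper is an assumption that the text above does not confirm.

```python
import asyncio
import time
from typing import Any, List


def blocking_fetch(key: Any) -> dict:
    """Hypothetical blocking call, standing in for a synchronous client library."""
    time.sleep(0.05)
    return {"key": key, "score": len(str(key))}


async def lookup_via_thread(key: Any, fields: List[str]) -> List[List[Any]]:
    loop = asyncio.get_running_loop()
    # Run the blocking call on the default thread pool so the event loop stays free.
    record = await loop.run_in_executor(None, blocking_fetch, key)
    return [[record.get(f) for f in fields]]


async def main() -> List[List[List[Any]]]:
    # The three lookups overlap instead of running back to back.
    return await asyncio.gather(
        *(lookup_via_thread(k, ["key", "score"]) for k in ("a", "bb", "ccc"))
    )


results = asyncio.run(main())
```

A native asyncio client is still the preferable option where one exists; the thread pool only hides the blocking call and adds thread overhead per lookup.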

Integration with Other Web-Service Frameworks

feathrpiper contains a built-in web service, but it does not support HTTPS or authentication, and its HTTP API spec cannot be changed from the Python side. If you need to use it in any other scenario, you can integrate it with other web service frameworks.

  • Flask: prefer an async version of Flask, such as Flask-Async, Flask-RESTful-Async, Flask-RESTX-Async, etc., and use process_async to process the request.
  • FastAPI: FastAPI is fully async-based, so use process_async to process the request.
  • Any other web framework that doesn't support async: you can use process in a non-async context, but the user-defined lookup function feature will be unavailable.

A demo of integrating with FastAPI is available here.

Packaging and Deployment

The feathrpiper package is a standard Python package without external dependencies. You need to write your own code that uses the package to implement your own transformation service.

The packaging and deployment process is also standard. Refer to the official documentation if you need to build a Docker image; currently there is no pre-built Docker image for the Python package.

In most cases, the packaging process could look like this:

  1. Prepare the requirements.txt file which includes the feathrpiper package and all the other dependencies.
    # This package
    feathrpiper >= 0.4.3
    # Any other dependencies
    pandas == 1.5.2
    torch >= 1.0.0
    ...
    
  2. Prepare a Dockerfile which installs the packages from the requirements.txt file and copies in the code to run the service.
    FROM python:3.9-slim-buster
    COPY requirements.txt /tmp/
    RUN pip install -r /tmp/requirements.txt
    COPY . /app
    WORKDIR /app
    # This assumes that the service listens on port 8000, whether it is the built-in
    # web service provided by the `PiperService` class or your own web service
    EXPOSE 8000
    CMD ["python", "main.py"]
    
  3. Build the Docker image:
    docker build -t my_image .
    
  4. Run the Docker image:
    docker run -p 8000:8000 my_image
    

Building from Source

The feathrpiper package is written in Rust, so you need to set up the Rust toolchain to build it from source. The Rust toolchain can be installed from here. Development is done with Rust 1.65; older versions may not work.

  1. Install maturin:
    pip install maturin
    
  2. Build the package under the feathrpiper_root/python directory:
    maturin build --release
    

More information about maturin can be found here. Please note that running cargo build in the top-level directory won't build the Python package, because the Python package project is excluded from the workspace for technical reasons.

Limitations and Known Issues

  • The PiperService class supports plain HTTP only, and it doesn't support any kind of authentication.
  • feathrpiper supports Python 3.7 to 3.11. Python 3.6 and earlier, as well as Python 2, are not supported.
  • The package published on PyPI only supports the following platforms:
    • Linux arm64
    • Linux armv7
    • Linux x86_64
    • macOS x86_64/Apple Silicon universal
    • Windows x86_64

You need to build the package from source if you need to use it on other platforms.
