Parquool - A Python Library for SQL-like Parquet Queries

Parquool

Parquool (package name: parquool) is a lightweight Python library that provides SQL-like querying for parquet datasets, partitioned writes, row-level upsert/update/delete operations, and other convenient data engineering helpers. It also includes some utility functions (logging, HTTP proxy requests, a task notification decorator) and an Agent wrapper built on openai-agents, together with a companion knowledge management tool called Collection.

The library aims to simplify common data management scenarios when using parquet files as storage locally or on a server. It uses DuckDB for high-performance SQL queries and supports writing query results back as partitioned parquet files. The Agent class provides a convenient, out-of-the-box interface to openai-agents. Collection offers an easy-to-use knowledge management tool that helps users quickly embed knowledge into a vector database for LLM access.

Key Features

  • Create DuckDB views with parquet_scan to query parquet data as if they were database tables.
  • Support upsert (merge) semantics based on primary keys, with optional partitioned writes (partition_by).
  • Support SQL-based update and delete operations and atomically replace directory contents to guarantee consistency.
  • Provide pandas-friendly select, pivot (DuckDB pivot and pandas pivot_table) and count methods.
  • Provide utilities: a configurable logger, proxy_request with retry, and the notify_task email notification decorator.
  • Provide an Agent wrapper integrated with openai-agents for easy agent creation and usage.
  • Provide knowledge management based on chromadb for vector store integration, useful for embedding content for Agents.
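
The upsert (merge) semantics named in the feature list can be illustrated without any parquet or DuckDB machinery. The following is a plain-Python sketch of the idea, not parquool's implementation: `upsert_rows` is a hypothetical helper, and it assumes that an incoming row replaces an existing row wholesale when all key columns match.

```python
from typing import Any

Row = dict[str, Any]


def upsert_rows(existing: list[Row], incoming: list[Row], keys: list[str]) -> list[Row]:
    """Merge `incoming` into `existing`, matching rows on the `keys` columns."""
    merged: dict[tuple, Row] = {}
    for row in existing:
        merged[tuple(row[k] for k in keys)] = row
    for row in incoming:
        # A matching key overwrites the old row; an unseen key adds a new row.
        merged[tuple(row[k] for k in keys)] = row
    return list(merged.values())


existing = [{"id": 1, "value": 10}, {"id": 2, "value": 20}]
incoming = [{"id": 2, "value": 99}, {"id": 3, "value": 30}]
result = upsert_rows(existing, incoming, keys=["id"])
```

Here the row with id 2 is updated in place and the row with id 3 is inserted, which is the behaviour one would expect from a key-based merge.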

Installation

We recommend installing via pip:

pip install parquool

For knowledge integration, install:

pip install "parquool[knowledge]"

For web search tool integration, install:

pip install "parquool[websearch]"

Quick Start — DuckParquet

Below is a demonstration of common usage: creating a DuckParquet instance, querying, upserting, updating and deleting.

from parquool import DuckParquet
import pandas as pd

# Open a directory (created if it does not exist)
dp = DuckParquet('data/my_dataset')

# Query (roughly equivalent to SELECT id, value FROM view WHERE value > 10 LIMIT 100)
df = dp.select(columns=['id', 'value'], where='value > 10', limit=100)
print(df.head())

# upsert: insert or update (must provide primary key columns)
new = pd.DataFrame([{'id': 1, 'value': 42}, {'id': 2, 'value': 99}])
dp.upsert_from_df(new, keys=['id'], partition_by=['id'])

# update: update column values by condition
dp.update(set_map={'value': 0}, where='value < 0')

# delete: remove rows matching the condition
dp.delete(where="id = 3")

Main Classes and Methods Overview

  • DuckParquet(dataset_path, name=None, db_path=None, threads=None)
    • select(...): General query interface supporting where, group_by, order_by, limit, distinct, etc.
    • dpivot(...): Perform wide-table pivot using DuckDB's PIVOT syntax.
    • ppivot(...): Perform pivot using pandas.pivot_table.
    • count(where=None): Count rows.
    • upsert_from_df(df, keys, partition_by=None): Upsert by keys, supports partitioning.
    • update(set_map, where=None, partition_by=None): Update columns using SQL expressions or values and overwrite the parquet directory.
    • delete(where, partition_by=None): Delete rows matching where and overwrite the parquet directory.
    • refresh(): Recreate or replace the DuckDB view (call after manual file changes).
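
update and delete are described above as overwriting the parquet directory, and the feature list mentions atomic replacement. One common way to get that behaviour is to stage the new files in a sibling directory and swap it in by rename. The sketch below shows that pattern with the standard library only. It is an assumption about the general technique, not parquool's actual code; `replace_directory` and the `.old` backup suffix are hypothetical names.

```python
import os
import shutil
import tempfile
from pathlib import Path
from typing import Callable


def replace_directory(target: Path, write_new: Callable[[Path], None]) -> None:
    """Build the new contents in a sibling staging dir, then swap it into place."""
    target = Path(target)
    target.parent.mkdir(parents=True, exist_ok=True)
    # A sibling directory lives on the same filesystem, so renames do not copy data.
    staging = Path(tempfile.mkdtemp(prefix=target.name + ".tmp-", dir=target.parent))
    try:
        write_new(staging)  # if this raises, `target` has not been touched
    except BaseException:
        shutil.rmtree(staging, ignore_errors=True)
        raise
    backup = target.with_name(target.name + ".old")
    if backup.exists():
        shutil.rmtree(backup)
    if target.exists():
        os.rename(target, backup)  # move the old data out of the way
    os.rename(staging, target)  # move the new data into place
    shutil.rmtree(backup, ignore_errors=True)


def write_new_files(directory: Path) -> None:
    (directory / "new.parquet").write_bytes(b"new")


root = Path(tempfile.mkdtemp())
dataset = root / "my_dataset"
dataset.mkdir()
(dataset / "old.parquet").write_bytes(b"old")

replace_directory(dataset, write_new_files)
files = sorted(p.name for p in dataset.iterdir())
```

The two renames leave a very short window in which the target path does not exist, so this pattern is all-or-nothing with respect to failed writes rather than strictly atomic for concurrent readers.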

Utilities

  • setup_logger(name, level='INFO', file=None, rotation=None, ...)
    • Quickly create a logger with an optional file handler (supports rotation by size or time).
  • proxy_request(url, method='GET', proxies=None, delay=1, **kwargs)
    • HTTP request helper with retry support; tries the provided proxies in order and falls back to a direct connection.
  • notify_task(sender=None, password=None, receiver=None, smtp_server=None, smtp_port=None, cc=None)
    • A function decorator that sends an email notification after a task succeeds or fails. It can convert pandas.DataFrame/Series results to Markdown, and it can embed local images (CID) or attach files referenced in the Markdown content.
    • Configurable through environment variables: NOTIFY_TASK_SENDER, NOTIFY_TASK_PASSWORD, NOTIFY_TASK_RECEIVER, NOTIFY_TASK_SMTP_SERVER, NOTIFY_TASK_SMTP_PORT, NOTIFY_TASK_CC.
    • Note: A remark in the source code indicates that smtp_port may be assigned incorrectly; please verify the configuration before use.
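
The "try proxies in order, then fall back to a direct connection" behaviour described for proxy_request can be sketched as follows. This is a minimal illustration of the control flow only, not the library's code: `request_with_fallback` is a hypothetical name, and the actual HTTP call is passed in as a `fetch` callable so that the example runs without network access.

```python
import time
from typing import Callable, Optional, Sequence


def request_with_fallback(
    fetch: Callable[[str, Optional[dict]], str],
    url: str,
    proxies: Sequence[dict] = (),
    delay: float = 0.0,
) -> str:
    """Try each proxy in order, then a direct connection (proxy=None)."""
    last_error: Optional[Exception] = None
    for proxy in [*proxies, None]:
        try:
            return fetch(url, proxy)
        except Exception as exc:  # broad on purpose: any failure moves to the next route
            last_error = exc
            time.sleep(delay)  # wait before the next attempt
    raise RuntimeError(f"all attempts failed for {url}") from last_error


attempts = []


def fake_fetch(url: str, proxy: Optional[dict]) -> str:
    """Stand-in for a real HTTP call: every proxy fails, the direct route works."""
    attempts.append(proxy)
    if proxy is not None:
        raise ConnectionError("proxy unreachable")
    return f"ok:{url}"


body = request_with_fallback(
    fake_fetch, "https://example.com", proxies=[{"http": "http://127.0.0.1:1"}]
)
```

In real use, `fetch` would wrap an HTTP client call that accepts a proxies mapping.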

Agent Wrapper — Agent

Agent wraps openai-agents with common initialization logic:

  • It reads environment variables (LITELLM_BASE_URL, LITELLM_API_KEY, LITELLM_MODEL_NAME, etc.) and configures a default litellm client.
  • Provides run/run_sync/run_streamed/cli methods for executing prompts, streaming output, and interactive CLI.

Simple example:

from parquool import Agent

agent = Agent(name='myagent')
# Synchronous run (blocking)
res = agent.run_sync('Summarize the following data...')
print(res)

Use Collection to link a knowledge collection and enable knowledge search:

from parquool import Collection

collection = Collection()
collection.load(["myfile.txt", "myfile.md"])
... # more files can be loaded; this usually only needs to be done once
agent = Agent(collection=collection)
agent.run_streamed_sync("What's my plan for tomorrow?")

You can visualize the agent with Streamlit. Install streamlit first via pip. If you want to add a web search tool, set up a SERPAPI key by adding SERPAPI_KEY to your environment variables.

Example Streamlit usage:

import streamlit as st
from parquool import Agent
from openai.types.responses import ResponseTextDeltaEvent


async def stream(prompt):
    async for event in st.session_state.agent.stream(prompt):
        # Yield the streaming text delta if available
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            yield event.data.delta
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                yield f"{event.item.raw_item.name} - {event.item.raw_item.arguments}\n\n"
            elif event.item.type == "tool_call_output_item":
                yield event.item.output
            else:
                pass


st.title("Test Agent")

if not st.session_state.get("agent"):
    st.session_state.agent = Agent(
        tools=[Agent.google_search, Agent.read_url]
    )

st.session_state.messages = st.session_state.agent.get_conversation()

for message in st.session_state.messages:
    if message.get("role") == "user":
        with st.chat_message("user"):
            st.markdown(message["content"])
    elif message.get("role") == "assistant":
        with st.chat_message("assistant"):
            st.markdown(message["content"][0]["text"])
    elif message.get("type") == "function_call":
        with st.chat_message("assistant"):
            with st.expander(message["name"]):
                st.code(message["arguments"])
    elif message.get("type") == "function_call_output":
        with st.chat_message("assistant"):
            with st.expander("Expand to see the result"):
                st.code(message["output"])

if prompt := st.chat_input("What's up?"):
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        response = st.write_stream(stream(prompt))

Environment Variables

It is recommended to create a .env file in the project root for configuration:

  • LITELLM_BASE_URL: Base URL for an OpenAI-compatible service (optional)
  • LITELLM_API_KEY: API key for the OpenAI-compatible service
  • LITELLM_MODEL_NAME: Default model name to use
  • NOTIFY_TASK_*: Configuration for the notify_task decorator (see above)
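
As an illustration of how these variables might be laid out and read, here is a small standard-library sketch. It is not how parquool loads its configuration: `parse_dotenv`, `load_settings`, and `LiteLLMSettings` are hypothetical names, the parser handles only simple KEY=VALUE lines, and the key and model values are placeholders.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LiteLLMSettings:
    base_url: Optional[str]
    api_key: Optional[str]
    model_name: Optional[str]


def parse_dotenv(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments."""
    values: dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


def load_settings(env: Mapping[str, str] = os.environ) -> LiteLLMSettings:
    """Read the LITELLM_* variables; missing ones become None."""
    return LiteLLMSettings(
        base_url=env.get("LITELLM_BASE_URL"),
        api_key=env.get("LITELLM_API_KEY"),
        model_name=env.get("LITELLM_MODEL_NAME"),
    )


dotenv_text = """
# Example .env contents (placeholder values)
LITELLM_API_KEY="sk-placeholder"
LITELLM_MODEL_NAME=my-model
"""
settings = load_settings(parse_dotenv(dotenv_text))
```

A dedicated .env loader handles quoting and escaping more thoroughly than this sketch does.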

Contributing

Issues and pull requests are welcome. When submitting a PR, please include related unit tests and reproduction steps where applicable, especially for changes that touch parquet file replacement and data consistency.

License

This project is licensed under the MIT License, as declared in pyproject.toml.

Contact

Author: ppoak <ppoak@foxmail.com>
Project homepage: https://github.com/ppoak/parquool

Download files

Source Distribution

parquool-0.0.6.tar.gz (39.9 kB)

Uploaded Source

Built Distribution

parquool-0.0.6-py3-none-any.whl (37.5 kB)

Uploaded Python 3

File details

Details for the file parquool-0.0.6.tar.gz.

File metadata

  • Download URL: parquool-0.0.6.tar.gz
  • Upload date:
  • Size: 39.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.9

File hashes

Hashes for parquool-0.0.6.tar.gz
Algorithm Hash digest
SHA256 4ea38c647695018c05aae9620b402140ca4e2e690d89d4a1a65895ed4e5daebb
MD5 cf13f597ca6aed0046fc49a15e396210
BLAKE2b-256 37180927b439b1f2d41c94365bd294f95f3f53332010186caa59116969cfed92

File details

Details for the file parquool-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: parquool-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 37.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.9

File hashes

Hashes for parquool-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 d90b0c83d8145412b31c5a567a80fade089ac7c9f323a33657a67c67a20e6d44
MD5 b5a59cdd1297cc7ad84b43624a92eae8
BLAKE2b-256 f5739b5b1f60ca5c2e9150d4571803a0ebcb2272b8946ec1d588fc04240505ac
