Python client for the nv-ingest service
NV-Ingest-Client
NV-Ingest-Client is a tool designed for efficient ingestion and processing of large datasets. It provides both a Python API and a command-line interface to cater to various ingestion needs.
Note: NV-Ingest is also known as NVIDIA Ingest and NeMo Retriever extraction.
Installation
To install NV-Ingest-Client, run the following command in your terminal:
pip install [REPO_ROOT]/client
This command installs both the API libraries and the nv-ingest-cli tool, which you can then call from the command line.
API Libraries
nv_ingest_client.primitives.jobs
JobSpec
Specification for creating a job for submission to the nv-ingest microservice.
- Parameters:
  - payload (Dict): The payload data for the job.
  - tasks (Optional[List], optional): A list of tasks to be added to the job. Defaults to None.
  - source_id (Optional[str], optional): An identifier for the source of the job. Defaults to None.
  - source_name (Optional[str], optional): A name for the source of the job. Defaults to None.
  - document_type (Optional[str], optional): Type of the document. Defaults to 'txt'.
  - job_id (Optional[Union[UUID, str]], optional): A unique identifier for the job. Defaults to a new UUID.
  - extended_options (Optional[Dict], optional): Additional options for job processing. Defaults to None.
- Attributes:
  - _payload (Dict): Storage for the payload data.
  - _tasks (List): Storage for the list of tasks.
  - _source_id (str): Storage for the source identifier.
  - _job_id (UUID): Storage for the job's unique identifier.
  - _extended_options (Dict): Storage for the additional options.
- Methods:
  - to_dict() -> Dict:
    - Description: Converts the job specification to a dictionary for JSON serialization.
    - Returns:
      - Dict: Dictionary representation of the job specification.
  - add_task(task):
    - Description: Adds a task to the job specification.
    - Parameters:
      - task: The task to be added. Assumes the task has a to_dict() method.
    - Raises:
      - ValueError: If the task does not have a to_dict() method or is not an instance of Task.
- Properties:
  - payload: Getter/Setter for the payload data.
  - job_id: Getter/Setter for the job's unique identifier.
  - source_id: Getter/Setter for the source identifier.
  - source_name: Getter/Setter for the source name.
- Example Usage:

      job_spec = JobSpec(
          payload={"data": "Example data"},
          tasks=[extract_task, split_task],
          source_id="12345",
          job_id="abcd-efgh-ijkl-mnop",
          extended_options={"tracing_options": {"trace": True}},
      )
      print(job_spec.to_dict())
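The add_task validation rule and the to_dict() serialization described above can be tried without the service installed. The following is a minimal sketch only: MiniJobSpec and EchoTask are hypothetical stand-ins invented for illustration, and the dictionary keys are assumptions rather than the layout that nv_ingest_client actually emits.

```python
import uuid
from typing import Any, Dict, List, Optional


class EchoTask:
    """Hypothetical task: anything exposing to_dict() qualifies in this sketch."""

    def __init__(self, name: str) -> None:
        self.name = name

    def to_dict(self) -> Dict[str, Any]:
        return {"type": self.name}


class MiniJobSpec:
    """Illustrative stand-in for JobSpec; not the library's implementation."""

    def __init__(
        self,
        payload: Dict[str, Any],
        tasks: Optional[List[Any]] = None,
        job_id: Optional[str] = None,
    ) -> None:
        self._payload = payload
        self._tasks: List[Any] = []
        self._job_id = job_id or str(uuid.uuid4())  # default to a fresh UUID
        for task in tasks or []:
            self.add_task(task)

    def add_task(self, task: Any) -> None:
        # Reject objects that cannot be serialized through to_dict().
        if not callable(getattr(task, "to_dict", None)):
            raise ValueError("task must provide a to_dict() method")
        self._tasks.append(task)

    def to_dict(self) -> Dict[str, Any]:
        # Key names are assumptions chosen for this sketch.
        return {
            "job_id": self._job_id,
            "payload": self._payload,
            "tasks": [task.to_dict() for task in self._tasks],
        }


spec = MiniJobSpec(payload={"data": "Example data"}, tasks=[EchoTask("extract")])
spec.add_task(EchoTask("split"))
spec_dict = spec.to_dict()
```

Validating each task when it is added, rather than when the job is serialized, surfaces a malformed task at the call site that introduced it.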
nv_ingest_client.primitives.tasks
Task Factory
- Function: task_factory(task_type, **kwargs)
  - Description: Factory method for creating task objects based on the provided task type. It dynamically selects the appropriate task class from a mapping and initializes it with any additional keyword arguments.
  - Parameters:
    - task_type (TaskType or str): The type of the task to create. Can be an enum member of TaskType or a string representing a valid task type.
    - **kwargs (dict): Additional keyword arguments to pass to the task's constructor.
  - Returns:
    - Task: An instance of the task corresponding to the given task type.
  - Raises:
    - ValueError: If an invalid task type is provided, or if any unexpected keyword arguments are passed that do not match the task constructor's parameters.
- Example:

      # Assuming TaskType has 'Extract' and 'Split' as valid members and corresponding classes are defined.
      extract_task = task_factory('extract', document_type='PDF', extract_text=True)
      split_task = task_factory('split', split_by='sentence', split_length=100)
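The factory behavior described above (look up a class in a mapping, then reject unknown types and unexpected keyword arguments with ValueError) can be sketched in a few lines. Every name below (DemoTaskType, DemoExtract, DemoSplit, demo_task_factory) is hypothetical, and the real task_factory may validate its arguments differently.

```python
import inspect
from enum import Enum
from typing import Any, Dict, Type, Union


class DemoTaskType(Enum):
    EXTRACT = "extract"
    SPLIT = "split"


class DemoExtract:
    def __init__(self, document_type: str, extract_text: bool = False) -> None:
        self.document_type = document_type
        self.extract_text = extract_text


class DemoSplit:
    def __init__(self, split_by: str = "sentence", split_length: int = 100) -> None:
        self.split_by = split_by
        self.split_length = split_length


# Mapping from task type to the class that implements it.
_TASK_MAP: Dict[DemoTaskType, Type[Any]] = {
    DemoTaskType.EXTRACT: DemoExtract,
    DemoTaskType.SPLIT: DemoSplit,
}


def demo_task_factory(task_type: Union[DemoTaskType, str], **kwargs: Any) -> Any:
    # Accept either an enum member or its string value.
    if isinstance(task_type, str):
        try:
            task_type = DemoTaskType(task_type.lower())
        except ValueError:
            raise ValueError(f"Invalid task type: {task_type!r}") from None
    task_class = _TASK_MAP.get(task_type)
    if task_class is None:
        raise ValueError(f"Invalid task type: {task_type!r}")
    # Compare kwargs with the constructor signature to catch typos early.
    accepted = set(inspect.signature(task_class.__init__).parameters) - {"self"}
    unexpected = set(kwargs) - accepted
    if unexpected:
        raise ValueError(f"Unexpected keyword arguments: {sorted(unexpected)}")
    return task_class(**kwargs)


extract_task = demo_task_factory("extract", document_type="PDF", extract_text=True)
split_task = demo_task_factory(DemoTaskType.SPLIT, split_by="sentence")
```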
ExtractTask
Object for document extraction tasks, extending the Task class.
- Method: __init__(document_type, extract_method='pdfium', extract_text=False, extract_images=False, extract_tables=False)
  - Parameters:
    - document_type: Type of document.
    - extract_method: Method used for extraction. Default is 'pdfium'.
    - extract_text: Boolean indicating if text should be extracted. Default is False.
    - extract_images: Boolean indicating if images should be extracted. Default is False.
    - extract_tables: Boolean indicating if tables should be extracted. Default is False.
    - extract_page_as_image: Boolean indicating if each page should be rendered as an image for embedding. Default is False.
  - Description: Sets up configuration for the extraction task.
- Method: to_dict()
  - Description: Converts task details to a dictionary for submission to the message client. Includes handling for specific methods and document types.
  - Returns:
    - Dict: Dictionary containing task type and properties.
- Example:

      extract_task = ExtractTask(
          document_type=file_type,
          extract_text=True,
          extract_images=True,
          extract_tables=True,
      )
SplitTask
Object for document splitting tasks, extending the Task class.
- Method: __init__(split_by=None, split_length=None, split_overlap=None, max_character_length=None, sentence_window_size=None)
  - Parameters:
    - split_by: Criterion for splitting, e.g., 'word', 'sentence', 'passage'.
    - split_length: The length of each split segment.
    - split_overlap: Overlap length between segments.
    - max_character_length: Maximum character length for a split.
    - sentence_window_size: Window size for sentence-based splits.
  - Description: Sets up configuration for the splitting task.
- Method: to_dict()
  - Description: Converts task details to a dictionary for submission to the message client.
  - Returns:
    - Dict: Dictionary containing task type and properties.
- Example:

      split_task = SplitTask(
          split_by="word",
          split_length=300,
          split_overlap=10,
          max_character_length=5000,
          sentence_window_size=0,
      )
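To make split_length and split_overlap concrete, here is one plausible reading of word-based splitting: each segment holds split_length words and shares split_overlap words with the previous one. The helper split_words is hypothetical and written only for illustration; the service's actual splitting logic may differ.

```python
from typing import List


def split_words(text: str, split_length: int, split_overlap: int) -> List[str]:
    """Split text into word windows that overlap by split_overlap words."""
    if split_length <= 0:
        raise ValueError("split_length must be positive")
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be in [0, split_length)")
    words = text.split()
    stride = split_length - split_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(words), stride):
        chunks.append(" ".join(words[start:start + split_length]))
        if start + split_length >= len(words):
            break  # the last window already reached the end of the text
    return chunks


chunks = split_words("a b c d e f g", split_length=3, split_overlap=1)
# chunks == ["a b c", "c d e", "e f g"]
```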
nv_ingest_client.client.client
The NvIngestClient class provides methods for creating, submitting, and retrieving jobs. Its public methods are listed below.
Initialization
- __init__: Initializes the NvIngestClient with customizable client allocator and Redis configuration.
  - Parameters:
    - message_client_allocator: A callable that returns an instance of the client used for communication.
    - message_client_hostname: Hostname of the message client server. Defaults to "localhost".
    - message_client_port: Port number of the message client server. Defaults to 7670.
    - message_client_kwargs: Additional keyword arguments for the message client.
    - msg_counter_id: Redis key for tracking message counts. Defaults to "nv-ingest-message-id".
    - worker_pool_size: Number of worker processes in the pool. Defaults to 1.
- Example:

      client = NvIngestClient(
          message_client_hostname="localhost",  # Host where nv-ingest-ms-runtime is running
          message_client_port=7670,  # REST port, defaults to 7670
      )
Submission Methods
submit_job
Submits a job to a specified job queue. This method can optionally wait for a response if blocking is set to True.
- Parameters:
  - job_id: The unique identifier of the job to be submitted.
  - job_queue_id: The ID of the job queue where the job will be submitted.
- Returns:
  - Optional[Dict]: The job result if blocking is True and a result is available before the timeout, otherwise None.
- Raises:
  - Exception: If submitting the job fails.
- Example:

      client.submit_job(job_id, "ingest_task_queue")
submit_jobs
Submits multiple jobs to a specified job queue. This method does not wait for any of the jobs to complete.
- Parameters:
  - job_ids: A list of job IDs to be submitted.
  - job_queue_id: The ID of the job queue where the jobs will be submitted.
- Returns:
  - List[Union[Dict, None]]: A list of job results if blocking is True and results are available before the timeout, otherwise None.
- Example:

      client.submit_jobs([job_id0, job_id1], "ingest_task_queue")
submit_job_async
Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.
- Parameters:
  - job_ids: A single job ID or a list of job IDs to be submitted.
  - job_queue_id: The ID of the job queue where the jobs will be submitted.
- Returns:
  - Dict[Future, str]: A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
- Notes:
  - This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
  - It does not wait for any of the jobs to complete.
  - Ensure that each job is in the proper state before submission.
- Example:

      client.submit_job_async(job_id, "ingest_task_queue")
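The Dict[Future, str] return shape is a common pattern with Python's concurrent.futures module: the caller iterates over futures as they finish and looks up which job each one belongs to. The sketch below uses only the standard library; fake_submit and submit_async are hypothetical stand-ins, not the client's own methods.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Dict, List, Union


def fake_submit(job_id: str, job_queue_id: str) -> str:
    """Hypothetical stand-in for a blocking submit call."""
    return f"{job_queue_id}:{job_id}"


def submit_async(
    executor: ThreadPoolExecutor,
    job_ids: Union[str, List[str]],
    job_queue_id: str,
) -> Dict[Future, str]:
    # Normalize a single ID to a list so both call styles share one code path.
    if isinstance(job_ids, str):
        job_ids = [job_ids]
    return {
        executor.submit(fake_submit, job_id, job_queue_id): job_id
        for job_id in job_ids
    }


with ThreadPoolExecutor(max_workers=2) as pool:
    future_to_job = submit_async(pool, ["job-0", "job-1"], "ingest_task_queue")
    results: Dict[str, str] = {}
    # as_completed yields each future when it finishes, in completion order.
    for future in as_completed(future_to_job):
        results[future_to_job[future]] = future.result()
```

Keying the dictionary by future rather than by job ID lets the loop recover the job ID directly from whatever as_completed yields.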
Job Retrieval
fetch_job_result
- Description: Fetches the job result from a message client, handling potential errors and state changes.
- Method: fetch_job_result(job_id, timeout=10, data_only=True)
- Parameters:
  - job_id (str): The identifier of the job.
  - timeout (float, optional): Timeout for the fetch operation in seconds. Defaults to 10.
  - data_only (bool, optional): If true, only returns the data part of the job result.
- Returns:
  - Tuple[Dict, str]: The job result and the job ID.
- Raises:
  - ValueError: If there is an error in decoding the job result.
  - TimeoutError: If the fetch operation times out.
  - Exception: For all other unexpected issues.
- Example:

      job_id = client.add_job(job_spec)
      client.submit_job(job_id, TASK_QUEUE)
      generated_metadata = client.fetch_job_result(
          job_id, timeout=DEFAULT_JOB_TIMEOUT
      )
fetch_job_result_async
- Description: Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.
- Method: fetch_job_result_async(job_ids, timeout=10, data_only=True)
- Parameters:
  - job_ids (Union[str, List[str]]): A single job ID or a list of job IDs.
  - timeout (float, optional): Timeout for fetching each job result, in seconds. Defaults to 10.
  - data_only (bool, optional): Whether to return only the data part of the job result.
- Returns:
  - Dict[Future, str]: A dictionary mapping each future to its corresponding job ID.
- Raises:
  - No explicit exceptions are raised, but the method relies on the exceptions from fetch_job_result.
- Example:

      job_id = client.add_job(job_spec)
      client.submit_job(job_id, TASK_QUEUE)
      # The call returns a mapping of futures to job IDs, not the results themselves.
      future_to_job_id = client.fetch_job_result_async(
          job_id, timeout=DEFAULT_JOB_TIMEOUT
      )
Job and Task Management
job_count
- Description: Returns the number of jobs currently tracked by the client.
- Method: job_count()
- Returns: Integer representing the total number of jobs.
- Example:

      client.job_count()
add_job
- Description: Adds a job specification to the job tracking system.
- Method: add_job(job_spec)
- Parameters:
  - job_spec (JobSpec, optional): The job specification to add. If not provided, a new job ID will be generated.
- Returns: String representing the job ID of the added job.
- Raises:
  - ValueError: If a job with the specified job ID already exists.
- Example:

      extract_task = ExtractTask(
          document_type=file_type,
          extract_text=True,
          extract_images=True,
          extract_tables=True,
          text_depth="document",
          extract_tables_method="yolox",
      )
      job_spec.add_task(extract_task)
      job_id = client.add_job(job_spec)
create_job
- Description: Creates a new job with specified parameters and adds it to the job tracking dictionary.
- Method: create_job(payload, source_id, source_name, document_type, tasks, job_id, extended_options)
- Parameters:
  - payload (str): The payload associated with the job.
  - source_id (str): The source identifier for the job.
  - source_name (str): The unique name of the job's source data.
  - document_type (str, optional): The type of document to be processed.
  - tasks (list, optional): A list of tasks to be associated with the job.
  - job_id (uuid.UUID | str, optional): The unique identifier for the job.
  - extended_options (dict, optional): Additional options for job creation.
- Returns: String representing the job ID.
- Raises:
  - ValueError: If a job with the specified job ID already exists.
add_task
- Description: Adds a task to an existing job.
- Method: add_task(job_id, task)
- Parameters:
  - job_id (str): The job ID to which the task will be added.
  - task (Task): The task to add.
- Raises:
  - ValueError: If the job does not exist or is not in the correct state.
- Example:

      import time

      job_spec = JobSpec(
          document_type=file_type,
          payload=file_content,
          source_id=SAMPLE_PDF,
          source_name=SAMPLE_PDF,
          extended_options={
              "tracing_options": {
                  "trace": True,
                  "ts_send": time.time_ns(),  # send timestamp in nanoseconds
              }
          },
      )
      extract_task = ExtractTask(
          document_type=file_type,
          extract_text=True,
          extract_images=True,
          extract_tables=True,
          text_depth="document",
          extract_tables_method="yolox",
      )
      job_spec.add_task(extract_task)
create_task
- Description: Creates a task with specified parameters and adds it to an existing job.
- Method: create_task(job_id, task_type, task_params)
- Parameters:
  - job_id (uuid.UUID | str): The unique identifier of the job.
  - task_type (TaskType): The type of the task.
  - task_params (dict, optional): Parameters for the task.
- Raises:
  - ValueError: If the job does not exist or if an attempt is made to modify a job after its submission.
- Example:

      job_id = client.add_job(job_spec)
      # Dictionary keys must be strings; bare names would be evaluated as variables.
      client.create_task(job_id, DedupTask, {"content_type": "image", "filter": True})
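The rules in this section (duplicate job IDs are rejected, and tasks can be added only to a job that exists and has not yet been submitted) amount to a small state-tracking dictionary. The sketch below illustrates that idea with hypothetical names (DemoJobState, DemoJobTracker, mark_submitted); it is not how NvIngestClient stores its jobs internally.

```python
import uuid
from enum import Enum, auto
from typing import Any, Dict, List, Optional


class DemoJobState(Enum):
    PENDING = auto()
    SUBMITTED = auto()


class DemoJobTracker:
    """Illustrative job registry that enforces the ValueError rules above."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Dict[str, Any]] = {}

    def add_job(self, job_id: Optional[str] = None) -> str:
        job_id = job_id or str(uuid.uuid4())
        if job_id in self._jobs:
            raise ValueError(f"Job {job_id} already exists")
        self._jobs[job_id] = {"state": DemoJobState.PENDING, "tasks": []}
        return job_id

    def add_task(self, job_id: str, task: Any) -> None:
        job = self._jobs.get(job_id)
        if job is None:
            raise ValueError(f"Job {job_id} does not exist")
        if job["state"] is not DemoJobState.PENDING:
            raise ValueError(f"Job {job_id} was already submitted")
        job["tasks"].append(task)

    def mark_submitted(self, job_id: str) -> None:
        self._jobs[job_id]["state"] = DemoJobState.SUBMITTED

    def job_count(self) -> int:
        return len(self._jobs)


tracker = DemoJobTracker()
jid = tracker.add_job("job-0")
tracker.add_task(jid, {"type": "extract"})
tracker.mark_submitted(jid)

errors: List[str] = []
for action in (
    lambda: tracker.add_job("job-0"),                   # duplicate ID
    lambda: tracker.add_task(jid, {"type": "split"}),   # already submitted
    lambda: tracker.add_task("missing", {"type": "x"}), # unknown job
):
    try:
        action()
    except ValueError as err:
        errors.append(str(err))
```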
CLI Tool
After installation, you can use the nv-ingest-cli tool from the command line to manage and process datasets.
CLI Options
The CLI provides the following options:
- --batch_size: Specifies the number of documents to process in a single batch. Default is 10. Must be 1 or more.
- --doc: Adds a new document to be processed. Supports multiple entries. Files must exist.
- --dataset: Specifies the path to a dataset definition file.
- --client: Sets the client type with choices including REST, Redis, Kafka. Default is Redis.
- --client_host: Specifies the DNS name or URL for the endpoint.
- --client_port: Sets the port number for the client endpoint.
- --client_kwargs: Provides additional arguments to pass to the client. Default is {}.
- --concurrency_n: Defines the number of inflight jobs to maintain at one time. Default is 1.
- --dry_run: Enables a dry run without executing actions.
- --output_directory: Specifies the output directory for results.
- --log_level: Sets the log level. Choices are DEBUG, INFO, WARNING, ERROR, CRITICAL. Default is INFO.
- --shuffle_dataset: Shuffles the dataset before processing if enabled. Default is true.
- --task: Allows for specification of tasks in JSON format. Supports multiple tasks.
- --collect_profiling_traces: Collect the tracing profile for the run after processing.
- --zipkin_host: Host used to connect to Zipkin to gather tracing profiles.
- --zipkin_port: Port used to connect to Zipkin to gather tracing profiles.
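When the CLI is driven from a script, it can help to assemble the argument list programmatically. The sketch below builds an argv list from a few of the options listed above. The helper build_cli_command is hypothetical, and two details are assumptions rather than documented behavior: that --dry_run is a valueless flag, and that each option accepts its value as the following argument. The file paths and host are placeholders.

```python
import shlex
from typing import List


def build_cli_command(
    docs: List[str],
    output_directory: str,
    client_host: str,
    client_port: int,
    batch_size: int = 10,
    dry_run: bool = False,
) -> List[str]:
    """Assemble an nv-ingest-cli argument list from a few documented options."""
    if batch_size < 1:
        raise ValueError("--batch_size must be 1 or more")
    argv = ["nv-ingest-cli", "--batch_size", str(batch_size)]
    for doc in docs:
        argv += ["--doc", doc]  # --doc supports multiple entries
    argv += [
        "--client_host", client_host,
        "--client_port", str(client_port),
        "--output_directory", output_directory,
    ]
    if dry_run:
        argv.append("--dry_run")  # assumed to be a flag without a value
    return argv


argv = build_cli_command(
    docs=["./data/report.pdf"],
    output_directory="./results",
    client_host="localhost",
    client_port=7670,
    dry_run=True,
)
# shlex.join quotes arguments where needed, giving a shell-safe string to log.
command_line = shlex.join(argv)
```

Passing the list form to subprocess.run(argv) avoids shell quoting issues with file names that contain spaces.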
Examples
You can find a notebook with examples that use the CLI client in the client examples folder.
File details
Details for the file nv_ingest_client-2026.1.16.dev20260116.tar.gz.
File metadata
- Download URL: nv_ingest_client-2026.1.16.dev20260116.tar.gz
- Upload date:
- Size: 127.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ce04f44ca1736157e2be14c2deeda6c988f98607de1cf79a8d934bfd0df64444 |
| MD5 | 31881625ff45283888cfd507c53b99d8 |
| BLAKE2b-256 | afdb55e33d3a9a646dc991b60f00e6f602d91fc880d8ca0c89024f299611edb9 |
File details
Details for the file nv_ingest_client-2026.1.16.dev20260116-py3-none-any.whl.
File metadata
- Download URL: nv_ingest_client-2026.1.16.dev20260116-py3-none-any.whl
- Upload date:
- Size: 147.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 598b5df090fcbdb7f86034076337422875f7171f84aad1cfb9c060fe01dfe586 |
| MD5 | a7cfcb39e18544e7bbdcadfae0f46e7b |
| BLAKE2b-256 | 2f478f896ea2fcbd5081c8568466b051c6a63df663e39db496b74f0d106ffd81 |