
Python client for the nv-ingest service

NV-Ingest-Client

NV-Ingest-Client is a tool designed for efficient ingestion and processing of large datasets. It provides both a Python API and a command-line interface to cater to various ingestion needs.

Note: NV-Ingest is also known as NVIDIA Ingest and NeMo Retriever extraction.

Table of Contents

  1. Installation
  2. API Libraries
  3. CLI Tool
  4. Examples

Installation

To install NV-Ingest-Client, run the following command in your terminal:

pip install [REPO_ROOT]/client

This command installs both the API libraries and the nv-ingest-cli tool, which can then be called from the command line.

API Libraries

nv_ingest_client.primitives.jobs

JobSpec

Specification for creating a job for submission to the nv-ingest microservice.

  • Parameters:

    • payload (Dict): The payload data for the job.
    • tasks (Optional[List], optional): A list of tasks to be added to the job. Defaults to None.
    • source_id (Optional[str], optional): An identifier for the source of the job. Defaults to None.
    • source_name (Optional[str], optional): A name for the source of the job. Defaults to None.
    • document_type (Optional[str], optional): Type of the document. Defaults to 'txt'.
    • job_id (Optional[Union[UUID, str]], optional): A unique identifier for the job. Defaults to a new UUID.
    • extended_options (Optional[Dict], optional): Additional options for job processing. Defaults to None.
  • Attributes:

    • _payload (Dict): Storage for the payload data.
    • _tasks (List): Storage for the list of tasks.
    • _source_id (str): Storage for the source identifier.
    • _job_id (UUID): Storage for the job's unique identifier.
    • _extended_options (Dict): Storage for the additional options.
  • Methods:

    • to_dict() -> Dict:
      • Description: Converts the job specification to a dictionary for JSON serialization.
      • Returns: Dict: Dictionary representation of the job specification.
    • add_task(task):
      • Description: Adds a task to the job specification.
      • Parameters:
        • task: The task to be added. Assumes the task has a to_dict() method.
      • Raises:
        • ValueError: If the task does not have a to_dict() method or is not an instance of Task.
  • Properties:

    • payload: Getter/Setter for the payload data.
    • job_id: Getter/Setter for the job's unique identifier.
    • source_id: Getter/Setter for the source identifier.
    • source_name: Getter/Setter for the source name.
  • Example Usage:

    job_spec = JobSpec(
        payload={"data": "Example data"},
        tasks=[extract_task, split_task],
        source_id="12345",
        job_id="abcd-efgh-ijkl-mnop",
        extended_options={"tracing_options": {"trace": True}}
    )
    print(job_spec.to_dict())
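
The behaviour described above (a fresh UUID when no job_id is given, and add_task() rejecting objects without to_dict()) can be tried without a running service. The following is a minimal stand-in sketch, not the library's implementation: MiniJobSpec and EchoTask are hypothetical names used only here, and the dictionary keys they emit are assumptions.

```python
import uuid
from typing import Any, Dict, List, Optional, Union


class EchoTask:
    """Hypothetical task: any object exposing to_dict() is accepted."""

    def __init__(self, name: str) -> None:
        self.name = name

    def to_dict(self) -> Dict[str, Any]:
        return {"type": self.name, "task_properties": {}}


class MiniJobSpec:
    """Hypothetical stand-in mirroring the documented JobSpec behaviour."""

    def __init__(
        self,
        payload: Dict[str, Any],
        tasks: Optional[List[Any]] = None,
        source_id: Optional[str] = None,
        job_id: Optional[Union[uuid.UUID, str]] = None,
    ) -> None:
        self._payload = payload
        self._tasks: List[Any] = []
        self._source_id = source_id
        # A new UUID is generated when no job_id is supplied.
        self._job_id = job_id if job_id is not None else uuid.uuid4()
        for task in tasks or []:
            self.add_task(task)

    def add_task(self, task: Any) -> None:
        # Reject objects that cannot be serialized through to_dict().
        if not callable(getattr(task, "to_dict", None)):
            raise ValueError("task must provide a to_dict() method")
        self._tasks.append(task)

    def to_dict(self) -> Dict[str, Any]:
        # Key names are illustrative assumptions, not the service schema.
        return {
            "job_id": str(self._job_id),
            "source_id": self._source_id,
            "job_payload": self._payload,
            "tasks": [task.to_dict() for task in self._tasks],
        }


spec = MiniJobSpec(payload={"data": "Example data"}, tasks=[EchoTask("extract")])
spec_dict = spec.to_dict()
print(spec_dict["tasks"])
```

Validating tasks at add time, rather than at serialization time, surfaces a malformed task before the job is submitted.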
    

nv_ingest_client.primitives.tasks

Task Factory

  • Function: task_factory(task_type, **kwargs)

    • Description: Factory method for creating task objects based on the provided task type. It dynamically selects the appropriate task class from a mapping and initializes it with any additional keyword arguments.
    • Parameters:
      • task_type (TaskType or str): The type of the task to create. Can be an enum member of TaskType or a string representing a valid task type.
      • **kwargs (dict): Additional keyword arguments to pass to the task's constructor.
    • Returns:
      • Task: An instance of the task corresponding to the given task type.
    • Raises:
      • ValueError: If an invalid task type is provided, or if any unexpected keyword arguments are passed that do not match the task constructor's parameters.
  • Example:

    # Assuming TaskType has 'Extract' and 'Split' as valid members and corresponding classes are defined.
    extract_task = task_factory('extract', document_type='PDF', extract_text=True)
    split_task = task_factory('split', split_by='sentence', split_length=100)
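
The factory behaviour described above (look up a class by task type, then reject unknown types and unexpected keyword arguments with ValueError) can be sketched as follows. This is an illustration under stated assumptions, not the library's code: DemoTaskType, DemoExtract, DemoSplit, and demo_task_factory are hypothetical names, and the signature inspection is one possible way to detect unexpected arguments.

```python
import inspect
from enum import Enum
from typing import Any, Dict, Type, Union


class DemoTaskType(Enum):
    """Hypothetical enum standing in for TaskType."""

    EXTRACT = "extract"
    SPLIT = "split"


class DemoExtract:
    def __init__(self, document_type: str, extract_text: bool = False) -> None:
        self.document_type = document_type
        self.extract_text = extract_text


class DemoSplit:
    def __init__(self, split_by: str = "word", split_length: int = 100) -> None:
        self.split_by = split_by
        self.split_length = split_length


# Mapping from task type to the class that implements it.
_TASK_MAP: Dict[DemoTaskType, Type[Any]] = {
    DemoTaskType.EXTRACT: DemoExtract,
    DemoTaskType.SPLIT: DemoSplit,
}


def demo_task_factory(task_type: Union[DemoTaskType, str], **kwargs: Any) -> Any:
    # Accept either an enum member or its string value.
    if isinstance(task_type, str):
        try:
            task_type = DemoTaskType(task_type.lower())
        except ValueError:
            raise ValueError(f"Invalid task type: {task_type!r}") from None
    task_class = _TASK_MAP.get(task_type)
    if task_class is None:
        raise ValueError(f"Invalid task type: {task_type!r}")
    # Compare the supplied kwargs with the constructor's parameters.
    accepted = set(inspect.signature(task_class.__init__).parameters) - {"self"}
    unexpected = set(kwargs) - accepted
    if unexpected:
        raise ValueError(f"Unexpected keyword arguments: {sorted(unexpected)}")
    return task_class(**kwargs)


demo_extract = demo_task_factory("extract", document_type="PDF", extract_text=True)
demo_split = demo_task_factory(DemoTaskType.SPLIT, split_by="sentence")
```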
    

ExtractTask

Object for document extraction tasks, extending the Task class.

  • Method: __init__(document_type, extract_method='pdfium', extract_text=False, extract_images=False, extract_tables=False, extract_page_as_image=False)

    • Parameters:
      • document_type: Type of document.
      • extract_method: Method used for extraction. Default is 'pdfium'.
      • extract_text: Boolean indicating if text should be extracted. Default is False.
      • extract_images: Boolean indicating if images should be extracted. Default is False.
      • extract_tables: Boolean indicating if tables should be extracted. Default is False.
      • extract_page_as_image: Boolean indicating if each page should be rendered as an image for embedding. Default is False.
    • Description: Sets up configuration for the extraction task.
  • Method: to_dict()

    • Description: Converts task details to a dictionary for submission to the message client. Includes handling for specific methods and document types.
    • Returns: Dict: Dictionary containing task type and properties.
  • Example:

    extract_task = ExtractTask(
        document_type=file_type,
        extract_text=True,
        extract_images=True,
        extract_tables=True,
    )
    

SplitTask

Object for document splitting tasks, extending the Task class.

  • Method: __init__(split_by=None, split_length=None, split_overlap=None, max_character_length=None, sentence_window_size=None)

    • Parameters:
      • split_by: Criterion for splitting, e.g., 'word', 'sentence', 'passage'.
      • split_length: The length of each split segment.
      • split_overlap: Overlap length between segments.
      • max_character_length: Maximum character length for a split.
      • sentence_window_size: Window size for sentence-based splits.
    • Description: Sets up configuration for the splitting task.
  • Method: to_dict()

    • Description: Converts task details to a dictionary for submission to the message client.
    • Returns: Dict: Dictionary containing task type and properties.
  • Example:

    split_task = SplitTask(
        split_by="word",
        split_length=300,
        split_overlap=10,
        max_character_length=5000,
        sentence_window_size=0,
    )
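
To make split_length and split_overlap concrete, here is a small sketch of word-based splitting with a sliding window. It shows one common interpretation of these parameters (each chunk holds up to split_length words and shares split_overlap words with the previous chunk); whether the service splits in exactly this way is an assumption, and split_words is a hypothetical helper.

```python
from typing import List


def split_words(text: str, split_length: int, split_overlap: int = 0) -> List[str]:
    """Split text into word chunks that overlap by split_overlap words."""
    if split_length < 1:
        raise ValueError("split_length must be at least 1")
    if not 0 <= split_overlap < split_length:
        raise ValueError("split_overlap must be in [0, split_length)")
    words = text.split()
    stride = split_length - split_overlap  # how far the window advances
    chunks: List[str] = []
    for start in range(0, len(words), stride):
        chunks.append(" ".join(words[start:start + split_length]))
        # Stop once the window has reached the end of the text.
        if start + split_length >= len(words):
            break
    return chunks


chunks = split_words("a b c d e f g", split_length=3, split_overlap=1)
print(chunks)  # ['a b c', 'c d e', 'e f g']
```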
    

nv_ingest_client.client.client

The NvIngestClient class provides a comprehensive suite of methods to handle job submission and retrieval processes efficiently. Below are the public methods available:

Initialization

  • __init__: Initializes the NvIngestClient with a customizable client allocator and Redis configuration.

    • Parameters:
      • message_client_allocator: A callable that returns an instance of the client used for communication.
      • message_client_hostname: Hostname of the message client server. Defaults to "localhost".
      • message_client_port: Port number of the message client server. Defaults to 7670.
      • message_client_kwargs: Additional keyword arguments for the message client.
      • msg_counter_id: Redis key for tracking message counts. Defaults to "nv-ingest-message-id".
      • worker_pool_size: Number of worker processes in the pool. Defaults to 1.
  • Example:

    client = NvIngestClient(
        message_client_hostname="localhost",  # Host where nv-ingest-ms-runtime is running
        message_client_port=7670,  # REST port, defaults to 7670
    )
    

Submission Methods

submit_job

Submits a job to a specified job queue. This method can optionally wait for a response if blocking is set to True.

  • Parameters:

    • job_id: The unique identifier of the job to be submitted.
    • job_queue_id: The ID of the job queue where the job will be submitted.
  • Returns:

    • Optional[Dict]: The job result if blocking is True and a result is available before the timeout, otherwise None.
  • Raises:

    • Exception: If submitting the job fails.
  • Example:

    client.submit_job(job_id, "ingest_task_queue")
    

submit_jobs

Submits multiple jobs to a specified job queue. This method does not wait for any of the jobs to complete.

  • Parameters:

    • job_ids: A list of job IDs to be submitted.
    • job_queue_id: The ID of the job queue where the jobs will be submitted.
  • Returns:

    • List[Union[Dict, None]]: A list of job results if blocking is True and results are available before the timeout, otherwise None.
  • Example:

    client.submit_jobs([job_id0, job_id1], "ingest_task_queue")
    

submit_job_async

Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.

  • Parameters:

    • job_ids: A single job ID or a list of job IDs to be submitted.
    • job_queue_id: The ID of the job queue where the jobs will be submitted.
  • Returns:

    • Dict[Future, str]: A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
  • Notes:

    • This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
    • It does not wait for any of the jobs to complete.
    • Ensure that each job is in the proper state before submission.
  • Example:

    client.submit_job_async(job_id, "ingest_task_queue")
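
The Dict[Future, str] return shape is the standard concurrent.futures idiom for keeping track of which future belongs to which job. The sketch below builds such a mapping with a thread pool; fake_submit and submit_async_sketch are hypothetical stand-ins for the real network call and client method.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Union


def fake_submit(job_id: str, job_queue_id: str) -> str:
    """Hypothetical stand-in for the call that submits a single job."""
    return f"{job_queue_id}:{job_id}"


def submit_async_sketch(
    executor: ThreadPoolExecutor,
    job_ids: Union[str, List[str]],
    job_queue_id: str,
) -> Dict[Future, str]:
    # Normalize a single job ID to a one-element list.
    if isinstance(job_ids, str):
        job_ids = [job_ids]
    # Queue each submission and remember which job each future represents.
    return {
        executor.submit(fake_submit, job_id, job_queue_id): job_id
        for job_id in job_ids
    }


with ThreadPoolExecutor(max_workers=2) as pool:
    future_to_job = submit_async_sketch(pool, ["job-0", "job-1"], "ingest_task_queue")
    # result() blocks until the corresponding submission has finished.
    outcomes = {job_id: future.result() for future, job_id in future_to_job.items()}
```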
    

Job Retrieval

fetch_job_result

  • Description: Fetches the job result from a message client, handling potential errors and state changes.

  • Method: fetch_job_result(job_id, timeout=10, data_only=True)

  • Parameters:

    • job_id (str): The identifier of the job.
    • timeout (float, optional): Timeout for the fetch operation in seconds. Defaults to 10.
    • data_only (bool, optional): If true, only returns the data part of the job result.
  • Returns:

    • Tuple[Dict, str]: The job result and the job ID.
  • Raises:

    • ValueError: If there is an error in decoding the job result.
    • TimeoutError: If the fetch operation times out.
    • Exception: For all other unexpected issues.
  • Example:

    job_id = client.add_job(job_spec)
    client.submit_job(job_id, TASK_QUEUE)
    generated_metadata = client.fetch_job_result(
        job_id, timeout=DEFAULT_JOB_TIMEOUT
    )
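
Because fetch_job_result is documented to raise TimeoutError when a result is not available in time, callers may want to retry. The helper below is a sketch of that pattern and is not part of the client: fetch_with_retry and FlakyFetcher are hypothetical, and the fetch argument is any callable that behaves like fetch_job_result.

```python
import time
from typing import Any, Callable, Optional, Tuple


def fetch_with_retry(
    fetch: Callable[[str], Tuple[Any, str]],
    job_id: str,
    retries: int = 3,
    delay: float = 0.0,
) -> Tuple[Any, str]:
    """Call fetch(job_id) until it succeeds or retries are exhausted."""
    last_error: Optional[TimeoutError] = None
    for _ in range(retries):
        try:
            return fetch(job_id)
        except TimeoutError as err:
            last_error = err
            time.sleep(delay)  # wait before the next attempt
    raise TimeoutError(f"job {job_id} not ready after {retries} attempts") from last_error


class FlakyFetcher:
    """Hypothetical fetcher that times out a fixed number of times."""

    def __init__(self, failures: int) -> None:
        self.remaining = failures

    def __call__(self, job_id: str) -> Tuple[Any, str]:
        if self.remaining > 0:
            self.remaining -= 1
            raise TimeoutError("result not ready")
        return {"status": "done"}, job_id


result, fetched_id = fetch_with_retry(FlakyFetcher(failures=2), "job-0", retries=3)
```

With the real client, the callable could be a lambda wrapping client.fetch_job_result with the desired timeout.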
    

fetch_job_result_async

  • Description: Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.

  • Method: fetch_job_result_async(job_ids, timeout=10, data_only=True)

  • Parameters:

    • job_ids (Union[str, List[str]]): A single job ID or a list of job IDs.
    • timeout (float, optional): Timeout for fetching each job result, in seconds. Defaults to 10.
    • data_only (bool, optional): Whether to return only the data part of the job result.
  • Returns:

    • Dict[Future, str]: A dictionary mapping each future to its corresponding job ID.
  • Raises:

    • None directly. Exceptions raised by fetch_job_result surface when the corresponding future's result is retrieved.
  • Example:

    job_id = client.add_job(job_spec)
    client.submit_job(job_id, TASK_QUEUE)
    future_to_job_id = client.fetch_job_result_async(
        job_id, timeout=DEFAULT_JOB_TIMEOUT
    )
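
A mapping of futures to job IDs is typically consumed with concurrent.futures.as_completed, handling per-job errors as each future resolves. The sketch below assumes the futures behave like standard library futures; fake_fetch is a hypothetical stand-in for fetch_job_result.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, Tuple


def fake_fetch(job_id: str) -> Tuple[Dict[str, Any], str]:
    """Hypothetical fetch: one job times out, the others succeed."""
    if job_id == "job-bad":
        raise TimeoutError(f"timed out waiting for {job_id}")
    return {"data": job_id.upper()}, job_id


results: Dict[str, Dict[str, Any]] = {}
failures: Dict[str, str] = {}

with ThreadPoolExecutor(max_workers=2) as pool:
    future_to_job = {pool.submit(fake_fetch, j): j for j in ["job-0", "job-bad"]}
    # Handle each job as soon as its future completes.
    for future in as_completed(future_to_job):
        job_id = future_to_job[future]
        try:
            result, _ = future.result()  # re-raises the worker's exception
            results[job_id] = result
        except TimeoutError as err:
            failures[job_id] = str(err)
```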
    

Job and Task Management

job_count

  • Description: Returns the number of jobs currently tracked by the client.

  • Method: job_count()

  • Returns: Integer representing the total number of jobs.

  • Example:

    client.job_count()
    

add_job

  • Description: Adds a job specification to the job tracking system.

  • Method: add_job(job_spec)

  • Parameters:

    • job_spec (JobSpec, optional): The job specification to add. If not provided, a new job ID will be generated.
  • Returns: String representing the job ID of the added job.

  • Raises:

    • ValueError: If a job with the specified job ID already exists.
  • Example:

    extract_task = ExtractTask(
        document_type=file_type,
        extract_text=True,
        extract_images=True,
        extract_tables=True,
        text_depth="document",
        extract_tables_method="yolox",
    )
    job_spec.add_task(extract_task)
    job_id = client.add_job(job_spec)
    

create_job

  • Description: Creates a new job with specified parameters and adds it to the job tracking dictionary.
  • Method: create_job(payload, source_id, source_name, document_type, tasks, job_id, extended_options)
  • Parameters:
    • payload (str): The payload associated with the job.
    • source_id (str): The source identifier for the job.
    • source_name (str): The unique name of the job's source data.
    • document_type (str, optional): The type of document to be processed.
    • tasks (list, optional): A list of tasks to be associated with the job.
    • job_id (uuid.UUID | str, optional): The unique identifier for the job.
    • extended_options (dict, optional): Additional options for job creation.
  • Returns: String representing the job ID.
  • Raises:
    • ValueError: If a job with the specified job ID already exists.
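
The job tracking semantics shared by add_job, create_job, and job_count (a dictionary keyed by job ID, with duplicates rejected) can be illustrated in a few lines. MiniJobTracker is a hypothetical stand-in written for this sketch, not the client's implementation.

```python
import uuid
from typing import Any, Dict, Optional, Union


class MiniJobTracker:
    """Hypothetical tracker keyed by job ID."""

    def __init__(self) -> None:
        self._jobs: Dict[str, Any] = {}

    def add_job(self, payload: Any, job_id: Optional[Union[uuid.UUID, str]] = None) -> str:
        # Generate an ID when none is supplied; always store it as a string.
        key = str(job_id) if job_id is not None else str(uuid.uuid4())
        if key in self._jobs:
            raise ValueError(f"Job with ID {key} already exists")
        self._jobs[key] = payload
        return key

    def job_count(self) -> int:
        return len(self._jobs)


tracker = MiniJobTracker()
first_id = tracker.add_job({"data": "doc A"}, job_id="job-a")
second_id = tracker.add_job({"data": "doc B"})  # ID generated automatically
```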

add_task

  • Description: Adds a task to an existing job.

  • Method: add_task(job_id, task)

  • Parameters:

    • job_id (str): The job ID to which the task will be added.
    • task (Task): The task to add.
  • Raises:

    • ValueError: If the job does not exist or is not in the correct state.
  • Example:

    job_spec = JobSpec(
        document_type=file_type,
        payload=file_content,
        source_id=SAMPLE_PDF,
        source_name=SAMPLE_PDF,
        extended_options={
            "tracing_options": {
                "trace": True,
                "ts_send": time.time_ns(),
            }
        },
    )
    extract_task = ExtractTask(
        document_type=file_type,
        extract_text=True,
        extract_images=True,
        extract_tables=True,
        text_depth="document",
        extract_tables_method="yolox",
    )
    job_spec.add_task(extract_task)
    

create_task

  • Description: Creates a task with specified parameters and adds it to an existing job.

  • Method: create_task(job_id, task_type, task_params)

  • Parameters:

    • job_id (uuid.UUID | str): The unique identifier of the job.
    • task_type (TaskType): The type of the task.
    • task_params (dict, optional): Parameters for the task.
  • Raises:

    • ValueError: If the job does not exist or if an attempt is made to modify a job after its submission.
  • Example:

    job_id = client.add_job(job_spec)
    client.create_task(job_id, DedupTask, {"content_type": "image", "filter": True})
    

CLI Tool

After installation, you can use the nv-ingest-cli tool from the command line to manage and process datasets.

CLI Options

The CLI provides the following options:

  • --batch_size: Specifies the number of documents to process in a single batch. Default is 10. Must be 1 or more.
  • --doc: Adds a new document to be processed. Supports multiple entries. Files must exist.
  • --dataset: Specifies the path to a dataset definition file.
  • --client: Sets the client type with choices including REST, Redis, Kafka. Default is Redis.
  • --client_host: Specifies the DNS name or URL for the endpoint.
  • --client_port: Sets the port number for the client endpoint.
  • --client_kwargs: Provides additional arguments to pass to the client. Default is {}.
  • --concurrency_n: Defines the number of inflight jobs to maintain at one time. Default is 1.
  • --dry_run: Enables a dry run without executing actions.
  • --output_directory: Specifies the output directory for results.
  • --log_level: Sets the log level. Choices are DEBUG, INFO, WARNING, ERROR, CRITICAL. Default is INFO.
  • --shuffle_dataset: Shuffles the dataset before processing if enabled. Default is true.
  • --task: Allows for specification of tasks in JSON format. Supports multiple tasks.
  • --collect_profiling_traces: Collect the tracing profile for the run after processing.
  • --zipkin_host: Host used to connect to Zipkin to gather tracing profiles.
  • --zipkin_port: Port used to connect to Zipkin to gather tracing profiles.
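
As a sketch of how these options combine, the snippet below assembles an invocation from Python using only flags listed above. The file path, output directory, and host are placeholders, the exact spelling accepted for the --client value is an assumption, and the port is the REST default mentioned in the client initialization example.

```python
import shlex

# Placeholder values; replace them with your own paths and endpoint.
args = [
    "nv-ingest-cli",
    "--doc", "./data/sample.pdf",
    "--client", "REST",
    "--client_host", "localhost",
    "--client_port", "7670",
    "--output_directory", "./processed_docs",
    "--batch_size", "10",
    "--log_level", "INFO",
]

# shlex.join quotes each argument safely for a POSIX shell.
command = shlex.join(args)
print(command)
```

The same list can be passed directly to subprocess.run(args, check=True) once the CLI is installed, which avoids shell quoting altogether.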

Examples

You can find a notebook with examples that use the CLI client in the client examples folder.
