Skip to main content

Kubernetes pod code executor (Robin): Python/shell execution and remote workspace upload for AutoGen-style agents.

Project description

KubernetesCommandLineCodeExecutor

Executor that runs code (Python / shell) inside a Kubernetes pod: each block is written as a file on the pod’s workspace volume and executed via exec (Kubernetes API + stream).

PyPI distribution name: kubernetes-robin-executor — install with:

pip install kubernetes-robin-executor

Import path (unchanged): autogen_kubernetes.code_executors.kubernetes_commandline_code_executor. The PyPI name differs from the upstream autogen-kubernetes package on purpose. Do not install both in the same environment if they expose the same top-level package layout; this distribution keeps the autogen_kubernetes module name for compatibility with existing code.

This file is the project readme in pyproject.toml, so it is what users see on the PyPI project page for kubernetes-robin-executor after you publish.


Cluster connection

Mode Behavior
remote_control_plane set (API URL, e.g. https://IP:6443) Bearer auth: pass kube_token= to the constructor or set KUBE_TOKEN in the environment. Client uses verify_ssl=False.
remote_control_plane=None Local kubeconfig or in-cluster config (load_kube_config / load_incluster_config).

Constructor (__init__)

Main parameters:

Parameter Description
image Container image (default python:3-slim).
pod_name Fixed pod name; if None, generates robin-code-exec-<uuid>.
namespace Namespace where the pod is created / listed.
timeout Seconds for exec-based runs (≥ 1).
work_dir Path inside the pod mounted as the workspace.
execution_policies Optional: override which languages are executed vs only saved.
remote_control_plane API server URL for bearer-token mode.
kube_token Optional bearer token when remote_control_plane is set. If omitted, KUBE_TOKEN from the environment is used. One of the two must be present in remote mode.
shared_pvc_name If set, the pod uses a shared PVC instead of emptyDir.
node Optional: nodeSelector kubernetes.io/hostname.
shared_resources If True and a Running pod already exists with the same image, reuses its name instead of creating a new pod.
env_vars List of {"key": "...", "value": "..."} appended to ~/.bashrc in the pod.

On construction, pod cleanup is registered with atexit and SIGINT is handled with cleanup().


Upload code from your machine and call functions in the pod

Besides execute_code_blocks, you can copy Python files into the pod’s work_dir and invoke callables by name without putting everything in a single markdown block.

Recommended flow

  1. install_pod_dispatcher(default_module="robin_user", runner_filename="executor.py") — Uploads an executor.py that reads JSON with function, arguments, and optionally module, imports the module, and calls function(**arguments).
  2. upload_file_content(relative_path, content) or upload_from_disk(local_path, remote_prefix="") — Copies your .py files or project tree into the pod under work_dir. upload_from_disk skips __pycache__ and hidden names (.*); text files only (UTF-8).
  3. execute_tool_function(arguments, func_name, *, module=None, runner_script="executor.py") — Uploads a temporary .json and runs python <work_dir>/<runner_script> <json>.

The interpreter adds the script’s directory to sys.path; with executor.py in work_dir, modules in the same directory (e.g. robin_user.py) or packages under work_dir can be imported using the right module (e.g. my_package.submodule).

Minimal example

with KubernetesCommandLineCodeExecutor(...) as executor:
    executor.install_pod_dispatcher(default_module="robin_user")
    executor.upload_from_disk("/local/path/my_folder")  # contains robin_user.py or your layout
    r = executor.execute_tool_function(
        {"name": "Ada"},
        "greet",
        module="robin_user",
    )
    assert r.exit_code == 0

If you omit module in execute_tool_function, the dispatcher uses the default_module passed to install_pod_dispatcher.

How execute_tool_function maps to Python on the pod

Call shape:

execute_tool_function(arguments_dict, "function_name", module="python.import.path")
Part Meaning
arguments_dict (first positional) Keyword arguments passed to the function: fn(**arguments_dict). Keys must match the parameter names of that function (e.g. {"name": "Ada"} for def greet(name: str).
func_name (second positional) Name of the callable inside the target module, as a string (e.g. "greet", "run_heavy_pipeline").
module= Python module path used in importlib.import_module(...), not the full file path on disk. For a single file uploaded as {work_dir}/robin_user.py, the module name is robin_user (same as the filename without .py). For a package layout {work_dir}/mypkg/helpers.py with mypkg as a package, use mypkg.helpers.

So for a file robin_user.py sitting in work_dir after upload_from_disk:

Minimal robin_user.py (upload this next to executor.py under work_dir, e.g. via upload_file_content("robin_user.py", ...) or upload_from_disk of a folder that contains it):

# robin_user.py — on the pod: {work_dir}/robin_user.py → module name "robin_user"

def greet(name: str) -> str:
    return f"Hello, {name}"


def run_heavy_pipeline(text: str, rounds: int = 1, max_lines: int = 500) -> dict:
    lines = text.splitlines()[:max_lines]
    return {"line_count": len(lines), "rounds": rounds}

Calling those functions from the client:

r1 = executor.execute_tool_function(
    {"name": "Ada"},   # arguments to greet(name=...)
    "greet",           # def greet(...) in robin_user.py
    module="robin_user",  # import robin_user  (file robin_user.py)
)
r2 = executor.execute_tool_function(
    {"text": "hello\n", "rounds": 3, "max_lines": 100},
    "run_heavy_pipeline",
    module="robin_user",
)

If the dict keys do not match the function’s parameters, the call on the pod will raise TypeError like a normal Python call.

Integration test in this repo

test_upload_from_disk_and_invoke_long_function uploads tests/fixtures/robin_upload_invoke/robin_user.py and calls greet and run_heavy_pipeline on the pod (marker robin_remote; see tests/README.md).

Run it from autogen-kubernetes/python:

uv run pytest packages/autogen-kubernetes/tests/test_kubernetes_commandline_executor.py::test_upload_from_disk_and_invoke_long_function -m robin_remote -v -s -n0

Public properties

Member Type / return Description
timeout int Configured timeout.
work_dir Path Working directory in the pod.
code_extractor MarkdownCodeExtractor Code-block extractor for agent integration.
execution_policies dict Per language: True runs, False only writes the file.
pod_name str Pod name in use (created or reused).
namespace str Current namespace.
image str Container image.

Public methods

Method Description
execute_code_blocks(code_blocks: list[CodeBlock]) -> CommandLineCodeResult Uploads and runs (or only saves) each block in order. Returns exit_code, output (combined stdout/stderr), and code_file.
upload_file_content(relative_path, content) -> str Writes text in the pod under work_dir / relative_path; returns the POSIX path in the pod.
upload_from_disk(local_path, *, remote_prefix="", encoding="utf-8") -> list[str] Uploads a local file or full directory tree; returns paths written in the pod.
install_pod_dispatcher(*, default_module="robin_user", runner_filename="executor.py") -> str Uploads the executor.py that dispatches JSON → function calls in the pod.
execute_tool_function(arguments, func_name, *, module=None, runner_script="executor.py") -> CommandLineCodeResult Uploads JSON and runs the runner (default executor.py in work_dir). argumentsfunc_name(**arguments) on the pod; moduleimportlib.import_module string (e.g. robin_user for robin_user.py in work_dir). Details: How execute_tool_function maps to Python on the pod (above).
cleanup() Deletes the pod in the namespace (404-safe).
stop() Runs the registered cleanup (delete pod).
restart() stop() then calls __init__ again with the main options (including remote_control_plane and kube_token when applicable).
add_env_vars_to_bashrc(env_vars) Appends exports to ~/.bashrc via exec.
is_pod_running_with_image() Lists pods in the namespace; if one is Running with self.image, returns its name (str); otherwise None.
get_pod_resource_usage() Tries to read metrics.k8s.io (often assumes local kubeconfig; may not work with remote token only).
create_resources() Creates the pod and waits for Running (usually already called from __init__).
handle_sigint(signum, frame) Signal handler: cleans up and exits the process.

Context manager: __enter__ / __exit__ → leaving with calls stop() (deletes the pod if cleanup is active).

Internals (reference): _upload_file_to_pod, _exec_command_with_exit_code, _wait_for_pod_ready — upload via stdin to exec, command wrapped in sh -c with exit-code capture.


Examples

1. Remote cluster with token (.env: KUBE_TOKEN; your app often also sets REMOTE_CONTROL_PLANE)

import os
from autogen_kubernetes.code_executors.base import CodeBlock
from autogen_kubernetes.code_executors.kubernetes_commandline_code_executor import (
    KubernetesCommandLineCodeExecutor,
)

os.environ.setdefault("KUBE_TOKEN", "...")  # or load_dotenv()

with KubernetesCommandLineCodeExecutor(
    image="williamgomez712/robin_executor_python:latest",
    namespace="my-namespace",
    work_dir="/tmp/robin_executor_test",
    timeout=120,
    remote_control_plane="https://API_SERVER:6443",
    shared_resources=False,
) as executor:
    result = executor.execute_code_blocks(
        [CodeBlock(language="python", code="print('hello')")]
    )
    print(result.exit_code, result.output)

2. Save file on the pod only (do not execute)

executor.execution_policies["python"] = False
result = executor.execute_code_blocks(
    [CodeBlock(language="python", code="print('does not run')")]
)
# result.output will include "Code saved to ..."

3. Shell in the pod

result = executor.execute_code_blocks(
    [CodeBlock(language="sh", code="echo ok && uname -a")]
)

4. Tooling via uploaded dispatcher (install_pod_dispatcher) or executor.py baked into the image

executor.install_pod_dispatcher(default_module="robin_user")
executor.upload_file_content("robin_user.py", open("robin_user.py").read())
result = executor.execute_tool_function(
    {"query": "test"},
    func_name="my_function",
    module="robin_user",
)

If the image already provides executor.py under work_dir, you can call execute_tool_function alone without install_pod_dispatcher.

5. Local cluster / kubeconfig (no remote_control_plane)

with KubernetesCommandLineCodeExecutor(
    namespace="default",
    work_dir="/tmp/work",
) as executor:
    ...

(Requires a valid ~/.kube/config or in-cluster execution.)


CommandLineCodeResult

Field Meaning
exit_code Exit code of the last relevant command in the sequence.
output Accumulated text (stdout/stderr from exec).
code_file Path of the first generated code file in the pod (if any).

Practical notes

  • shared_resources=True: reuses a pod with the same image; the existing pod’s work_dir must match what your code expects, or you may see “file not found” errors.
  • Filename hints in code: the first line can set a path with comments like # filename: foo.py (and HTML/JS patterns); otherwise a tmp_code_<md5>.<lang> under work_dir is used.
  • RBAC: the token or kubeconfig user needs pods + pods/exec (and create/delete depending on your flow).
  • After stop() / exiting with, the pod is usually deleted; do not expect it to remain for debugging unless you skip cleanup or use another workflow.

Publishing to PyPI

  • [project] name is kubernetes-robin-executor. readme points at this file, so PyPI’s long description matches this document.

  • Manifest: autogen-kubernetes/python/packages/autogen-kubernetes/pyproject.toml (relative to the autogen-kubernetes clone).

  • Consumers (same as local install from PyPI):

pip install kubernetes-robin-executor
  • Automation: from autogen-kubernetes/python, run python scripts/publish_kubernetes_robin_executor.py <version> (optional --testpypi, --dry-run). It bumps version in pyproject.toml, runs uv lock, rebuilds dist/, then prompts for an API token and calls twine upload.

  • Step-by-step build/upload (tokens, TestPyPI, twine) is in tests/PYPI-publish.local.md next to the test suite. That file may be gitignored in your outer repo; tests/README.md mentions it.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kubernetes_robin_executor-1.1.1.tar.gz (57.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kubernetes_robin_executor-1.1.1-py3-none-any.whl (55.0 kB view details)

Uploaded Python 3

File details

Details for the file kubernetes_robin_executor-1.1.1.tar.gz.

File metadata

File hashes

Hashes for kubernetes_robin_executor-1.1.1.tar.gz
Algorithm Hash digest
SHA256 76e1d08987e620adae42397edefd73f4e32a32d19781cf27e167a859c70b0f0d
MD5 48da1b8f37f54664747c82c3880c2305
BLAKE2b-256 0d4d3efb583efcfffb4880c14393f45d09e97e68a3f31a15fc3f79375abbe5b4

See more details on using hashes here.

File details

Details for the file kubernetes_robin_executor-1.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for kubernetes_robin_executor-1.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ec8c61f76c01e180045e14732a31c36be48f01a61b48949181d6f57ba0155988
MD5 a0bcdd77fc0939ee3edb2a8aaa9492bf
BLAKE2b-256 d0726e3ac6fc27f1ab10c5e111e3be510dd85fb788d9a48772858141febc7758

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page