Skip to main content

OpenTelemetry Collector Python wrapper

Project description

OpenTelemetry Collector Python 集成示例

本示例展示如何将 OpenTelemetry Collector 二进制文件与 Python 包结合使用。

方案 1: 将二进制文件打包到 Python Wheel 中

目录结构

otel-collector-python/
├── setup.py
├── pyproject.toml
├── otel_collector/
│   ├── __init__.py
│   ├── collector.py
│   └── bin/
│       ├── otelcol-contrib_linux_amd64
│       ├── otelcol-contrib_darwin_arm64
│       └── otelcol-contrib_darwin_amd64
└── README.md

setup.py

from setuptools import setup, find_packages
import os
import platform

# 根据平台选择二进制文件
def get_binary_path():
    system = platform.system().lower()
    machine = platform.machine().lower()
    
    if system == 'linux' and machine in ['x86_64', 'amd64']:
        return 'bin/otelcol-contrib_linux_amd64'
    elif system == 'darwin' and machine == 'arm64':
        return 'bin/otelcol-contrib_darwin_arm64'
    elif system == 'darwin' and machine in ['x86_64', 'amd64']:
        return 'bin/otelcol-contrib_darwin_amd64'
    else:
        raise ValueError(f"Unsupported platform: {system} {machine}")

setup(
    name='otel-collector-python',
    version='0.1.0',
    packages=find_packages(),
    package_data={
        'otel_collector': [get_binary_path()],
    },
    include_package_data=True,
    install_requires=[
        'pyyaml>=6.0',
    ],
    entry_points={
        'console_scripts': [
            'otel-collector=otel_collector.cli:main',
        ],
    },
)

方案 2: 使用 subprocess 管理 Collector 进程

collector.py

import subprocess
import os
import signal
import time
import yaml
from pathlib import Path

class CollectorManager:
    def __init__(self, binary_path=None, config_path=None):
        self.binary_path = binary_path or self._find_binary()
        self.config_path = config_path
        self.process = None
        
    def _find_binary(self):
        """自动查找二进制文件"""
        package_dir = Path(__file__).parent
        bin_dir = package_dir / 'bin'
        
        system = platform.system().lower()
        machine = platform.machine().lower()
        
        if system == 'linux' and machine in ['x86_64', 'amd64']:
            binary = bin_dir / 'otelcol-contrib_linux_amd64'
        elif system == 'darwin' and machine == 'arm64':
            binary = bin_dir / 'otelcol-contrib_darwin_arm64'
        elif system == 'darwin' and machine in ['x86_64', 'amd64']:
            binary = bin_dir / 'otelcol-contrib_darwin_amd64'
        else:
            raise ValueError(f"Unsupported platform: {system} {machine}")
        
        if not binary.exists():
            raise FileNotFoundError(f"Collector binary not found: {binary}")
        
        return str(binary)
    
    def start(self, config=None):
        """启动 collector"""
        if self.process and self.process.poll() is None:
            raise RuntimeError("Collector is already running")
        
        config_path = config or self.config_path
        if not config_path:
            raise ValueError("Config path is required")
        
        cmd = [self.binary_path, '--config', config_path]
        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            preexec_fn=os.setsid if os.name != 'nt' else None
        )
        
        return self.process
    
    def stop(self, timeout=10):
        """停止 collector"""
        if not self.process:
            return
        
        try:
            if os.name != 'nt':
                os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
            else:
                self.process.terminate()
            
            self.process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            if os.name != 'nt':
                os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
            else:
                self.process.kill()
            self.process.wait()
        finally:
            self.process = None
    
    def is_running(self):
        """检查 collector 是否运行中"""
        if not self.process:
            return False
        return self.process.poll() is None
    
    def get_status(self):
        """获取 collector 状态"""
        if not self.process:
            return {'status': 'stopped'}
        
        returncode = self.process.poll()
        if returncode is None:
            return {'status': 'running', 'pid': self.process.pid}
        else:
            return {'status': 'stopped', 'returncode': returncode}

方案 3: 从 Python 应用发送数据到 Collector

使用 OpenTelemetry Python SDK

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource

# 配置 OTLP exporter 连接到本地 collector
resource = Resource.create({
    "service.name": "my-python-app",
    "service.version": "1.0.0",
})

trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

# 连接到本地 collector (默认 localhost:4317)
otlp_exporter = OTLPSpanExporter(
    endpoint="http://localhost:4317",
    insecure=True,
)

span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# 使用 tracer
with tracer.start_as_current_span("my-operation") as span:
    span.set_attribute("key", "value")
    # 你的业务逻辑

方案 4: 完整的 Python 包示例

pyproject.toml

[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "otel-collector-python"
version = "0.1.0"
description = "OpenTelemetry Collector Python wrapper"
requires-python = ">=3.8"
dependencies = [
    "pyyaml>=6.0",
    "opentelemetry-api>=1.20.0",
    "opentelemetry-sdk>=1.20.0",
    "opentelemetry-exporter-otlp>=1.20.0",
]

[project.scripts]
otel-collector = "otel_collector.cli:main"

cli.py

import argparse
import sys
from .collector import CollectorManager

def main():
    parser = argparse.ArgumentParser(description='OpenTelemetry Collector Manager')
    parser.add_argument('command', choices=['start', 'stop', 'status'], help='Command to execute')
    parser.add_argument('--config', help='Path to collector config file')
    parser.add_argument('--binary', help='Path to collector binary')
    
    args = parser.parse_args()
    
    manager = CollectorManager(binary_path=args.binary, config_path=args.config)
    
    if args.command == 'start':
        if not args.config:
            print("Error: --config is required for start command", file=sys.stderr)
            sys.exit(1)
        manager.start(config=args.config)
        print("Collector started")
    elif args.command == 'stop':
        manager.stop()
        print("Collector stopped")
    elif args.command == 'status':
        status = manager.get_status()
        print(f"Status: {status['status']}")
        if 'pid' in status:
            print(f"PID: {status['pid']}")

if __name__ == '__main__':
    main()

使用示例

1. 安装包

pip install otel-collector-python

2. 启动 Collector

from otel_collector import CollectorManager

manager = CollectorManager(
    config_path='/path/to/config.yaml'
)
manager.start()

3. 从 Python 应用发送数据

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# 配置 exporter 连接到 collector
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))

# 使用
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("my-span"):
    # 你的代码
    pass

4. 命令行使用

# 启动 collector
otel-collector start --config /path/to/config.yaml

# 查看状态
otel-collector status

# 停止 collector
otel-collector stop

最佳实践

  1. 平台检测: 自动检测平台并选择对应的二进制文件
  2. 进程管理: 使用 subprocess 管理 collector 进程生命周期
  3. 配置管理: 支持从文件或代码中生成配置
  4. 错误处理: 处理进程启动失败、崩溃等情况
  5. 日志收集: 收集 collector 的 stdout/stderr 用于调试

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

otel_collector_python-0.1.0-py3-none-any.whl (11.6 MB view details)

Uploaded Python 3

File details

Details for the file otel_collector_python-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for otel_collector_python-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9f0578fc1bb00e1e3e6c28283d160a88a423066a9763e1c067d89413e726f14e
MD5 83ee73052ee691de755ec07a2aeaa2c7
BLAKE2b-256 5112c8dacf16666a49cae514294bcb8bbc7303b23b2e587fd0651a5caba4056b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page