OpenTelemetry Collector Python wrapper
Project description
OpenTelemetry Collector Python 集成示例
本示例展示如何将 OpenTelemetry Collector 二进制文件与 Python 包结合使用。
方案 1: 将二进制文件打包到 Python Wheel 中
目录结构
otel-collector-python/
├── setup.py
├── pyproject.toml
├── otel_collector/
│ ├── __init__.py
│ ├── collector.py
│ └── bin/
│ ├── otelcol-contrib_linux_amd64
│ ├── otelcol-contrib_darwin_arm64
│ └── otelcol-contrib_darwin_amd64
└── README.md
setup.py
from setuptools import setup, find_packages
import os
import platform
# 根据平台选择二进制文件
def get_binary_path():
system = platform.system().lower()
machine = platform.machine().lower()
if system == 'linux' and machine in ['x86_64', 'amd64']:
return 'bin/otelcol-contrib_linux_amd64'
elif system == 'darwin' and machine == 'arm64':
return 'bin/otelcol-contrib_darwin_arm64'
elif system == 'darwin' and machine in ['x86_64', 'amd64']:
return 'bin/otelcol-contrib_darwin_amd64'
else:
raise ValueError(f"Unsupported platform: {system} {machine}")
setup(
name='otel-collector-python',
version='0.1.0',
packages=find_packages(),
package_data={
'otel_collector': [get_binary_path()],
},
include_package_data=True,
install_requires=[
'pyyaml>=6.0',
],
entry_points={
'console_scripts': [
'otel-collector=otel_collector.cli:main',
],
},
)
方案 2: 使用 subprocess 管理 Collector 进程
collector.py
import subprocess
import os
import signal
import time
import yaml
from pathlib import Path
class CollectorManager:
def __init__(self, binary_path=None, config_path=None):
self.binary_path = binary_path or self._find_binary()
self.config_path = config_path
self.process = None
def _find_binary(self):
"""自动查找二进制文件"""
package_dir = Path(__file__).parent
bin_dir = package_dir / 'bin'
system = platform.system().lower()
machine = platform.machine().lower()
if system == 'linux' and machine in ['x86_64', 'amd64']:
binary = bin_dir / 'otelcol-contrib_linux_amd64'
elif system == 'darwin' and machine == 'arm64':
binary = bin_dir / 'otelcol-contrib_darwin_arm64'
elif system == 'darwin' and machine in ['x86_64', 'amd64']:
binary = bin_dir / 'otelcol-contrib_darwin_amd64'
else:
raise ValueError(f"Unsupported platform: {system} {machine}")
if not binary.exists():
raise FileNotFoundError(f"Collector binary not found: {binary}")
return str(binary)
def start(self, config=None):
"""启动 collector"""
if self.process and self.process.poll() is None:
raise RuntimeError("Collector is already running")
config_path = config or self.config_path
if not config_path:
raise ValueError("Config path is required")
cmd = [self.binary_path, '--config', config_path]
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
preexec_fn=os.setsid if os.name != 'nt' else None
)
return self.process
def stop(self, timeout=10):
"""停止 collector"""
if not self.process:
return
try:
if os.name != 'nt':
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
else:
self.process.terminate()
self.process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
if os.name != 'nt':
os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
else:
self.process.kill()
self.process.wait()
finally:
self.process = None
def is_running(self):
"""检查 collector 是否运行中"""
if not self.process:
return False
return self.process.poll() is None
def get_status(self):
"""获取 collector 状态"""
if not self.process:
return {'status': 'stopped'}
returncode = self.process.poll()
if returncode is None:
return {'status': 'running', 'pid': self.process.pid}
else:
return {'status': 'stopped', 'returncode': returncode}
方案 3: 从 Python 应用发送数据到 Collector
使用 OpenTelemetry Python SDK
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
# 配置 OTLP exporter 连接到本地 collector
resource = Resource.create({
"service.name": "my-python-app",
"service.version": "1.0.0",
})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)
# 连接到本地 collector (默认 localhost:4317)
otlp_exporter = OTLPSpanExporter(
endpoint="http://localhost:4317",
insecure=True,
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 使用 tracer
with tracer.start_as_current_span("my-operation") as span:
span.set_attribute("key", "value")
# 你的业务逻辑
方案 4: 完整的 Python 包示例
pyproject.toml
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "otel-collector-python"
version = "0.1.0"
description = "OpenTelemetry Collector Python wrapper"
requires-python = ">=3.8"
dependencies = [
"pyyaml>=6.0",
"opentelemetry-api>=1.20.0",
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp>=1.20.0",
]
[project.scripts]
otel-collector = "otel_collector.cli:main"
cli.py
import argparse
import sys
from .collector import CollectorManager
def main():
parser = argparse.ArgumentParser(description='OpenTelemetry Collector Manager')
parser.add_argument('command', choices=['start', 'stop', 'status'], help='Command to execute')
parser.add_argument('--config', help='Path to collector config file')
parser.add_argument('--binary', help='Path to collector binary')
args = parser.parse_args()
manager = CollectorManager(binary_path=args.binary, config_path=args.config)
if args.command == 'start':
if not args.config:
print("Error: --config is required for start command", file=sys.stderr)
sys.exit(1)
manager.start(config=args.config)
print("Collector started")
elif args.command == 'stop':
manager.stop()
print("Collector stopped")
elif args.command == 'status':
status = manager.get_status()
print(f"Status: {status['status']}")
if 'pid' in status:
print(f"PID: {status['pid']}")
if __name__ == '__main__':
main()
使用示例
1. 安装包
pip install otel-collector-python
2. 启动 Collector
from otel_collector import CollectorManager
manager = CollectorManager(
config_path='/path/to/config.yaml'
)
manager.start()
3. 从 Python 应用发送数据
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# 配置 exporter 连接到 collector
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
# 使用
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("my-span"):
# 你的代码
pass
4. 命令行使用
# 启动 collector
otel-collector start --config /path/to/config.yaml
# 查看状态
otel-collector status
# 停止 collector
otel-collector stop
最佳实践
- 平台检测: 自动检测平台并选择对应的二进制文件
- 进程管理: 使用 subprocess 管理 collector 进程生命周期
- 配置管理: 支持从文件或代码中生成配置
- 错误处理: 处理进程启动失败、崩溃等情况
- 日志收集: 收集 collector 的 stdout/stderr 用于调试
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distributions
No source distribution files available for this release.See tutorial on generating distribution archives.
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file otel_collector_python-0.1.0-py3-none-any.whl.
File metadata
- Download URL: otel_collector_python-0.1.0-py3-none-any.whl
- Upload date:
- Size: 11.6 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9f0578fc1bb00e1e3e6c28283d160a88a423066a9763e1c067d89413e726f14e
|
|
| MD5 |
83ee73052ee691de755ec07a2aeaa2c7
|
|
| BLAKE2b-256 |
5112c8dacf16666a49cae514294bcb8bbc7303b23b2e587fd0651a5caba4056b
|