Private utility package

1. Installation

Installing from PyPI

The py3-utilities package requires Python 3.12 or newer.

⚠️ The base package does not include any functionality by default. All modules are included as optional dependencies.

Installing with pip

To install with the desired functionality, use one or more of the available extras:

# Logging utilities
pip install py3-utilities[logger]

# Configuration file support
pip install py3-utilities[config]

# Gitlab client
pip install py3-utilities[gitlab]

# Excel client
pip install py3-utilities[excel]

# OS-level utilities
pip install py3-utilities[os-windows]
pip install py3-utilities[os-crossplatform]

# AI integrations (OpenAI, Azure)
pip install py3-utilities[ai]

# Jira-related functionality
pip install py3-utilities[jira]

# Teams-related functionality
pip install py3-utilities[teams]

# Install everything
pip install py3-utilities[all]

Installing with Poetry

You can also include py3-utilities in your pyproject.toml file:

[tool.poetry.dependencies]
python = ">=3.12,<4.0"
py3-utilities = { version = "^1.2.0", extras = ["logger", "config"] }

Just list the extras you need under the extras key.

To add it via CLI:

poetry add py3-utilities -E logger -E config

Installing from internal GitLab (PyPI registry)

This project is published to the GitLab PyPI Package Registry on tagged releases.

pip

Configure pip to use the project registry (replace placeholders):

pip install \
  --index-url "https://__token__:<TOKEN>@gitlab.<company>/api/v4/projects/<PROJECT_ID>/packages/pypi/simple" \
  "py3-utilities[config,logger]"

Poetry

[[tool.poetry.source]]
name = "gitlab"
url = "https://gitlab.<company>/api/v4/projects/<PROJECT_ID>/packages/pypi/simple"
priority = "primary"

[tool.poetry.dependencies]
py3-utilities = { version = "^1.2.0", source = "gitlab", extras = ["config", "logger"] }

2. Config File handling

2.1 Config Parser

The configuration parser system is powered by a Config singleton class that loads environment variables from .env files and merges all matching configuration files (*.yaml, *.yml, *.toml, *.json, *.ini, *.xml) found in the current directory and common config folders (config, configuration, cfg, conf, env, environment). These files are combined into a single config tree and converted into nested SimpleNamespace objects for easy dot-access in Python.
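The merge-and-convert step described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the helper names deep_merge and to_namespace, the "later file wins" conflict rule, and the retries key are assumptions made for the example.

```python
from types import SimpleNamespace
from typing import Any


def deep_merge(base: dict[str, Any], extra: dict[str, Any]) -> dict[str, Any]:
    """Return a new dict in which nested dicts from `extra` are merged into `base`."""
    merged = dict(base)
    for key, value in extra.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            # Assumption: on conflicting keys the later source wins.
            merged[key] = value
    return merged


def to_namespace(value: Any) -> Any:
    """Recursively convert dicts (including dicts inside lists) to SimpleNamespace."""
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(item) for item in value]
    return value


# Two already-parsed files; "retries" is a hypothetical key used for illustration.
toml_part = {"application": {"download_folder": "tmp", "output_folder": "output"}}
yaml_part = {"logging": {"root_folder": "logs"}, "application": {"retries": 3}}

config = to_namespace(deep_merge(toml_part, yaml_part))
print(config.application.download_folder, config.application.retries)
```

Keeping the merge on plain dicts and converting once at the end keeps the conflict rule in one place.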

Optionally, you can restrict which config files are merged by providing an allowlist. Allowlist entries are shell-style wildcard patterns (fnmatch) that match either the filename or the (POSIX) path (e.g. ['app.yaml', 'secrets*.toml', 'config/*.json']).
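To see how such patterns behave, here is a small sketch using the standard fnmatch module. The function name is_allowed and the candidate paths are hypothetical, and the package's real matching logic may differ in details such as case sensitivity.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath


def is_allowed(path: str, allowlist: list[str]) -> bool:
    """Return True if the filename or the full POSIX path matches any pattern."""
    posix_path = PurePosixPath(path)
    return any(
        fnmatchcase(posix_path.name, pattern)
        or fnmatchcase(posix_path.as_posix(), pattern)
        for pattern in allowlist
    )


allowlist = ["app.yaml", "secrets*.toml", "config/*.json"]
candidates = [
    "app.yaml",
    "config/app.yaml",
    "secrets.prod.toml",
    "config/db.json",
    "other.ini",
]
allowed = [path for path in candidates if is_allowed(path, allowlist)]
print(allowed)
```

Note that in fnmatch patterns `*` also matches path separators, so "config/*.json" would match files in nested subfolders of config as well.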

Supported File Types

  • .env: Environment variables
  • .yaml, .yml: YAML config files
  • .toml: TOML config files
  • .json: JSON config files
  • .ini: INI config files
  • .xml: XML config files (attributes and text content are parsed into dicts)

XML Parsing Notes

  • Attributes are parsed as normal keys (no @ prefix).
  • Text content inside tags is placed under a "value" key (unless the element only has text).
  • Nested elements and repeated tags are handled as nested dicts or lists.
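The rules above can be illustrated with xml.etree.ElementTree. This is a sketch under assumptions: the helper names are hypothetical, and the edge-case behaviour (for example an attribute and a child tag sharing a name) is not taken from the package.

```python
import xml.etree.ElementTree as ET
from typing import Any


def element_to_value(element: ET.Element) -> Any:
    """Convert one XML element following the parsing notes above."""
    text = (element.text or "").strip()
    children = list(element)
    # An element that only has text collapses to the plain string.
    if not element.attrib and not children:
        return text
    result: dict[str, Any] = dict(element.attrib)  # attributes become normal keys
    for child in children:
        child_value = element_to_value(child)
        if child.tag in result:
            # Repeated tags are collected into a list.
            existing = result[child.tag]
            if isinstance(existing, list):
                existing.append(child_value)
            else:
                result[child.tag] = [existing, child_value]
        else:
            result[child.tag] = child_value
    if text:
        result["value"] = text  # text next to attributes or children
    return result


def parse_xml(source: str) -> dict[str, Any]:
    """Parse an XML string into a dict keyed by the root tag."""
    root = ET.fromstring(source)
    return {root.tag: element_to_value(root)}


database = parse_xml('<database host="localhost" port="5432">mydb</database>')
servers = parse_xml("<servers><server>a</server><server>b</server></servers>")
print(database)
print(servers)
```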

Example Config Files

  • TOML configuration:
[application]
download_folder = "tmp"
output_folder   = "output"
  • .env file:
API_TOKEN = "abcd..."
  • YAML file:
logging:
  root_folder: "logs"
  • XML file:
<database host="localhost" port="5432">mydb</database>

Loading Configuration

To parse and access configuration values:

from utilities import parse_config

# Regular loading from current directory and common config folders
config = parse_config()

# Config files can also be loaded from custom directories
config = parse_config(config_paths=["dir1", "path/to/dir2"])

# Restrict merging to specific config files (filename or path patterns)
config = parse_config(allowlist=["app.yaml", "secrets*.toml", "config/*.json"])

# TOML part
config.application.download_folder
config.application.output_folder

# YAML part
config.logging.root_folder

# XML part
config.database.host       # "localhost"
config.database.port       # "5432"
config.database.value      # "mydb"

# .env file (keys are lowercased in the config namespace)
config.os.env.api_token

# Other environment variables (also lowercased)
config.os.env.path

2.2 Config Writer

The utilities module also provides a write_config function to serialize and export configuration data into different formats (json, yaml, toml, ini, or xml). This is useful for saving updated configuration states, debugging, or exporting subsets of the config for other applications.

Writing Configuration

You can write a parsed or custom config structure to a file as follows:

from utilities.config_writer import write_config, ConfigFormat

# Example: Save current config to YAML, excluding 'os' key
write_config(config, "output_config.yaml", format=ConfigFormat.YAML, exclude_keys=["os"])

Supported Formats

  • ConfigFormat.JSON
  • ConfigFormat.YAML
  • ConfigFormat.TOML
  • ConfigFormat.INI
  • ConfigFormat.XML

Excluding Keys

Use the exclude_keys parameter to omit keys (applied recursively) when writing the config:

write_config(config, "config.json", format=ConfigFormat.JSON, exclude_keys=["os"])
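As an illustration of what "applied recursively" means, the following sketch removes the given keys at every nesting level of a plain dict before serializing it. The helper name drop_keys is hypothetical; write_config itself may handle namespaces and formats differently.

```python
import json
from typing import Any


def drop_keys(data: Any, exclude_keys: set[str]) -> Any:
    """Return a copy of `data` without the excluded keys at any depth."""
    if isinstance(data, dict):
        return {
            key: drop_keys(value, exclude_keys)
            for key, value in data.items()
            if key not in exclude_keys
        }
    if isinstance(data, list):
        return [drop_keys(item, exclude_keys) for item in data]
    return data


config_dict = {
    "application": {"output_folder": "output", "os": "linux"},
    "os": {"env": {"api_token": "abcd"}},
}
cleaned = drop_keys(config_dict, {"os"})
print(json.dumps(cleaned, indent=2))
```

Because the exclusion is by key name, a nested key that happens to be called "os" is removed as well, which is worth keeping in mind when choosing names to exclude.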

2.3 Automatic Logger Configuration

The [logging] section of the configuration file controls the behavior of your logging system. Each logger can be customized individually for console, file, and JSON outputs, including rotation settings and timestamp formats.

The logger configuration is read automatically by the Config singleton loader when the utilities module is imported. If the configuration includes a [logging.loggers.<name>.<optional_sub_name>] section, a logger will be automatically created and exposed via the log namespace.

For example, a logger defined as:

[logging.loggers.app.logger]

Will be accessible via:

log.app.logger.info("...")

These loggers can also be used as function decorators:

@log.app.logger
def my_function():
    ...

This will automatically log function entry, exit, and optionally exceptions (depending on decorator settings).


Root Settings ([logging])

Field | Type | Description | Default
root_folder | string | Directory where log files are stored | logs
cleanup_old_logs | bool | Enable automatic deletion of old logs | true
cleanup_days | int | Number of days to keep log files | 7

Logger Section ([logging.loggers.<logger_name>.<optional_sub_name>])

Each logger is defined under this section.

Common Fields

Field | Type | Description | Default
enabled | bool | Whether the logger is active | false
clear_handlers | bool | Remove existing handlers | false

Console Output

Field | Type | Description | Default
console_output | bool | Enable/disable console logging | false
console_log_level | string | Log level (DEBUG, INFO, etc.) | "INFO"
console_timestamp_format | string | Format for timestamps (strftime format) | optional

File Output

Field | Type | Description | Default
file_output | bool | Enable/disable plain text file logging | false
file_log_level | string | Log level | "INFO"
file_rotation_time | bool | Enable time-based rotation | false
file_rotation_when | string | Time unit for rotation (e.g. midnight) | "midnight"
file_rotation_interval | int | Number of units between rotations | 1
file_rotation_backup_count | int | Number of backups to retain | 7
file_timestamp_format | string | Format for timestamps | optional

JSON Output

Field | Type | Description | Default
json_output | bool | Enable/disable JSON file logging | false
json_log_level | string | Log level | "INFO"
json_rotation_time | bool | Enable time-based rotation | false
json_rotation_when | string | Time unit for rotation | "midnight"
json_rotation_interval | int | Number of units between rotations | 1
json_rotation_backup_count | int | Number of backups to retain | 7
json_timestamp_format | string | Format for timestamps | optional

Decorator

Field | Type | Description | Default
decorator_raise_exception | bool | Whether to re-raise exceptions after logging them | false
decorator_log_level | string | Log level for decorator messages | "DEBUG"
decorator_max_log_length | int | Maximum length of logged content | 500
decorator_log_arguments | bool | Whether to log function arguments | true
decorator_tag | string | Tag attached to decorator log messages | "decorator"
decorator_warn_duration | int | Warn if execution takes longer than this many seconds | optional
decorator_log_stack | bool | Whether to log call stack | false
decorator_log_return_value | bool | Whether to log function return values | false
decorator_log_execution_time | bool | Whether to log function execution times | false
decorator_sensitive_params | list | Argument names to redact in logged output | optional

Example Configuration

# --- Logger creation ---

[logging]
root_folder = "logs"
cleanup_old_logs = true
cleanup_days = 7

# Create a logger for Jira related operations
[logging.loggers.jira.logger]
enabled = true
clear_handlers = false

# Console
console_output = true
console_log_level = "INFO"
console_timestamp_format = "%Y-%m-%d %H:%M:%S"

# File
file_output = true
file_log_level = "INFO"
file_rotation_time = true
file_rotation_when = "midnight"
file_rotation_interval = 1
file_rotation_backup_count = 7
file_timestamp_format = "%Y-%m-%dT%H:%M:%S"

# JSON
json_output = true
json_log_level = "INFO"
json_rotation_time = true
json_rotation_when = "midnight"
json_rotation_interval = 1
json_rotation_backup_count = 7
json_timestamp_format = "%Y-%m-%dT%H:%M:%S"

# Decorator options
decorator_raise_exception = true
decorator_log_level = "DEBUG"
decorator_max_log_length = 100
decorator_log_arguments = false
decorator_tag = "decorator"
decorator_warn_duration = 5
decorator_log_stack = false
decorator_log_return_value = false
decorator_log_execution_time = true
decorator_sensitive_params = ["api_token"]

# Create another for other stuff
[logging.loggers.other]
enabled = true
file_output = true
json_output = false
console_output = false

# --- Application ---

[application]
# Write your other configurations here ...

These will be accessible via:

from utilities import log

# ...

log.jira.logger.info("...")
log.other.info("...")

Notes

  • If file_output, json_output and console_output are all false, the logger will fall back to a NullHandler.
  • Rotation is only time-based; size-based rotation is not currently supported from config.
  • Timestamp format strings follow Python's strftime syntax.
  • Loggers are exposed via log.<logger_name>, e.g. log.jira.logger if the config key is [logging.loggers.jira.logger].
  • These loggers can be used as decorators to automatically log function calls.
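For readers who want to relate these options to Python's standard logging module, the sketch below maps one logger section to standard-library handlers. It is an approximation under assumptions: the function build_handlers, the <name>.log file naming, and the use of a plain FileHandler when time-based rotation is off are illustrative choices, not the package's code.

```python
import logging
import tempfile
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path


def build_handlers(settings: dict, log_dir: Path, name: str) -> list[logging.Handler]:
    """Translate one logger section into standard-library handlers."""
    handlers: list[logging.Handler] = []
    if settings.get("console_output", False):
        console = logging.StreamHandler()
        console.setLevel(settings.get("console_log_level", "INFO"))
        handlers.append(console)
    if settings.get("file_output", False):
        path = log_dir / f"{name}.log"  # file naming is an assumption
        if settings.get("file_rotation_time", False):
            file_handler: logging.Handler = TimedRotatingFileHandler(
                path,
                when=settings.get("file_rotation_when", "midnight"),
                interval=settings.get("file_rotation_interval", 1),
                backupCount=settings.get("file_rotation_backup_count", 7),
                delay=True,  # open the file lazily on first write
            )
        else:
            file_handler = logging.FileHandler(path, delay=True)
        file_handler.setLevel(settings.get("file_log_level", "INFO"))
        handlers.append(file_handler)
    # With every output disabled, fall back to a handler that discards records.
    return handlers or [logging.NullHandler()]


with tempfile.TemporaryDirectory() as tmp:
    rotating = build_handlers(
        {"file_output": True, "file_rotation_time": True}, Path(tmp), "jira"
    )
    rotating_type = type(rotating[0]).__name__
    backup_count = rotating[0].backupCount
    for handler in rotating:
        handler.close()

silent = build_handlers({}, Path("."), "other")
print(rotating_type, backup_count, type(silent[0]).__name__)
```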

2.4 Behind the Scenes (Explicit Initialization)

The utilities.logger package is designed to minimize boilerplate and automate logger setup using configuration-driven initialization.

Important: loggers are not created automatically on import anymore. You explicitly initialize them by calling init_logging().


Config Parser (config/__init__.py)

  • parse_config() initializes the Config singleton.
  • It loads environment variables (including a .env file if present), and all .yaml, .yml, .toml, .ini, .xml and .json files from the current directory and the common config folders.
  • All configs are merged into a nested SimpleNamespace, accessible via dot notation:
from utilities import parse_config

config = parse_config()
...

Logger Module (logger/__init__.py)

When you call init_logging():

  • It imports and uses parse_config() to load configuration.

  • It reads config.logging.loggers.* entries.

  • For each enabled logger:

    • A Logger instance is created.
    • A matching LogDecorator is configured.
    • These are bundled into a LogWrapper, exposing both logging and decorating in one object.
    • A log.<name> namespace is dynamically built to match the config structure.

If configuration cannot be loaded or no logging section exists, init_logging() returns an empty registry and emits a RuntimeWarning.

Example config:

[logging.loggers.analytics]
enabled = true

Enables:

from utilities.logger import init_logging

log = init_logging()

log.analytics.info("...logged!")

@log.analytics
def do_something():
    ...
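The way a log.<name> namespace can mirror the config structure is sketched below. It uses plain logging.Logger objects instead of LogWrapper, and it assumes that a config section counts as a logger definition when it contains an enabled key; both are simplifications for illustration, as is the function name build_registry.

```python
import logging
from types import SimpleNamespace
from typing import Any


def is_logger_section(node: dict[str, Any]) -> bool:
    # Assumption: a section is a logger definition if it has an "enabled" key.
    return "enabled" in node


def build_registry(loggers_cfg: dict[str, Any], prefix: str = "") -> SimpleNamespace:
    """Build a nested namespace of loggers that follows the config tree."""
    registry = SimpleNamespace()
    for name, node in loggers_cfg.items():
        if not isinstance(node, dict):
            continue
        full_name = f"{prefix}.{name}" if prefix else name
        if is_logger_section(node):
            if node["enabled"]:
                setattr(registry, name, logging.getLogger(full_name))
        else:
            # Intermediate level such as "jira" in [logging.loggers.jira.logger].
            setattr(registry, name, build_registry(node, full_name))
    return registry


loggers_cfg = {
    "jira": {"logger": {"enabled": True}},
    "other": {"enabled": True},
    "disabled": {"enabled": False},
}
log = build_registry(loggers_cfg)
print(log.jira.logger.name, log.other.name)
```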

3. Other utilities

The interfaces used by the initialization part are also exposed and can be used manually.

3.1 Logger utilities

The logger utility module provides structured, flexible, and context-aware logging. It supports console, plain-text, and JSON output, asynchronous logging via queue listeners, daily log foldering, log rotation (size/time-based), and cleanup/compression utilities.

The main components are:

  • Logger: Core configuration and setup utility for logging
  • LogDecorator: Decorator for function-level logging (arguments, return values, exceptions)
  • LogWrapper: A convenience wrapper combining the logger and decorator

Logger class

A full-featured wrapper around Python’s logging module.

Features

  • Output to console, plain-text, and/or JSON log files
  • Rotation per output type (size-based or time-based)
  • Daily output folders when file and/or JSON logging is enabled: base_log_dir/<YYYY-MM-DD>/...
  • Context field injection into JSON logs via contextvars
  • Optional asynchronous logging via QueueHandler + QueueListener
  • Cleanup/compression of old daily log folders

Important notes

  • For each output type (file/JSON), size-based and time-based rotation are mutually exclusive.
  • The underlying logger is created via logging.getLogger(name) and is set to DEBUG (handlers control what is emitted).

Usage

from utilities import Logger

logger = Logger(
    name="my_app",                       # REQUIRED: name used for logger and file naming
    base_log_dir="logs",                 # OPTIONAL: base directory for daily log folders (default: 'logs')
    clear_handlers=True,                  # OPTIONAL: remove existing handlers on the same named logger

    file_output=True,                     # OPTIONAL: enable plain-text file logging
    json_output=True,                     # OPTIONAL: enable JSON file logging
    console_output=True,                  # OPTIONAL: enable console logging

    file_rotation_time_based=True,        # OPTIONAL: rotate file logs by time (e.g., daily)
    json_rotation_size_based=True,        # OPTIONAL: rotate JSON logs by size

    async_queue_size=10000                # OPTIONAL: initial queue size (enable_async_logging() uses unbounded)
)

log = logger.get_logger()
log.info("App started")

JSON context fields

Add extra fields to JSON logs (useful for job/run identifiers). These are injected by the default JSON formatter.

from utilities import Logger

L = Logger(name="my_app", json_output=True)

with L.context_scope(job_id="42", user="alice"):
    L.get_logger().info("processing")

# Or set it globally (overwrites the whole context dict):
L.set_context(job_id="99")
L.get_logger().info("next")
L.clear_context()
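The mechanism behind context fields can be approximated with contextvars and a custom formatter. The sketch below is not the package's formatter: the JsonFormatter class, the payload keys, and the module-level context_scope function are assumptions that only mirror the behaviour described above.

```python
import contextvars
import io
import json
import logging
from contextlib import contextmanager

# Holds the extra fields for the current execution context (never mutated in place).
_context: contextvars.ContextVar[dict] = contextvars.ContextVar("log_context", default={})


class JsonFormatter(logging.Formatter):
    """Render each record as a JSON object and inject the context fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        payload.update(_context.get())
        return json.dumps(payload)


@contextmanager
def context_scope(**fields):
    """Add fields for the duration of the with-block, then restore the old context."""
    token = _context.set({**_context.get(), **fields})
    try:
        yield
    finally:
        _context.reset(token)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("context_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

with context_scope(job_id="42", user="alice"):
    logger.info("processing")
logger.info("done")

lines = [json.loads(line) for line in stream.getvalue().splitlines()]
print(lines)
```

Using a ContextVar rather than a global dict means concurrent asyncio tasks or threads each see their own fields.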

Asynchronous logging

from utilities import Logger

L = Logger(name="my_app", console_output=True)
L.enable_async_logging()
L.get_logger().warning("async message")
L.shutdown_async_logging()
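For reference, the QueueHandler + QueueListener pattern mentioned in the feature list looks roughly like this in the standard library. The logger name and format string are arbitrary, and how enable_async_logging() wires things internally is not shown in this README.

```python
import io
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

# The handler that does the slow work (here it writes to an in-memory stream).
stream = io.StringIO()
target = logging.StreamHandler(stream)
target.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

log_queue: queue.Queue = queue.Queue(-1)  # -1 means unbounded
listener = QueueListener(log_queue, target, respect_handler_level=True)

logger = logging.getLogger("async_sketch")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(QueueHandler(log_queue))  # callers only enqueue records

listener.start()  # a background thread drains the queue
logger.warning("async message")
listener.stop()   # processes what is still queued, then joins the thread

output = stream.getvalue()
print(output)
```

Stopping the listener before exit matters: records still sitting in the queue are only written once the listener has drained it.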

Logger.cleanup_old_logs(...) (static method)

Deletes daily log folders older than the specified number of days.

  • If name == "*", it scans all immediate subdirectories under base_log_dir as logger directories.
  • Otherwise it only scans base_log_dir/<name>/ if that directory exists.

Usage

from utilities import Logger

Logger.cleanup_old_logs(
    base_log_dir="logs",       # REQUIRED
    name="my_app",             # REQUIRED: logger directory name or "*" for all
    days=5,                     # OPTIONAL: delete folders older than this many days (default: 7)
    verbose=True                # OPTIONAL: log deletions/errors to internal logger (default: False)
)
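The selection of "old" daily folders can be sketched as follows. This is an illustration under assumptions: the helpers find_old_day_folders and cleanup are hypothetical, folders are taken to be named YYYY-MM-DD, and "older than" is interpreted as strictly before today minus the given number of days.

```python
import shutil
import tempfile
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Optional


def find_old_day_folders(logger_dir: Path, days: int, today: Optional[date] = None) -> list[Path]:
    """Return date-named subfolders older than `days` days."""
    cutoff = (today or date.today()) - timedelta(days=days)
    old: list[Path] = []
    for child in sorted(logger_dir.iterdir()):
        if not child.is_dir():
            continue
        try:
            folder_date = datetime.strptime(child.name, "%Y-%m-%d").date()
        except ValueError:
            continue  # skip folders that are not named like a date
        if folder_date < cutoff:
            old.append(child)
    return old


def cleanup(logger_dir: Path, days: int = 7, today: Optional[date] = None) -> list[str]:
    """Delete the old folders and return their names."""
    removed = []
    for folder in find_old_day_folders(logger_dir, days, today):
        shutil.rmtree(folder)
        removed.append(folder.name)
    return removed


with tempfile.TemporaryDirectory() as tmp:
    logger_dir = Path(tmp)
    for folder_name in ("2024-01-01", "2024-01-09", "2024-01-10", "notes"):
        (logger_dir / folder_name).mkdir()
    removed = cleanup(logger_dir, days=5, today=date(2024, 1, 10))
    remaining = sorted(p.name for p in logger_dir.iterdir())

print(removed, remaining)
```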

Logger.compress_old_logs(...) (static method)

Archives daily log folders older than the specified number of days into a gzip tar archive (.tar.gz).

  • Uses the same folder selection rules as cleanup_old_logs().
  • Copies folders to a temporary location before creating the archive; originals are not deleted.

Usage

from utilities import Logger

archive_path = Logger.compress_old_logs(
    base_log_dir="logs",           # REQUIRED
    name="my_app",                 # REQUIRED: logger directory name or "*" for all
    days=10,                        # OPTIONAL: archive folders older than this (default: 7)
    archive_name="old_logs",       # OPTIONAL: archive base name
)

LogDecorator class

A decorator to wrap any function (sync or async) and automatically log:

  • Execution time
  • Arguments (with optional masking)
  • Return value
  • Stack trace on error
  • Performance warnings

Usage

from utilities import LogDecorator

decorator = LogDecorator(
    logger=logger.get_logger(),         # REQUIRED: logger instance
    raise_exception=False,             # OPTIONAL: Don't re-raise after logging
    log_arguments=True,                # OPTIONAL: Log function arguments
    sensitive_params=["password"],     # OPTIONAL: Redact sensitive argument names
    log_return=True,                   # OPTIONAL: Log return value
    log_execution_time=True,           # OPTIONAL: Log duration
    warn_duration=1.5                  # OPTIONAL: Warn if duration > X sec
)

@decorator
def process_user(username, password):
    ...
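To make the masking and timing behaviour concrete, here is a simplified decorator built only on the standard library. It handles synchronous functions only, and the names log_calls and ListHandler, the "***" placeholder, and the message formats are assumptions, not LogDecorator's actual output.

```python
import functools
import inspect
import logging
import time


def log_calls(logger, sensitive_params=(), raise_exception=True, level=logging.DEBUG):
    """Log arguments (with masking), duration, and exceptions of a sync function."""
    def decorator(func):
        signature = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = signature.bind(*args, **kwargs)
            bound.apply_defaults()
            shown = {
                name: "***" if name in sensitive_params else value
                for name, value in bound.arguments.items()
            }
            logger.log(level, "call %s(%s)", func.__name__, shown)
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                logger.exception("error in %s", func.__name__)
                if raise_exception:
                    raise
                return None
            elapsed = time.perf_counter() - start
            logger.log(level, "done %s in %.3fs", func.__name__, elapsed)
            return result
        return wrapper
    return decorator


class ListHandler(logging.Handler):
    """Collect formatted messages in memory so they can be inspected."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


capture = ListHandler()
logger = logging.getLogger("decorator_sketch")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(capture)


@log_calls(logger, sensitive_params=("password",))
def process_user(username, password):
    return f"processed {username}"


result = process_user("alice", "s3cret")
print(capture.messages[0])
```

Binding the arguments through inspect.signature lets the mask apply by parameter name regardless of whether the value was passed positionally or as a keyword.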

LogWrapper class

A helper that bundles both the Logger and LogDecorator. It acts as a unified interface to:

  • Decorate functions
  • Call logging methods (info, error, etc.)
  • Access context methods (context_scope, set_context, etc.)

Usage

from utilities import LogWrapper

log = LogWrapper(decorator, logger)

@log
def example():
    log.info("Inside decorated function")
    ...
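One way such a combined object can work is to implement __call__ for decorating and __getattr__ for delegating logging methods. The Wrapper class and the record_calls decorator below are hypothetical stand-ins used to show the idea; LogWrapper's real implementation is not documented here.

```python
import functools
import logging


class Wrapper:
    """Hypothetical stand-in for a combined logger/decorator object."""

    def __init__(self, decorator, logger: logging.Logger) -> None:
        self._decorator = decorator
        self._logger = logger

    def __call__(self, func):
        # Using the object as @wrapper applies the stored decorator.
        return self._decorator(func)

    def __getattr__(self, name: str):
        # Reached only when normal lookup fails, so info(), error(), etc.
        # are forwarded to the underlying logger.
        return getattr(self._logger, name)


calls: list[str] = []


def record_calls(func):
    """Toy decorator that records the name of each decorated call."""
    @functools.wraps(func)
    def inner(*args, **kwargs):
        calls.append(func.__name__)
        return func(*args, **kwargs)
    return inner


log = Wrapper(record_calls, logging.getLogger("wrapper_sketch"))


@log
def example() -> str:
    log.info("Inside decorated function")
    return "ok"


outcome = example()
print(outcome, calls)
```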

3.2 Jira Utilities

The Jira utility module provides an asynchronous interface for interacting with Jira's REST API (v2). It abstracts common Jira operations like issue lookup, comment reading, changelog filtering, and sprint/board querying, making it easier to incorporate Jira functionality into your Python workflows.

Classes and Functions

Class JiraClient

This class handles all asynchronous interactions with the Jira server using aiohttp. It supports configurable logging, retries, and selective data fetching (e.g., comments, worklogs, changelogs).

Usage:

from utilities import JiraClient

client = JiraClient(
    api_key="your-api-token",                    # Mandatory: API token for Jira REST authentication
    jira_url="https://your-domain.atlassian.net",# Mandatory: Base Jira instance URL
    retry_cnt=3,                                   # Optional: Number of retry attempts (default: 1)
    verbose=True,                                  # Optional: Enables detailed logs (default: False)
    log_urls=True,                                 # Optional: Logs every requested URL (default: False)
    logger=None,                                   # Optional: Custom logger instance
    log_level=None                                 # Optional: Logging level, default is INFO
)

await client.start_session()

Initializes the aiohttp session. This must be called before making any API requests.


await client.close_session()

Closes the aiohttp session to clean up resources.


await client.read_issue(...)

Fetches a Jira issue with optional comments, worklogs, and filtered changelog.

issue = await client.read_issue(
    issue_id="PROJ-123",                     # Mandatory: Jira issue ID
    read_changelog=True,                      # Optional: Includes changelog (default: False)
    read_comments=True,                       # Optional: Includes comments (default: False)
    read_worklog=True,                        # Optional: Includes worklog (default: False)
    changelog_filter=["status", "assignee"] # Optional: List of changelog fields to include
)

await client.read_linked_issue_keys(issue_key: str)

Returns a list of issue keys linked under a Jira task.

keys = await client.read_linked_issue_keys(
    issue_key="ISSUE-123"  # Mandatory: The issue key
)

await client.read_linked_epic_keys(epic_key: str)

Returns a list of issue keys linked under a Jira epic.

keys = await client.read_linked_epic_keys(
    epic_key="EPIC-123"  # Mandatory: The epic key
)

await client.read_custom_jql_keys(custom_jql: str)

Returns a list of issue keys matching a JQL query.

keys = await client.read_custom_jql_keys(
    custom_jql="project = PROJ AND status = 'To Do'"  # Mandatory: Your JQL query
)

await client.read_board_id(board_name: str)

Looks up the board ID from its name.

board_id = await client.read_board_id(
    board_name="Development Board"  # Mandatory: Exact name of the Jira board
)

await client.read_sprint_list(...)

Returns active/closed sprints from a board, with optional filters.

sprints = await client.read_sprint_list(
    board_id=12,                       # Mandatory: Jira board ID
    origin_board=True,                 # Optional: Only return sprints from the original board (default: False)
    name_filter="Q1"                   # Optional: Filter sprints by name substring (default: None)
)

await client.send_request(...)

Examples of using this function:

Create an issue:

async def create_issue(self, summary, project_key, description):
    data = {
        "fields": {
            "project": {"key": project_key},
            "summary": summary,
            "description": description,
            "issuetype": {"name": "Task"}
        }
    }

    return await self.jira_client.send_request("POST", "/rest/api/2/issue", json=data)

Add a comment:

async def add_comment(self, issue_key, body):
    data = {"body": body}
    path = f"/rest/api/2/issue/{issue_key}/comment"
    return await self.jira_client.send_request("POST", path, json=data)

Delete a comment:

async def delete_comment(self, issue_key, comment_id):
    path = f"/rest/api/2/issue/{issue_key}/comment/{comment_id}"
    await self.jira_client.send_request("DELETE", path)

Update an issue:

async def update_issue(self, issue_key, update_fields):
    data = {"fields": update_fields}
    path = f"/rest/api/2/issue/{issue_key}"

    return await self.jira_client.send_request("PUT", path, json=data)

Class JiraWrapper

JiraWrapper is a convenience layer on top of JiraClient.

It automatically manages the session lifecycle (start_session() / close_session()), adds concurrency limiting for bulk issue fetches, and exposes higher-level helpers like:

  • Fetch a single issue (read_issue)
  • Fetch multiple issues (read_issues)
  • Fetch all issues by JQL (read_issues_jql)
  • Fetch only issue keys by JQL (read_keys_jql)
  • Fetch linked child issue keys (read_issue_children_keys, read_epic_children_keys)
  • Board/sprint utilities (read_board_id, read_sprint_list)
  • Pass-through raw requests (send_request)

Usage:

from utilities import JiraWrapper

wrapper = JiraWrapper(
    api_token="your-api-token",                   # Mandatory: Jira API token
    jira_url="https://your-domain.atlassian.net", # Mandatory: Base Jira instance URL
    max_concurrent_requests=50,                    # Optional: Concurrency limit (default: 50)
    max_retry_count=3,                             # Optional: Retry attempts per issue (default: 3)
    ssl_verify=True,                               # Optional: SSL verification (default: True)
    verbose=True                                   # Optional: Verbose logging
)

Fetch one issue:

issue = await wrapper.read_issue(
    "PROJ-123",
    read_comments=True,
    read_changelog=True,
    changelog_filter=["status", "assignee"],
)

Fetch issues by JQL:

issues = await wrapper.read_issues_jql(
    "project = PROJ AND updated >= -7d",
    read_worklog=True,
)

Fetch linked children keys:

children = await wrapper.read_issue_children_keys("PROJ-123")
# children is a dict: {"link_type": ["PROJ-456", "PROJ-789", ...], ...}

This module provides error handling and retry logic, and covers the core Jira querying needs in a simple asynchronous workflow.
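The concurrency limiting and per-issue retries described for JiraWrapper follow a common asyncio pattern, sketched here with a fake fetch function so that it runs without a Jira server. The names fetch_all, fetch_with_retry, and fake_fetch are hypothetical, and the wrapper's actual retry policy (which errors are retried, any backoff) is not specified in this README.

```python
import asyncio


async def fetch_with_retry(fetch, key, retries, semaphore):
    """Fetch one item, holding a semaphore slot and retrying transient errors."""
    async with semaphore:
        for attempt in range(1, retries + 1):
            try:
                return await fetch(key)
            except ConnectionError:
                if attempt == retries:
                    raise
                await asyncio.sleep(0)  # placeholder for a real backoff delay


async def fetch_all(fetch, keys, max_concurrent=50, retries=3):
    """Fetch all keys concurrently, never exceeding max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = (fetch_with_retry(fetch, key, retries, semaphore) for key in keys)
    return await asyncio.gather(*tasks)


async def demo():
    active = 0
    peak = 0
    failed_once = set()

    async def fake_fetch(key):
        # Simulated request that fails once for PROJ-2 and tracks concurrency.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        try:
            await asyncio.sleep(0.01)
            if key == "PROJ-2" and key not in failed_once:
                failed_once.add(key)
                raise ConnectionError("transient failure")
            return {"key": key}
        finally:
            active -= 1

    keys = [f"PROJ-{i}" for i in range(1, 6)]
    issues = await fetch_all(fake_fetch, keys, max_concurrent=2, retries=3)
    return issues, peak


issues, peak = asyncio.run(demo())
print([issue["key"] for issue in issues], peak)
```

asyncio.gather returns results in the order of the input keys, so callers can zip keys and issues even though requests complete out of order.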

3.3 GitLab Utilities

A client for interacting with a GitLab instance.
Capable of listing branches, tags, fetching commit info and changes, querying merge requests, cloning repositories, and downloading pipeline artifacts.

GitlabClient

Description:

The GitlabClient class is a utility for interacting with a GitLab instance, providing a unified interface for:

  • Listing and filtering branches, tags, and commits
  • Querying commit metadata with optional change magnitude statistics
  • Fetching merge request (MR) metadata including activity/notes
  • Cloning repositories and checking out branches or tags
  • Downloading and extracting pipeline artifacts
  • Retrieving file changes between commits or tags

Built as a wrapper around the python-gitlab and GitPython libraries, it simplifies common automation tasks.


Initialization

Constructor:

GitlabClient(
    url: str,
    private_token: str,
    verbose: bool = False,
    ssl_verify: bool = True,
    logger: Optional[Union[Logger, LogWrapper]] = None,
    log_level: Optional[int] = None,
    timeout: float = 50,
    retries: int = 3,
    large_repo_threshold: int = 500,
)

Parameters:

  • url: GitLab instance base URL (e.g., "https://gitlab.example.com")
  • private_token: Personal access token with sufficient permissions
  • verbose: Enable verbose logging (default: False)
  • ssl_verify: Verify SSL certificates during API calls (default: True)
  • logger: Optional logger instance for custom logging
  • log_level: Optional log level
  • timeout: API call timeout in seconds (default: 50)
  • retries: Number of retry attempts for transient API failures (default: 3)
  • large_repo_threshold: Warning threshold for large projects (default: 500)

Raises:

  • gitlab.exceptions.GitlabAuthenticationError: If authentication fails
  • gitlab.exceptions.GitlabConnectionError: If the server is unreachable
  • gitlab.exceptions.GitlabHttpError: For other HTTP/API errors during auth

Data Classes

CommitMetadata

@dataclass
class CommitMetadata:
    commit_short_id: str
    commit_title: str
    commit_author_name: str
    commit_id: str
    commit_date: str
    lines_added: Optional[int] = None
    lines_deleted: Optional[int] = None
    total_lines_changed: Optional[int] = None
    files_changed: Optional[int] = None
    changed_files: Optional[List[str]] = None
    directories_touched: Optional[List[str]] = None
    entropy: Optional[float] = None

MergeRequestMetadata

@dataclass
class MergeRequestMetadata:
    parent_branch_name: str
    source_branch_name: str
    merge_request_id: int
    merge_request_title: str
    merge_request_description: str
    merge_request_state: str
    merge_request_url: str
    branch_changed_files: Optional[List[str]] = None
    merge_request_activity: Optional[List[MergeRequestActivity]] = None

MergeRequestActivity

@dataclass
class MergeRequestActivity:
    id: int
    body: str
    type: Optional[str]
    author: str
    created_at: str

Core Methods

Project Operations

get_project(project_path: str) -> Optional[Project]

Fetch a GitLab project by path or numeric ID.

project = client.get_project("your-group/your-project")
# Or by project ID
project = client.get_project("12345")

Branch Operations

list_branches(project_path: str, max_branches: int = 100) -> list[str]

List branches in alphabetical order.

branches = client.list_branches("group/project", max_branches=20)

list_recent_branches(project_path: str, num_branches: int = 10) -> list[str]

List branches with the most recent commits.

recent_branches = client.list_recent_branches("group/project", num_branches=15)

get_branch_mr(project_path: str, branch_name: str, include_changed_files: bool = False, include_activity: bool = False) -> Optional[MergeRequestMetadata]

Get the parent merge request for a branch.

# Basic MR info
mr = client.get_branch_mr("group/project", "feature-branch")

# Include changed files and activity
mr_full = client.get_branch_mr(
    "group/project",
    "feature-branch",
    include_changed_files=True,
    include_activity=True
)

Tag Operations

list_tags(project_path: str) -> list[str]

List all tags in the repository.

tags = client.list_tags("group/project")

list_changed_files_between_tags(project_path: str, tag1: str, tag2: str) -> list[str]

List files changed between two tags.

changed_files = client.list_changed_files_between_tags(
    "group/project",
    "v1.0.0",
    "v2.0.0"
)

Commit Operations

list_commits(project_path: str, branch: str = "main", max_commits: int = 100, since: Optional[Union[str, datetime, date]] = None, until: Optional[Union[str, datetime, date]] = None, compute_change_magnitude: bool = False) -> list[CommitMetadata]

List commits on a branch with optional date filtering and change magnitude calculation.

# Basic commit list
commits = client.list_commits("group/project", branch="main", max_commits=50)

# With date filtering
from datetime import datetime, timedelta
since_date = datetime.now() - timedelta(days=30)
commits = client.list_commits(
    "group/project",
    branch="develop",
    since=since_date,
    max_commits=100
)

# With change magnitude stats
commits_detailed = client.list_commits(
    "group/project",
    branch="main",
    compute_change_magnitude=True,
    max_commits=20
)
for commit in commits_detailed:
    print(f"{commit.commit_short_id}: +{commit.lines_added} -{commit.lines_deleted}")
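As a rough idea of how fields such as lines_added, lines_deleted, files_changed, and directories_touched could be derived, the sketch below counts changed lines in unified diff text. It is not the client's algorithm: the function change_magnitude is hypothetical, the input shape (dicts with "diff" and "new_path" keys) is an assumption, and the entropy field is not attempted because its definition is not given here.

```python
from pathlib import PurePosixPath


def change_magnitude(diffs: list[dict]) -> dict:
    """Summarize a list of per-file unified diffs."""
    added = deleted = 0
    for entry in diffs:
        in_hunk = False
        for line in entry["diff"].splitlines():
            if line.startswith("@@"):
                in_hunk = True  # file headers before the first hunk are ignored
            elif in_hunk and line.startswith("+"):
                added += 1
            elif in_hunk and line.startswith("-"):
                deleted += 1
    files = [entry["new_path"] for entry in diffs]
    directories = sorted({str(PurePosixPath(path).parent) for path in files})
    return {
        "lines_added": added,
        "lines_deleted": deleted,
        "total_lines_changed": added + deleted,
        "files_changed": len(files),
        "directories_touched": directories,
    }


sample_diffs = [
    {
        "new_path": "src/app.py",
        "diff": "--- a/src/app.py\n+++ b/src/app.py\n@@ -1,3 +1,4 @@\n import os\n-print('a')\n+print('b')\n+print('c')\n",
    },
    {"new_path": "README.md", "diff": "@@ -0,0 +1 @@\n+hello\n"},
]
stats = change_magnitude(sample_diffs)
print(stats)
```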

list_unique_commits_on_branch(project_path: str, base_branch: str, compare_branch: str) -> list[CommitMetadata]

List commits unique to a branch (not in base branch).

unique_commits = client.list_unique_commits_on_branch(
    "group/project",
    base_branch="main",
    compare_branch="feature/new-feature"
)

get_commit_changed_files(project_path: str, commit_id: str) -> list[str]

Get files changed in a specific commit.

changed_files = client.get_commit_changed_files(
    "group/project",
    commit_id="a1b2c3d4"
)

Repository Operations

clone_repository(project_url: str, local_path: str, branch: Optional[str] = None) -> Optional[Repo]

Clone a repository to a local path, or open it if it already exists.

# Clone repository
repo = client.clone_repository(
    "group/project",
    "./local_repo",
    branch="main"
)

# Opens existing repository if path exists
repo = client.clone_repository("group/project", "./local_repo")

checkout_branch(local_path: str, branch: str) -> None

Check out a branch in an existing local repository.

client.checkout_branch("./local_repo", branch="feature-branch")

checkout_tag(local_path: str, tag: str, new_branch: Optional[str] = None) -> None

Check out a tag, optionally creating a new branch.

# Detached HEAD at tag
client.checkout_tag("./local_repo", "v1.2.3")

# Create branch from tag
client.checkout_tag("./local_repo", "v1.2.3", new_branch="hotfix/v1.2.3")

Pipeline Artifacts

download_pipeline_artifacts(project_path: str, pipeline_id: int, jobs_of_interest: list[str], output_folder: str) -> None

Download and extract artifacts from specified jobs in a pipeline.

client.download_pipeline_artifacts(
    project_path="group/project",
    pipeline_id=123456,
    jobs_of_interest=["build", "test", "deploy"],
    output_folder="./artifacts"
)

Complete Usage Example

from utilities import GitlabClient
from datetime import datetime, timedelta
import os

# Initialize client
client = GitlabClient(
    url="https://gitlab.example.com",
    private_token=os.environ["GITLAB_TOKEN"],
    verbose=True,
    timeout=30,
    retries=3,
    large_repo_threshold=500
)

project_path = "your-group/your-project"

# 1. Get project
project = client.get_project(project_path)
if project is not None:  # get_project returns Optional[Project]
    print(f"Project: {project.name}")

# 2. List branches
branches = client.list_branches(project_path, max_branches=10)
recent_branches = client.list_recent_branches(project_path, num_branches=5)
print(f"Branches: {branches}")
print(f"Recent branches: {recent_branches}")

# 3. Get merge request info
mr = client.get_branch_mr(
    project_path,
    branch_name="feature-branch",
    include_changed_files=True,
    include_activity=True
)
if mr:
    print(f"MR #{mr.merge_request_id}: {mr.merge_request_title}")
    print(f"State: {mr.merge_request_state}")
    if mr.branch_changed_files:
        print(f"Changed files: {len(mr.branch_changed_files)}")
    if mr.merge_request_activity:
        print(f"Activities: {len(mr.merge_request_activity)}")

# 4. List tags and compare
tags = client.list_tags(project_path)
print(f"Tags: {tags[:5]}")  # First 5 tags

if len(tags) >= 2:
    changed_files = client.list_changed_files_between_tags(
        project_path,
        tags[-2],  # Second to last tag
        tags[-1]   # Last tag
    )
    print(f"Files changed between tags: {len(changed_files)}")

# 5. List commits with various filters
# Recent commits
commits = client.list_commits(
    project_path,
    branch="main",
    max_commits=20,
    compute_change_magnitude=True
)
for commit in commits[:5]:
    print(f"{commit.commit_short_id}: {commit.commit_title}")
    if commit.lines_added is not None:
        print(f"  Changes: +{commit.lines_added} -{commit.lines_deleted}")

# Commits in date range
since_date = datetime.now() - timedelta(days=7)
recent_commits = client.list_commits(
    project_path,
    branch="main",
    since=since_date,
    max_commits=50
)
print(f"Commits in last 7 days: {len(recent_commits)}")

# 6. Compare branches
unique_commits = client.list_unique_commits_on_branch(
    project_path,
    base_branch="main",
    compare_branch="develop"
)
print(f"Unique commits on develop: {len(unique_commits)}")

# 7. Get commit details
if commits:
    commit_files = client.get_commit_changed_files(
        project_path,
        commit_id=commits[0].commit_id
    )
    print(f"Files in commit {commits[0].commit_short_id}: {commit_files}")

# 8. Clone and checkout
local_path = "./local_repo"
repo = client.clone_repository(project_path, local_path, branch="main")
print(f"Repository cloned to: {local_path}")

# Checkout different branch
client.checkout_branch(local_path, branch="develop")
print("Checked out develop branch")

# Checkout tag
client.checkout_tag(local_path, "v1.0.0", new_branch="release/v1.0.0")
print("Created branch from tag v1.0.0")

# 9. Download pipeline artifacts
client.download_pipeline_artifacts(
    project_path=project_path,
    pipeline_id=123456,
    jobs_of_interest=["build", "test", "coverage"],
    output_folder="./pipeline_artifacts"
)
print("Artifacts downloaded")

Notes

  • Large repositories: The client warns when branches/tags/commits exceed large_repo_threshold
  • Error handling: Most methods handle errors gracefully and return None or empty lists
  • Date filtering: Accepts ISO-8601 strings, datetime, or date objects for date parameters
  • Change magnitude: Computing change statistics (compute_change_magnitude=True) requires additional API calls and may be slower
  • Best-effort operations: Artifact downloads log errors but continue processing remaining jobs
  • SSL verification: Can be disabled for self-signed certificates using ssl_verify=False
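The date-filtering note above can be illustrated with a small normalization helper. This is only a sketch: to_iso8601 is a hypothetical name, not part of GitlabClient, and the client's actual conversion may differ.

```python
from datetime import date, datetime
from typing import Union


def to_iso8601(value: Union[str, date, datetime]) -> str:
    """Normalize a date-like value to an ISO-8601 string (hypothetical helper)."""
    if isinstance(value, str):
        # Validate by parsing; raises ValueError on malformed input.
        return datetime.fromisoformat(value).isoformat()
    if isinstance(value, datetime):  # check datetime first: it subclasses date
        return value.isoformat()
    if isinstance(value, date):
        # A plain date is treated as midnight on that day (assumption).
        return datetime(value.year, value.month, value.day).isoformat()
    raise TypeError(f"unsupported date type: {type(value).__name__}")


print(to_iso8601("2024-01-15"))
print(to_iso8601(date(2024, 1, 15)))
print(to_iso8601(datetime(2024, 1, 15, 8, 30)))
```

All three input kinds end up in one string format, which is the form a REST API query parameter would typically need.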

3.4 AI Utilities

This section describes the utility wrappers for working with AI chat providers. The framework uses a provider-based architecture that allows you to use different LLM backends through a unified interface.


Architecture Overview

The AI utilities are organized into three main components:

  1. Base Provider Interface (base_chat_provider.py) - Defines the common interface all providers implement
  2. Provider Implementations - Concrete implementations for specific AI services (Azure OpenAI, DeepInfra, etc.)
  3. Chat Client (openai_chat_client.py) - High-level client that manages conversations, sessions, and tool calling

Base Provider Interface

Module: base_chat_provider

Defines the foundational types and abstract interface for all chat providers.

Key Types

ChatMessage - TypedDict representing a chat message:

{
    "role": "system" | "user" | "assistant" | "tool",
    "content": str,
    "tool_calls": List[ToolCall],      # Optional, assistant-only
    "tool_call_id": str                # Optional, tool-only
}

CompletionResult - Dataclass containing the provider's response:

@dataclass(frozen=True)
class CompletionResult:
    text: str                           # The assistant's text response
    raw: Any                            # Raw provider response object
    model: Optional[str]                # Model identifier
    finish_reason: Optional[str]        # Why generation stopped
    usage: Optional[Dict[str, Any]]     # Token usage statistics
    tool_calls: Optional[List[ToolCall]] # Tool calls requested by model

ProviderError - Exception type for provider-specific errors:

class ProviderError(Exception):
    retryable: bool         # Whether the error is transient
    status_code: Optional[int]  # HTTP status code if applicable
    provider: Optional[str]     # Provider identifier

Abstract Class: BaseChatProvider

All provider implementations inherit from this base class.

Method: complete()

async def complete(
    self,
    messages: Messages,
    *,
    timeout: Optional[float] = None,
    max_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    frequency_penalty: Optional[float] = None,
    presence_penalty: Optional[float] = None,
    extra: Optional[Mapping[str, Any]] = None,
) -> CompletionResult

Method: aclose()

async def aclose(self) -> None

Closes underlying HTTP clients and releases resources.

Context Manager Support

async with provider:
    result = await provider.complete(messages)
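To show how the context manager relates to aclose(), here is a minimal sketch with a stand-in provider. EchoProvider is hypothetical and does not inherit from the real BaseChatProvider; it only mirrors the documented method names.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class EchoProvider:
    """Hypothetical stand-in provider, not part of the package."""

    def __init__(self) -> None:
        self.closed = False

    async def complete(self, messages: List[Dict[str, Any]]) -> str:
        # Echo the last message back instead of calling a real API.
        return f"echo: {messages[-1]['content']}"

    async def aclose(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "EchoProvider":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Release resources on exit, even if the body raised.
        await self.aclose()


async def demo() -> Tuple[str, bool]:
    async with EchoProvider() as provider:
        text = await provider.complete([{"role": "user", "content": "hi"}])
    return text, provider.closed


result = asyncio.run(demo())
print(result)
```

Using async with means aclose() does not have to be called by hand on every exit path.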

Provider Implementations

Class: AzureOpenAIProvider

Provider for Azure OpenAI Service using the OpenAI Python SDK.

Initialization

from providers import AzureOpenAIProvider

provider = AzureOpenAIProvider(
    azure_endpoint="<AZURE_OPENAI_ENDPOINT>",    # Required
    api_key="<AZURE_OPENAI_API_KEY>",            # Required
    api_version="2024-03-01-preview",             # Required
    model="gpt-4",                                # Required (deployment name)
    default_max_tokens=4000,                      # Optional
    default_temperature=0.0,                      # Optional
    default_top_p=0.95,                           # Optional
    default_frequency_penalty=0.0,                # Optional
    default_presence_penalty=0.0,                 # Optional
)

Notes

  • In Azure, the model parameter should be your deployment name, not the model identifier
  • Assumes OpenAI-style message format
  • Provides automatic error mapping with retryability detection

Example Usage

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing."}
]

result = await provider.complete(
    messages,
    max_tokens=2000,
    temperature=0.7
)

print(result.text)
print(f"Tokens used: {result.usage}")

Class: DeepInfraProvider

Provider for DeepInfra's OpenAI-compatible API.

Initialization

from providers import DeepInfraProvider

provider = DeepInfraProvider(
    api_key="<DEEPINFRA_API_KEY>",                      # Required
    model="moonshotai/Kimi-K2-Instruct",                 # Optional (default shown)
    base_url="https://api.deepinfra.com/v1/openai",      # Optional (default shown)
    default_max_tokens=4000,                             # Optional
    default_temperature=0.0,                             # Optional
    default_top_p=0.95,                                  # Optional
    default_frequency_penalty=0.0,                       # Optional
    default_presence_penalty=0.0,                        # Optional
)

Notes

  • Uses the OpenAI SDK with a custom base_url pointing to DeepInfra
  • Supports all DeepInfra-hosted models through their OpenAI-compatible endpoint
  • Automatically handles error classification and retryability

Example Usage

# Using the extra parameter for provider-specific features
result = await provider.complete(
    messages,
    temperature=0.5,
    extra={
        "tools": [
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "parameters": {"type": "object", "properties": {...}}
                }
            }
        ]
    }
)

High-Level Chat Client

Class: OpenAIChatClient

A full-featured wrapper that manages conversation sessions, automatic retries, message history, tool calling, and persistent storage.

Initialization

from openai_chat_client import OpenAIChatClient
from providers import AzureOpenAIProvider

# Create a provider
provider = AzureOpenAIProvider(
    azure_endpoint="<AZURE_OPENAI_ENDPOINT>",
    api_key="<AZURE_OPENAI_API_KEY>",
    api_version="2024-03-01-preview",
    model="gpt-4"
)

# Create the client
client = OpenAIChatClient(
    provider=provider,                                      # Required
    default_system_message="You are a helpful assistant.",  # Optional
    max_tokens=4000,                                        # Optional
    temperature=0.0,                                        # Optional
    top_p=0.95,                                             # Optional
    frequency_penalty=0.0,                                  # Optional
    presence_penalty=0.0,                                   # Optional
    include_message_history=True,                           # Optional
    save_sessions_to_disk=True,                             # Optional
    verbose=False,                                          # Optional
    log_messages=False,                                     # Optional
    logger=None,                                            # Optional
    log_level=None,                                         # Optional
    store_tool_messages=False,                              # Optional
    strip_thinking=False,                                   # Optional
    thinking_tag_patterns=None                              # Optional
)

Parameters

  • provider: Any BaseChatProvider implementation (required)
  • default_system_message: System prompt for conversations
  • max_tokens, temperature, top_p, frequency_penalty, presence_penalty: Generation parameters
  • include_message_history: Whether to maintain conversation context
  • save_sessions_to_disk: Auto-save sessions to .sessions/ directory
  • store_tool_messages: Include tool call traces in session history
  • strip_thinking: Remove thinking tags (e.g., <think>...</think>) from responses
  • thinking_tag_patterns: Custom regex patterns for thinking tag removal

Session Management
Method: request_completion_async()

Send a message and get a response (async).

response = await client.request_completion_async(
    message_content="Explain quantum computing in simple terms",  # Required
    session_id="session1",                                        # Optional (default: "default")
    timeout=30.0,                                                 # Optional
    image_paths=None,                                             # Optional (list of image files)
    image_urls=None                                               # Optional (list of image URLs)
)

Supports multimodal inputs:

response = await client.request_completion_async(
    message_content="What's in this image?",
    session_id="vision_session",
    image_paths=["./photo.jpg"]
)

Method: trim_conversation_history()

Limit session history to the last N messages.

client.trim_conversation_history(
    session_id="session1",  # Optional (default: "default")
    max_length=50           # Required
)

Method: change_system_message()

Update the system message for a session.

client.change_system_message(
    system_message="You are an expert in physics.",
    session_id="session1"  # Optional
)

Method: reset_conversation()

Clear all messages except the system message.

client.reset_conversation(session_id="session1")

Method: get_message_count()

Get the number of messages in a session.

count = client.get_message_count(session_id="session1")

Persistence Methods
Method: save_conversation()

Save session to a JSON file.

client.save_conversation(
    file_path="./conversation_backup.json",
    session_id="session1"  # Optional
)

Method: load_conversation()

Load session from a JSON file.

client.load_conversation(
    file_path="./conversation_backup.json",
    session_id="session1"  # Optional
)

Method: get_conversation_history_as_text()

Export session history as formatted text.

text = client.get_conversation_history_as_text(session_id="session1")
print(text)

Tool/Function Calling

The client supports OpenAI-style function calling with automatic tool execution loops.

Method: register_tool()

Register a callable function as a tool.

def get_weather(location: str, unit: str = "celsius") -> dict:
    """Get the weather for a location."""
    return {"location": location, "temperature": 22, "unit": unit}

client.register_tool(
    schema={
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a location",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string", "description": "City name"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["location"]
            }
        }
    },
    func=get_weather,
    name="get_weather",          # Optional (extracted from schema if not provided)
    max_tool_rounds=8,            # Optional (max iterations)
    overwrite=True,               # Optional (allow replacing existing tool)
    insert_front=False            # Optional (priority ordering)
)

Async tools are also supported:

import asyncio

async def search_database(query: str) -> list:
    # Async implementation
    await asyncio.sleep(0.1)
    return [{"result": "data"}]

client.register_tool(
    schema={...},
    func=search_database
)

Tool execution flow:

  1. User sends a message
  2. If model requests tool calls, client executes them automatically
  3. Tool results are sent back to the model
  4. Model generates final response based on tool results
  5. Steps 2 to 4 repeat while the model keeps requesting tools, up to max_tool_rounds rounds
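The flow above can be sketched as a plain loop. This is an illustration under assumptions, not the client's implementation: fake_model stands in for a provider, the tool-call dictionaries are simplified compared with the OpenAI wire format, and run_with_tools is a hypothetical name.

```python
import json
from typing import Any, Callable, Dict, List

# Registry of callable tools, keyed by tool name.
TOOLS: Dict[str, Callable[..., Any]] = {
    "get_weather": lambda location: {"location": location, "temperature": 22},
}


def fake_model(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Stand-in for a provider: requests a tool once, then answers."""
    if not any(m["role"] == "tool" for m in messages):
        call = {"id": "call_1", "name": "get_weather",
                "arguments": json.dumps({"location": "Paris"})}
        return {"role": "assistant", "content": "", "tool_calls": [call]}
    return {"role": "assistant", "content": "It is 22 degrees in Paris."}


def run_with_tools(messages: List[Dict[str, Any]], max_tool_rounds: int = 8) -> str:
    for _ in range(max_tool_rounds):
        reply = fake_model(messages)
        messages.append(reply)
        if not reply.get("tool_calls"):
            return reply["content"]  # final answer, no tools requested
        for call in reply["tool_calls"]:
            # Execute the tool and feed its result back to the model.
            result = TOOLS[call["name"]](**json.loads(call["arguments"]))
            messages.append({"role": "tool", "tool_call_id": call["id"],
                             "content": json.dumps(result)})
    raise RuntimeError("max_tool_rounds exceeded")


answer = run_with_tools([{"role": "user", "content": "Weather in Paris?"}])
print(answer)
```

The round limit guards against a model that keeps requesting tools without ever producing a final answer.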

Image Support

The client supports vision models through image inputs.

# Local image files
response = await client.request_completion_async(
    message_content="Describe this image",
    image_paths=["./diagram.png", "./photo.jpg"]
)

# Remote image URLs
response = await client.request_completion_async(
    message_content="What's in these images?",
    image_urls=["https://example.com/image1.jpg"]
)

# Mix both
response = await client.request_completion_async(
    message_content="Compare these images",
    image_paths=["./local.png"],
    image_urls=["https://example.com/remote.jpg"]
)
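For context, OpenAI-style chat APIs usually receive local images as base64 data URLs inside an image_url content part. The sketch below shows that encoding step; the helper names are hypothetical, and whether the client builds its requests exactly this way is an assumption.

```python
import base64
import mimetypes
from pathlib import Path
from typing import Any, Dict


def image_part_from_bytes(data: bytes, filename: str) -> Dict[str, Any]:
    """Build an OpenAI-style image content part from raw bytes."""
    # Guess the MIME type from the file extension, with a generic fallback.
    mime = mimetypes.guess_type(filename)[0] or "application/octet-stream"
    encoded = base64.b64encode(data).decode("ascii")
    return {"type": "image_url",
            "image_url": {"url": f"data:{mime};base64,{encoded}"}}


def image_part_from_path(path: str) -> Dict[str, Any]:
    """Read a local file and wrap it as an image content part."""
    return image_part_from_bytes(Path(path).read_bytes(), path)


part = image_part_from_bytes(b"\x89PNG", "diagram.png")
print(part["image_url"]["url"])
```

Remote images need no encoding: their URL can go into the same url field directly.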

Advanced Features
Thinking Tag Stripping

Some models output reasoning in XML-style thinking tags. Enable automatic removal:

client = OpenAIChatClient(
    provider=provider,
    strip_thinking=True,
    thinking_tag_patterns=[
        r"<think>.*?</think>",
        r"<reasoning>.*?</reasoning>",
        r"<internal>.*?</internal>"
    ]
)
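As a rough illustration of what such patterns do, the sketch below applies them with Python's re module. The strip_thinking function here is a hypothetical stand-alone helper; the regex flags the client actually uses are an assumption.

```python
import re
from typing import Iterable

DEFAULT_PATTERNS = [r"<think>.*?</think>"]


def strip_thinking(text: str, patterns: Iterable[str] = DEFAULT_PATTERNS) -> str:
    for pattern in patterns:
        # DOTALL lets ".*?" span newlines; the non-greedy match removes
        # each tag pair separately instead of everything between the
        # first opening tag and the last closing tag.
        text = re.sub(pattern, "", text, flags=re.DOTALL)
    return text.strip()


cleaned = strip_thinking("<think>step 1\nstep 2</think>\nThe answer is 4.")
print(cleaned)
```

Without a DOTALL-style flag, a pattern such as <think>.*?</think> would miss reasoning blocks that span several lines.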

Session Auto-Save

Sessions are automatically saved to .sessions/<session_id>.json when save_sessions_to_disk=True. The client uses atomic writes to prevent corruption.
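A common way to implement atomic writes is to write to a temporary file in the same directory and then rename it over the target. The sketch below shows that pattern; atomic_write_json is a hypothetical helper, and the client's own implementation may differ.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any


def atomic_write_json(path: Path, payload: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in the target directory so the final rename
    # stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)  # atomic swap into place
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / ".sessions" / "session1.json"
    atomic_write_json(target, [{"role": "user", "content": "hi"}])
    saved = json.loads(target.read_text(encoding="utf-8"))
    leftovers = [p.name for p in target.parent.iterdir()]

print(saved, leftovers)
```

A reader therefore sees either the old file or the complete new one, never a half-written session.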

Error Handling

import asyncio
from providers import ProviderError

try:
    response = await client.request_completion_async(
        message_content="Hello",
        timeout=5.0
    )
except ProviderError as e:
    if e.retryable:
        # Retry logic here
        print(f"Transient error from {e.provider}: {e}")
    else:
        # Permanent failure
        print(f"Non-retryable error: {e}")
except asyncio.TimeoutError:
    print("Request timed out")
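One possible shape for the caller-side retry logic is exponential backoff driven by the retryable flag. In this sketch the ProviderError class is a local stand-in that mirrors the documented retryable attribute, and with_retries and flaky are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable


class ProviderError(Exception):
    """Local stand-in mirroring the documented retryable attribute."""

    def __init__(self, message: str, *, retryable: bool = False) -> None:
        super().__init__(message)
        self.retryable = retryable


async def with_retries(call: Callable[[], Awaitable[str]], attempts: int = 3,
                       base_delay: float = 0.01) -> str:
    for attempt in range(attempts):
        try:
            return await call()
        except ProviderError as exc:
            # Give up on permanent errors or once attempts are exhausted.
            if not exc.retryable or attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("attempts must be at least 1")


calls = {"n": 0}


async def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ProviderError("rate limited", retryable=True)
    return "ok"


outcome = asyncio.run(with_retries(flaky))
print(outcome, calls["n"])
```

Non-retryable errors are raised again immediately, so permanent failures are not delayed by the backoff.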

Complete Example

import asyncio
from providers import AzureOpenAIProvider
from openai_chat_client import OpenAIChatClient

async def main():
    # Initialize provider
    provider = AzureOpenAIProvider(
        azure_endpoint="https://your-resource.openai.azure.com/",
        api_key="your-api-key",
        api_version="2024-03-01-preview",
        model="gpt-4-deployment-name"
    )
    
    # Create client
    client = OpenAIChatClient(
        provider=provider,
        default_system_message="You are a helpful coding assistant.",
        temperature=0.3,
        save_sessions_to_disk=True
    )
    
    # Register a tool
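    def calculate(expression: str) -> float:
        """Evaluate a mathematical expression (demo only)."""
        # Caution: eval() with empty builtins is not a real sandbox;
        # do not pass untrusted input to it in production.
        return eval(expression, {"__builtins__": {}})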
    
    client.register_tool(
        schema={
            "type": "function",
            "function": {
                "name": "calculate",
                "description": "Evaluate a mathematical expression",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "expression": {"type": "string"}
                    },
                    "required": ["expression"]
                }
            }
        },
        func=calculate
    )
    
    # Have a conversation
    response1 = await client.request_completion_async(
        message_content="What is 123 * 456?",
        session_id="math_session"
    )
    print(f"Assistant: {response1}")
    
    response2 = await client.request_completion_async(
        message_content="Now divide that by 2",
        session_id="math_session"  # Continues same conversation
    )
    print(f"Assistant: {response2}")
    
    # Export conversation
    history = client.get_conversation_history_as_text("math_session")
    print(f"\nFull conversation:\n{history}")
    
    # Cleanup
    await provider.aclose()

if __name__ == "__main__":
    asyncio.run(main())
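Because the calculate tool receives model-generated input, a stricter evaluator may be preferable to eval(). The sketch below walks the expression's syntax tree and allows only basic arithmetic. safe_calculate is a hypothetical replacement, not part of the package.

```python
import ast
import operator

# Whitelisted operators; anything else is rejected.
_BINARY = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_UNARY = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_calculate(expression: str) -> float:
    def _eval(node: ast.AST) -> float:
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](_eval(node.operand))
        # Names, calls, attributes and so on are never evaluated.
        raise ValueError("unsupported expression")

    return _eval(ast.parse(expression, mode="eval").body)


print(safe_calculate("123 * 456"))
print(safe_calculate("123 * 456 / 2"))
```

Function calls and attribute access raise ValueError instead of being executed.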

3.5 OS Utilities

This section describes additional utility modules aimed at filesystem and content-level operations. These tools can be used independently to scan, monitor, or write file content efficiently across various formats.

ContentScanner Class

Purpose: Scans the content of different file types (text, CSV, Excel, DOCX, PDF, etc.) for specified string or regex patterns.

Class: ContentScanner

Constructor Usage:

from utilities import ContentScanner

scanner = ContentScanner(
    string_patterns=['error', 'fail'],     # Optional: list of plain string patterns
    regex_patterns=[r'\berror\b'],       # Optional: list of regex patterns
    case_sensitive=False,                  # Optional: defaults to False
    max_results=10,                        # Optional: maximum matches per file
    verbose=True                           # Optional: enable logging
)

Method: scan_files(file_paths: List[Union[str, Path]]) -> AsyncGenerator

Description: Asynchronously scans a list of files and yields matches as dictionaries containing the file path, matching line, and line number.
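To make the async-generator shape concrete, here is a self-contained sketch for plain text files only. It is not the ContentScanner implementation: scan_text_files is a hypothetical function, and the dictionary keys (file, line, line_number) are assumed names.

```python
import asyncio
import re
import tempfile
from pathlib import Path
from typing import AsyncGenerator, Dict, List, Union


async def scan_text_files(
    paths: List[Union[str, Path]], pattern: str, case_sensitive: bool = False
) -> AsyncGenerator[Dict[str, object], None]:
    regex = re.compile(pattern, 0 if case_sensitive else re.IGNORECASE)
    for path in paths:
        # Read in a worker thread so the event loop is not blocked.
        text = await asyncio.to_thread(Path(path).read_text, encoding="utf-8")
        for number, line in enumerate(text.splitlines(), start=1):
            if regex.search(line):
                yield {"file": str(path), "line": line, "line_number": number}


async def demo() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        log = Path(tmp) / "app.log"
        log.write_text("all good\nERROR: disk full\n", encoding="utf-8")
        return [m async for m in scan_text_files([log], r"\berror\b")]


matches = asyncio.run(demo())
print([(m["line_number"], m["line"]) for m in matches])
```

Callers consume such a generator with async for, so matches can be handled as they are found rather than after the whole scan finishes.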


DirectoryWatcher Class (Windows Only)

Purpose: Watches for file system changes in a directory using the Windows Win32 API.

Class: DirectoryWatcher

Constructor Usage:

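from utilities import DirectoryWatcher

def my_callback(event):
    # Hypothetical callback; assumes it receives each change event
    print(event)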

watcher = DirectoryWatcher(
    path='C:/projects',                    # Required: path to watch
    recursive=True,                        # Optional: monitor subdirectories
    debounce_interval=1.0,                 # Optional: debounce time in seconds
    file_patterns=['*.txt', '*.log'],      # Optional: glob patterns
    event_callback=my_callback             # Optional: function to call on each event
)

Method: watch() -> AsyncGenerator

Description: Asynchronously yields DirectoryChangeEvent objects for file changes.

Note: Only works on Windows.


FileScanner Class

Purpose: Recursively scans directories for files matching criteria like name, size, modification date, and folder pattern.

Class: FileScanner

Constructor Usage:

from utilities import FileScanner, TraversalMethod
from datetime import datetime

scanner = FileScanner(
    root_dir='C:/data',                    # Required: root directory
    max_workers=10,                        # Optional: number of worker threads
    method=TraversalMethod.DFS,            # Optional: BFS or DFS traversal
    file_patterns=[r'.*\.log$'],          # Optional: regex for files
    folder_patterns=[r'logs'],             # Optional: regex for folder names
    first_folder_patterns=[r'2025'],       # Optional: regex for top-level folders
    max_depth=5,                           # Optional: depth limit
    min_file_size=1024,                    # Optional: min size in bytes
    modified_after=datetime(2024, 1, 1),   # Optional: filter by modification time
    skip_hidden=True,                      # Optional: ignore hidden files/folders
    follow_symlinks=False                  # Optional: whether to follow symlinks
)

Method: scan_files() -> AsyncGenerator

Description: Asynchronously yields matching file paths as Path objects.
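The difference between the BFS and DFS traversal methods can be shown with a small synchronous sketch. walk_files is a hypothetical helper and leaves out the filtering, threading, and async behaviour of FileScanner.

```python
import tempfile
from collections import deque
from pathlib import Path
from typing import Iterator


def walk_files(root: Path, breadth_first: bool = True) -> Iterator[Path]:
    pending = deque([root])
    while pending:
        # BFS takes the oldest directory (queue); DFS takes the newest (stack).
        current = pending.popleft() if breadth_first else pending.pop()
        for entry in sorted(current.iterdir()):
            if entry.is_dir():
                pending.append(entry)
            else:
                yield entry


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a").mkdir()
    (root / "b" / "sub").mkdir(parents=True)
    (root / "a" / "y.log").touch()
    (root / "b" / "sub" / "x.log").touch()
    bfs = [p.name for p in walk_files(root)]
    dfs = [p.name for p in walk_files(root, breadth_first=False)]

print(bfs, dfs)
```

BFS reaches shallow files first, while DFS finishes one branch before moving to the next, which matters when a scan is stopped early.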

3.6 Excel Utilities

The Excel utility modules provide streamlined ways to compare and modify Excel files, either to validate content across files or make controlled edits. These are particularly useful for automating test validation, data migration verification, and editing result files.

ExcelComparer

Description: The ExcelComparer class provides an automated way to compare two Excel files sheet-by-sheet. It supports column exclusions, float value tolerance, and case-insensitive comparisons. Useful for validating exported data from different environments or after transformations.

Usage Example:

from utilities import ExcelComparer

comparer = ExcelComparer(
    file_path1='old_version.xlsx',                 # (required) Path to the first Excel file
    file_path2='new_version.xlsx',                 # (required) Path to the second Excel file
    ignore_columns=['last_updated', 'id'],         # (optional) List of columns to ignore in comparison
    float_tolerance=1e-4,                          # (optional) Float comparison tolerance (default: 1e-6)
    case_insensitive=True,                         # (optional) Whether to ignore case when comparing strings
    verbose=True                                   # (optional) Enable verbose output
)

report = comparer.compare()
comparer.diff_to_csv("comparison_output.csv")
print(comparer.diff_to_str())
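The float_tolerance and case_insensitive options can be pictured as a per-cell comparison like the one below. This is a sketch only: cells_equal is a hypothetical helper, and treating the tolerance as absolute is an assumption.

```python
import math
from typing import Any


def cells_equal(left: Any, right: Any, float_tolerance: float = 1e-6,
                case_insensitive: bool = False) -> bool:
    numeric = (int, float)
    if isinstance(left, numeric) and isinstance(right, numeric):
        # Absolute tolerance: values differing by less than it are equal.
        return math.isclose(left, right, rel_tol=0.0, abs_tol=float_tolerance)
    if isinstance(left, str) and isinstance(right, str) and case_insensitive:
        return left.casefold() == right.casefold()
    return left == right


print(cells_equal(1.00001, 1.00004, float_tolerance=1e-4))
print(cells_equal(1.0, 1.001, float_tolerance=1e-4))
print(cells_equal("Open", "OPEN", case_insensitive=True))
```

A tolerance keeps rounding noise from different export paths out of the diff.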

3.7 Config Utilities

The Config utilities module provides mechanisms to load, merge, access, and write configuration files across multiple standard formats such as .env, .yaml, .json, .toml, .ini, and .xml. It contains two main components:

Config class (from config_parser.py)

This class implements a singleton pattern to manage and expose application configurations. It loads environment variables from .env files found in common configuration directories and merges configuration from supported formats (.yaml/.yml, .toml, .json, .ini, .xml) across those directories.

By default, it searches the current and config directories (in this order), plus any optional paths provided to reload. If whitelist is provided, only config files whose name or (POSIX) path matches at least one whitelist pattern are included. Patterns use shell-style wildcards (fnmatch), e.g. ["app.yaml", "secrets*.toml", "config/*.json"].
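The whitelist rule described above (match on file name or POSIX path) can be sketched with the standard fnmatch module. filter_by_whitelist is a hypothetical helper, and using the case-sensitive fnmatchcase variant is an assumption made so the result is the same on every platform.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath
from typing import Iterable, List


def filter_by_whitelist(paths: Iterable[str], whitelist: List[str]) -> List[str]:
    kept = []
    for raw in paths:
        path = PurePosixPath(raw)
        # A file is kept if its name or its full POSIX path matches
        # at least one pattern.
        if any(fnmatchcase(path.name, pat) or fnmatchcase(path.as_posix(), pat)
               for pat in whitelist):
            kept.append(raw)
    return kept


candidates = ["app.yaml", "config/db.json", "config/secrets.prod.toml", "notes.ini"]
selected = filter_by_whitelist(candidates, ["app.yaml", "secrets*.toml", "config/*.json"])
print(selected)
```

Here config/db.json is kept through its path and config/secrets.prod.toml through its file name.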

Features:

  • Loads .env variables.
  • Supports config files: YAML, JSON, TOML, INI, and XML.
  • Converts configs to a nested SimpleNamespace structure.
  • Merges multiple config files into one consistent structure.
  • Provides an env field to access environment variables.
  • Supports hot-reloading of config.
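The merge-then-convert behaviour listed above can be sketched in a few lines. deep_merge and to_namespace are hypothetical helpers, and letting later files win on conflicting keys is an assumption about the merge order.

```python
from types import SimpleNamespace
from typing import Any, Dict


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value  # later values win on conflicts (assumption)
    return merged


def to_namespace(value: Any) -> Any:
    # Convert nested dicts (also inside lists) for dot access.
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(item) for item in value]
    return value


cfg = to_namespace(deep_merge(
    {"app": {"name": "MyApp", "debug": False}},
    {"app": {"debug": True}, "db": {"port": 5432}},
))
print(cfg.app.name, cfg.app.debug, cfg.db.port)
```

Merging recursively keeps app.name from the first file even though the second file also defines an app section.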

Usage:

from utilities import Config

Config().reload(["./path/to/config"], ["*.json", "*.toml"]) # Optional

cfg = Config().get()
print(cfg.app.name)        # Access config value
print(cfg.os.env.DEBUG)    # Access environment variable

write_config function (from config_writer.py)

This utility function exports a configuration dictionary or SimpleNamespace to a file in a specified format. Useful for persisting or exporting configuration values after runtime modifications.

Parameters:

  • config (dict | SimpleNamespace): Configuration data to write.
  • filename (str): Path to the output file.
  • format (ConfigFormat): The target file format (JSON, YAML, TOML, INI, XML).
  • exclude_keys (List[str], optional): Top-level keys to exclude from the output.

Usage:

from utilities import write_config, ConfigFormat
from types import SimpleNamespace

config = SimpleNamespace(app=SimpleNamespace(name="MyApp", debug=True))

write_config(
    config=config,                    # Mandatory
    filename="config_out.yaml",      # Mandatory
    format=ConfigFormat.YAML,         # Mandatory
    exclude_keys=["secret"]          # Optional
)

Together, Config and write_config cover both loading and writing configuration for any project that uses these utilities.

3.8 Teams Utilities

A client for sending messages and adaptive cards to Microsoft Teams channels via webhooks.
Supports plain text messages and complex card payloads, with configurable retries and timeouts.

TeamsClient

Description:

The TeamsClient class is a utility for posting notifications and adaptive cards to Microsoft Teams channels.
It is designed for:

  • Sending plain text messages to Teams channels
  • Sending adaptive or formatted card payloads (for example, lists of Jira issues)
  • Configurable retry and timeout logic
  • Mapping named channels to Teams webhook URLs
  • Formatting utilities for date/times

Usage Example:

import logging

from utilities import TeamsClient

logger = logging.getLogger(__name__)  # passed to TeamsClient below

channels = {
    "dev": "https://outlook.office.com/webhook/...",
    "qa": "https://outlook.office.com/webhook/...",
    # Add more channel:webhook pairs
}

client = TeamsClient(
    channels=channels,
    jira_base_url="https://jira.company.com",
    verbose=True,
    logger=logger,  # Optional
    log_level=None,
    max_issues=20,
    post_timeout=8,
    post_retries=2
)

# 1. Send a simple text message to the "dev" channel
ok = client.send_message("dev", "Deploy complete! 🚀")
print("Sent?", ok)

# 2. Send a formatted (adaptive card) payload
payload = {
    "type": "message",
    "attachments": [
        {
            "contentType": "application/vnd.microsoft.card.adaptive",
            "content": {
                # ... adaptive card content ...
            }
        }
    ]
}
ok = client.send_formatted_message("qa", payload)
print("Sent?", ok)

# 3. Custom channel name not found? Returns False and logs an error
ok = client.send_message("unknown", "This will not send")
print("Sent?", ok)

# 4. Formatting a datetime for Teams card
dt_str = TeamsClient._format_datetime("2024-07-05T15:25:00")
print(dt_str)  # -> "2024-07-05 15:25"
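For reference, a minimal adaptive card payload for send_formatted_message could be assembled as follows. This is a sketch: build_text_card is a hypothetical helper, and the card uses only standard Adaptive Card TextBlock fields.

```python
import json
from typing import Any, Dict


def build_text_card(title: str, text: str) -> Dict[str, Any]:
    """Wrap a title and a body text in a Teams webhook message."""
    card = {
        "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
        "type": "AdaptiveCard",
        "version": "1.4",
        "body": [
            {"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium"},
            {"type": "TextBlock", "text": text, "wrap": True},
        ],
    }
    return {
        "type": "message",
        "attachments": [
            {"contentType": "application/vnd.microsoft.card.adaptive", "content": card}
        ],
    }


payload = build_text_card("Nightly build", "All 128 tests passed.")
print(json.dumps(payload, indent=2))
```

The resulting dictionary has the same outer shape as the payload in the usage example and can be passed as its second argument.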
