Customized exception handling
Project description
Intercept-it!
The philosophy of the library is the ability to flexibly catch any exception,
execute logic specific to that exception and continue executing the program.
You can intercept exceptions from coroutines and ordinary functions through the same interface
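Conceptually, an interceptor is just a decorator that catches a configured set of exceptions, runs registered handlers, and lets the program continue. A toy stdlib-only illustration of the idea (this is not the library's API; `make_interceptor` is a made-up name for the sketch):

```python
import functools
from typing import Callable

def make_interceptor(
    exceptions: tuple[type[BaseException], ...],
    handlers: list[Callable[[BaseException], None]],
):
    # Returns a decorator that catches the given exceptions and runs the handlers
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions as error:
                for handler in handlers:
                    handler(error)
        return wrapper
    return decorator

@make_interceptor((ZeroDivisionError,), [lambda e: print(f'Handled: {e}')])
def dangerous(n: int) -> float:
    return n / 0
```

intercept-it builds on this idea with configurable loggers, handler ordering and async support.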
Features
- Different ways to intercept exceptions
- Easy setup of interceptor objects
- Interceptors can be executed in asynchronous code
- Generic logging system:
  - Built-in std logger
  - Easy way to create and use custom loggers
  - Supports several loggers for one exception
- Generic handlers:
  - Use any callable: a plain function, a lambda or a class with any arguments
  - Choose the execution order of the handlers
- Maximum customization of any object: loggers, handlers and interceptors
Installation guide (pip)
$ pip3 install intercept-it
Installation guide (GitHub)
1. Clone the project repository in one of the following ways:
$ git clone https://github.com/pro100broo/intercept-it.git
$ git clone git@github.com:pro100broo/intercept-it.git
2. Jump into the project repository
$ cd intercept-it
3. If you don't have a Python virtual environment, create and activate it
$ python3 -m venv venv
$ source venv/bin/activate
4. Install setuptools
$ pip3 install setuptools
5. Install the library in one of the following ways:
$ make app-build
$ make app-build-clean
$ python3 setup.py install
Interceptors overview
There are three main classes to intercept exceptions:
- GlobalInterceptor - Can catch multiple specified exceptions from a function
- UnitInterceptor - Catches a specified exception from a function
- NestedInterceptor - A container for GlobalInterceptor and UnitInterceptor. Routes any calls to them
All interceptors have three user interfaces:
- register_handler - Adds any callable handler to the interceptor
- intercept - A decorator that catches exceptions
- wrap - A function that wraps another function to catch exceptions within it
All of them behave the same for regular functions and coroutines.
Let's see how to configure and use GlobalInterceptor!
```python
from intercept_it import GlobalInterceptor
from intercept_it.loggers import STDLogger
from intercept_it.utils import cooldown_handler

# Initialize the interceptor with the necessary parameters
interceptor = GlobalInterceptor(
    [IndexError, ZeroDivisionError],  # Collection of target exceptions
    loggers=[STDLogger()]  # Use the default std logger
)

# Add some handlers to the interceptor
interceptor.register_handler(
    cooldown_handler,  # callable
    5,  # positional argument
    execution_order=1
)

interceptor.register_handler(
    lambda x, y: print(f'{x}. {y}'),  # another callable :)
    'I am additional handler', 'It is so cool!',  # a few positional arguments
    execution_order=2
)

# Intercept the exception with the decorator
@interceptor.intercept
def dangerous_calculation(some_number: int) -> float:
    return some_number / 0

def dangerous_list_access(index: int) -> int:
    numbers = [1, 2, 3]
    return numbers[index]

if __name__ == '__main__':
    dangerous_calculation(5)

    # Intercept the exception with the wrapper
    interceptor.wrap(dangerous_list_access, 100)
```
Results:
2024-11-10 17:49:33.156556+03:00 | ERROR | File "...\intercept-it\examples\readme_examples\global_example.py", line 45: division by zero
I am additional handler. It is so cool!
2024-11-10 17:49:38.174263+03:00 | ERROR | File "...\intercept-it\examples\readme_examples\global_example.py", line 46: list index out of range
I am additional handler. It is so cool!
We used two simple handlers:
- Default cooldown handler (just waits the specified time after intercepting the exception)
- Simple lambda function with some logging message
You can execute more complex logic, such as sending exception details to log storage or notifying clients in messengers
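For reference, a minimal cooldown handler could look like the sketch below. This mirrors the behavior described above (wait a given time after an exception is intercepted) but is not the library's actual implementation, and `my_cooldown_handler` is an illustrative name:

```python
import time

def my_cooldown_handler(seconds: float) -> None:
    # Block for the given number of seconds after an exception is intercepted,
    # giving a flaky dependency time to recover before the next attempt
    time.sleep(seconds)
```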
Other interceptors have a somewhat different setup. You can find additional usage examples here
Usage tips
Loggers customization:
```python
from intercept_it import GlobalInterceptor
from intercept_it.loggers import STDLogger

def custom_formatter(message: str) -> str:
    return f'I was formatted: {message}'

# Default std logger
default_logger = STDLogger()

# Customized std logger
customized_logger = STDLogger(
    logging_level='WARNING',
    default_formatter=custom_formatter,
    pytz_timezone='Africa/Tunis',
)

interceptor = GlobalInterceptor(
    [IndexError, ZeroDivisionError],
    loggers=[default_logger, customized_logger],
)
```
Results:
2024-11-10 15:55:28.415905+01:00 | ERROR | File "...\intercept-it\examples\loggers_customization.py", line 59: division by zero
2024-11-10 15:55:28.415905+01:00 | WARNING | I was formatted: division by zero
2024-11-10 15:55:33.428577+01:00 | ERROR | File "...\intercept-it\examples\loggers_customization.py", line 60: list index out of range
2024-11-10 15:55:33.428577+01:00 | WARNING | I was formatted: list index out of range
Creating new loggers:
Each logger must inherit from the BaseLogger or BaseAsyncLogger class and implement the save_logs method
```python
import logging

from intercept_it import GlobalInterceptor
from intercept_it.loggers.base_logger import BaseLogger

# Custom logger
class CustomLogger(BaseLogger):
    def __init__(self):
        self._logger = logging.getLogger()

    def save_logs(self, message: str) -> None:
        self._logger.warning(f'Be careful! Im custom logger: {message}')

interceptor = GlobalInterceptor(
    [IndexError, ZeroDivisionError],
    loggers=[CustomLogger()],
)
```
Results:
Be careful! Im custom logger: division by zero
I am additional handler. It is so cool!
Be careful! Im custom logger: list index out of range
I am additional handler. It is so cool!
Exception management:
If you need to propagate an intercepted exception higher up the call stack or implement nested interceptors, you need to
specify the raise_exception parameter
```python
from intercept_it import GlobalInterceptor, UnitInterceptor

# Setup the global interceptor
global_interceptor = GlobalInterceptor(
    [IndexError, ZeroDivisionError],
    raise_exception=True
)

global_interceptor.register_handler(
    lambda message: print(message),
    'Got exception in main function',
)

# Setup the unit interceptor
unit_interceptor = UnitInterceptor(
    raise_exception=True
)

unit_interceptor.register_handler(
    lambda message: print(message),
    'Got exception in third-party function',
)

@unit_interceptor.intercept(ZeroDivisionError)
def dangerous_calculation(some_number: int) -> float:
    return some_number / 0

@global_interceptor.intercept
def main():
    dangerous_calculation(100)

if __name__ == '__main__':
    try:
        main()
    except ZeroDivisionError:
        print('Got exception in entry point')
```
Results:
Got exception in third-party function
Got exception in main function
Got exception in entry point
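In plain Python terms, raise_exception=True makes each interceptor behave like a try/except block that runs its handlers and then re-raises, so every layer up the call stack gets a chance to react. A rough stdlib-only equivalent of the flow above (function and message names mirror the example, not the library's internals):

```python
def dangerous_calculation(some_number: int) -> float:
    return some_number / 0

def main() -> None:
    try:
        dangerous_calculation(100)
    except ZeroDivisionError:
        # What the unit interceptor does: run its handlers, then re-raise
        print('Got exception in third-party function')
        raise

try:
    try:
        main()
    except ZeroDivisionError:
        # What the global interceptor does: run its handlers, then re-raise
        print('Got exception in main function')
        raise
except ZeroDivisionError:
    # The entry point finally swallows the exception
    print('Got exception in entry point')
```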
Looping:
Let's imagine a situation:
Your script delivers important data from an API to a database every 30 minutes.
Suddenly, the next request to the API returns a 404 error; for example, the API server is down for maintenance.
You can specify the run_until_success option and wait until the server comes back up.
```python
import random

from intercept_it import UnitInterceptor
from intercept_it import STDLogger
from intercept_it.utils import cooldown_handler

class RequestsException(Exception):
    pass

# Initialize the interceptor with the necessary configuration
interceptor = UnitInterceptor(
    loggers=[STDLogger(default_formatter=lambda error: f'Error occurred: {error}. Waiting for success connection')],
    run_until_success=True
)

interceptor.register_handler(
    cooldown_handler,
    5,
    execution_order=2
)

# Simulating the webserver work
@interceptor.intercept(RequestsException)
def receive_data_from_api(api_key: str) -> dict[str, str]:
    is_server_down = random.randint(0, 10)
    if is_server_down >= 4:
        raise RequestsException('Integration down to maintenance')
    print(f'Successful connection with api key: {api_key}')
    return {'user': 'pro100broo', 'password': '12345'}

if __name__ == '__main__':
    print(f'Received data from integration: {receive_data_from_api("_API_KEY_")}')
```
Results:
2024-11-18 01:02:39.596949+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
Successful connection with api key: _API_KEY_
Received data from integration: {'user': 'pro100broo', 'password': '12345'}
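Under the hood this is an ordinary retry loop. A stdlib-only sketch of what run_until_success amounts to (names are illustrative, the cooldown is shortened here, and this is not the library's implementation):

```python
import random
import time

class RequestsException(Exception):
    pass

def receive_data_from_api(api_key: str) -> dict[str, str]:
    # Simulate a flaky server, as in the example above
    if random.randint(0, 10) >= 4:
        raise RequestsException('Integration down to maintenance')
    return {'user': 'pro100broo', 'password': '12345'}

def run_until_success(func, *args, cooldown: float = 0.01):
    # Retry the call until it returns without raising,
    # pausing between attempts (5 seconds in the real example)
    while True:
        try:
            return func(*args)
        except RequestsException as error:
            print(f'Error occurred: {error}. Waiting for success connection')
            time.sleep(cooldown)
```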
Additional processing of wrapped function parameters
Let's imagine another situation :)
You are developing a service where some data must be delivered no matter what.
For example, it could be a chat messenger.
We take the necessary data from the task pool and try to send messages.
If a message was not delivered due to a broken connection, you must resend it, returning the data to an additional
task pool.
You can specify the send_function_parameters_to_handlers parameter to route the wrapped function's parameters to any handler
with the receive_parameters option enabled
Some entities are initialized in an additional module:
```python
# entities.py
import logging
from datetime import datetime

from pydantic import BaseModel

from intercept_it import UnitInterceptor
from intercept_it.loggers.base_logger import BaseAsyncLogger

# Custom exception
class RequestsException(Exception):
    pass

# Custom async logger
class CustomLogger(BaseAsyncLogger):
    def __init__(self):
        self._logger = logging.getLogger()

    async def save_logs(self, message: str) -> None:
        self._logger.error(f"{message} | {datetime.now()}")

# Custom message model
class MessageModel(BaseModel):
    message: str
    status: str

    def __str__(self) -> str:
        return f"Text: {self.message}. Status: {self.status}"

# Initialize the interceptor with the necessary configuration
interceptor = UnitInterceptor(
    loggers=[CustomLogger()],
    send_function_parameters_to_handlers=True,  # Enable sending parameters to handlers
    execution_mode='async'
)
```
The main module:
```python
# parameters_processing.py
import random
import asyncio

from entities import (
    MessageModel,
    RequestsException,
    interceptor
)

# Handler for undelivered messages
async def parameters_handler(message: MessageModel, send_requests_queue: asyncio.Queue) -> None:
    send_requests_queue.task_done()
    print(f'Intercepted message: {message}')
    message.status = 'Awaiting resend'
    # resend_requests_queue is a module-level queue created in the entrypoint below
    await resend_requests_queue.put(message)

interceptor.register_handler(
    parameters_handler,
    receive_parameters=True  # Enable receiving parameters from the wrapped function
)

# Attempt to send a message
@interceptor.intercept(RequestsException)
async def send_message_to_server(message: MessageModel, tasks_queue: asyncio.Queue) -> None:
    is_server_down = random.randint(0, 10)
    if is_server_down == 10:
        raise RequestsException(f'Connection lost. Failed to send message: {message}')
    message.status = 'Delivered'
    tasks_queue.task_done()
    print(f'Message successfully delivered: {message}')

# Gets a message from the queue and tries to send it
async def send_message(send_requests_queue: asyncio.Queue) -> None:
    while True:
        message_content = await send_requests_queue.get()
        await send_message_to_server(message_content, send_requests_queue)

# Simulating the appearance of messages
async def generate_messages(send_requests_queue: asyncio.Queue) -> None:
    for _ in range(20):
        await send_requests_queue.put(
            MessageModel(
                message=random.choice(['Hi!', 'Hello!', "What's up!"]),
                status="Awaiting send"
            )
        )

# The entrypoint
async def main():
    send_requests_queue = asyncio.Queue(maxsize=50)
    await generate_messages(send_requests_queue)

    tasks = [asyncio.create_task(send_message(send_requests_queue)) for _ in range(4)]
    await send_requests_queue.join()
    for task in tasks:
        task.cancel()

    print(f'Message queue for sending: {send_requests_queue}')
    print(f'Message queue for resending: {resend_requests_queue}')

if __name__ == '__main__':
    resend_requests_queue = asyncio.Queue(maxsize=50)
    asyncio.run(main())
```
Results:
Connection lost. Failed to send message: Text: Hi!. Status: Awaiting send | 2024-11-18 03:22:30.645844
Connection lost. Failed to send message: Text: What's up!. Status: Awaiting send | 2024-11-18 03:22:30.647229
Intercepted message: Text: Hi!. Status: Awaiting send
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Intercepted message: Text: What's up!. Status: Awaiting send
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message queue for sending: <Queue maxsize=50 _getters[4]>
Message queue for resending: <Queue maxsize=50 _queue=[MessageModel(message='Hi!', status='Awaiting resend'), MessageModel(message="What's up!", status='Awaiting resend')] tasks=2>
Executing loggers and handlers in asynchronous code
There are two execution modes for loggers and handlers:
- Ordered (default) - coroutines are executed in the specified order
- Fast - coroutines are wrapped in tasks and executed concurrently
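In plain asyncio terms, the difference between the two modes is roughly the following (a sketch under the assumptions above, not the library's internals; `slow_handler`, `run_ordered` and `run_fast` are illustrative names):

```python
import asyncio
import time

async def slow_handler(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def run_ordered(delays: list[float]) -> list[float]:
    # 'Ordered' mode: await each coroutine one after another,
    # so the total time is the sum of the delays
    return [await slow_handler(d) for d in delays]

async def run_fast(delays: list[float]) -> list[float]:
    # 'Fast' mode: wrap the coroutines in tasks and run them concurrently,
    # so the total time is roughly the longest delay
    return await asyncio.gather(*(slow_handler(d) for d in delays))
```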
Ordered mode:
```python
import asyncio
from datetime import datetime

from intercept_it import UnitInterceptor

async def first_logging_operation() -> None:
    await asyncio.sleep(5)
    print(f'First handler delivered logs: {datetime.now()}')

async def second_logging_operation() -> None:
    await asyncio.sleep(5)
    print(f'Second handler delivered logs: {datetime.now()}')

# Initialize the interceptor with the necessary configuration
interceptor = UnitInterceptor(execution_mode='async')

interceptor.register_handler(
    first_logging_operation,
    execution_order=1
)

interceptor.register_handler(
    second_logging_operation,
    execution_order=2
)

@interceptor.intercept(ZeroDivisionError)
def dangerous_calculation(number: int):
    return number / 0

if __name__ == '__main__':
    asyncio.run(dangerous_calculation(100))
```
Results:
First handler delivered logs: 2024-11-18 11:49:14.129061
Second handler delivered logs: 2024-11-18 11:49:19.130104
Fast mode:
```python
import asyncio
from datetime import datetime

from intercept_it import UnitInterceptor

async def first_logging_operation() -> None:
    await asyncio.sleep(5)
    print(f'First handler delivers logs: {datetime.now()}')

async def second_logging_operation() -> None:
    await asyncio.sleep(5)
    print(f'Second handler delivers logs: {datetime.now()}')

# Initialize the interceptor with the necessary configuration
interceptor = UnitInterceptor(
    execution_mode='async',
    handlers_execution_mode='fast'
)

interceptor.register_handler(
    first_logging_operation,
)

interceptor.register_handler(
    second_logging_operation,
)

@interceptor.intercept(ZeroDivisionError)
def dangerous_calculation(number: int) -> float:
    return number / 0

if __name__ == '__main__':
    asyncio.run(dangerous_calculation(100))
```
Results:
First handler delivers logs: 2024-11-18 11:50:29.481526
Second handler delivers logs: 2024-11-18 11:50:29.481526
Future plans
I want to solve the problem of exception tracing in asynchronous code and obtain a complete
tree of the exceptions that occur during the execution of coroutines.
I would also like to add more customization options for loggers and to add new types of interceptors