Skip to main content

dependency injector

Project description

介绍

  • 该类库提供多种类注入方式,主要利用抽象类来弥补 Python 原生没有接口(interface)这一工程上的缺陷

示例一:

  • 单实例化
from BusyBox.ServiceBox import Box

class AppleService:
    """Minimal parameterless service used by the single-instance example."""

    def name(self):
        """Return the fixed label ``'test'``."""
        return 'test'

if __name__ == '__main__':
    # Demo: hand the class to a Box, then call the service through the box.
    box = Box()
    box.inject(AppleService)
    # The service is reached through the snake_case attribute `apple_service`;
    # presumably Box derives that name from the class name — confirm in BusyBox.
    box.apple_service.name()

示例二:

  • 带参 多实例化
from BusyBox.ServiceBox import Box

class TestService:
    """Service that stores one constructor argument and hands it back."""

    def __init__(self, params1):
        # Keep the supplied value so handle() can return it later.
        self.params1 = params1

    def handle(self):
        """Return the value that was passed to the constructor."""
        stored = self.params1
        return stored


class RestService:
    """Second one-argument service, injected alongside TestService in the example."""

    def __init__(self, params1):
        # Keep the supplied value so handle() can return it later.
        self.params1 = params1

    def handle(self):
        """Return the value that was passed to the constructor."""
        stored = self.params1
        return stored

if __name__ == '__main__':
    box = Box()
    # Both classes are passed in one inject() call together with a single payload dict;
    # presumably the payload supplies `params1` to each constructor — confirm in BusyBox.
    box.inject(TestService, RestService, payload=dict(params1=1))
    # Each service is then reached through its own snake_case attribute.
    box.rest_service.handle()
    box.test_service.handle()

示例三:

  • 类命名中带数字的实例(Bus1Service 对应 box.bus1_service)
from BusyBox.ServiceBox import Box

class Bus1Service:
    """Parameterless service whose class name contains a digit."""

    def name(self):
        """Return the fixed label ``'test'``."""
        return 'test'

if __name__ == '__main__':
    box = Box()
    box.inject(Bus1Service)
    # The digit stays attached to the preceding word: Bus1Service is reached as `bus1_service`.
    box.bus1_service.name()

示例四:

  • 使用 @box.depend() 装饰器声明依赖的实例
from BusyBox.ServiceBox import Box

# Box instance whose depend() decorator is applied to the service class below.
box = Box()

# NOTE(review): depend() presumably registers CowService with the box at
# class-definition time — confirm against the BusyBox implementation.
@box.depend()
class CowService(object):
    """Service exposing a single static method, used by the decorator example."""

    @staticmethod
    def name():
        """Return the fixed label ``'test'``."""
        return 'test'

if __name__ == '__main__':
    # NOTE(review): this rebinds `box` to a new Box(), shadowing the module-level
    # instance that decorated CowService; the class is then injected explicitly.
    # Whether Box instances share registrations is not visible here — confirm.
    box = Box()
    box.inject(CowService)
    box.cow_service.name()

示例五:

  • __init__方法带参实例
from BusyBox.ServiceBox import Box

# Box instance whose depend() decorator is applied to the service class below.
box = Box()

# NOTE(review): depend() presumably registers EasyService with the box so it can
# later be built by name via invoke() — confirm against BusyBox.
@box.depend()
class EasyService(object):
    """Service that records one named argument plus any extra positional/keyword arguments."""

    def __init__(self, params1, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.params1 = params1

    def name(self):
        """Return the tuple ``(params1, args, kwargs)`` captured by the constructor."""
        return self.params1, self.args, self.kwargs

if __name__ == '__main__':
    # NOTE(review): `box` is rebound to a new Box() here, shadowing the decorated one;
    # whether registrations carry over between Box instances is not visible — confirm.
    box = Box()
    # invoke() receives the snake_case service name followed by constructor arguments.
    # If they are forwarded unchanged, EasyService.__init__ binds 1 to params1,
    # (2, 3) to *args and a=4, b=5 to **kwargs — TODO confirm forwarding in BusyBox.
    box.invoke('easy_service', 1, 2, 3, a=4, b=5)
    box.easy_service.name()

示例六:

  • 注入的参数若不在 __init__ 的参数列表中,且 __init__ 没有 **kwargs 参数时,该参数会被忽略
from BusyBox.ServiceBox import Box
class PositionService:
    """Service whose constructor accepts exactly one argument, ``params1``."""

    def __init__(self, params1: str):
        # Keep the supplied value so show_params() can report it.
        self.params1 = params1

    def show_params(self):
        """Print the defining class together with ``params1``, then return ``params1``."""
        print(f'{__class__}: self.params1:', self.params1)
        value = self.params1
        return value
if __name__ == '__main__':
    box = Box()
    # The payload carries an extra key (`params2`) that PositionService.__init__
    # does not accept; only `params1` shows up in the expected output below.
    box.inject(PositionService, payload={'params1': 1, 'params2': 1})
    box.position_service.show_params()
    # The string literal below is a no-op expression that records the expected
    # console output of the call above.
    """
    输出:
        <class 'TestInjectorDemo.PositionService'>: self.params1: 1
    """

示例七:

  • 实体重置复用
from BusyBox.ServiceBox import Box

class Position4Service:
    """Service capturing one named argument plus any extra positional/keyword arguments."""

    def __init__(self, params1: str, *_args, **_kwargs):
        # Store the three inputs in the same order as before: params1, args, kwargs.
        self.params1, self.args, self.kwargs = params1, _args, _kwargs

    def show_params1(self):
        """Print and return ``params1``."""
        value = self.params1
        print('self.params1:', value)
        return value

    def show_args(self):
        """Print and return the extra positional arguments as a tuple."""
        extras = self.args
        print('self._args:', extras)
        return extras

    def show_kwargs(self):
        """Print and return the extra keyword arguments as a dict."""
        options = self.kwargs
        print('self._kwargs:', options)
        return options
if __name__ == '__main__':
    # Demo: inject with positional and keyword payloads, mutate the live instance,
    # then reset it and check that the originally injected state is back.
    # The expectations are real assertions (as in the last example of this file)
    # rather than bare comparisons whose results would be silently discarded.
    box = Box()
    box.inject(Position4Service, args_payload=(99, 1, 2, 3, 4), kwargs_payload=dict(params2=88))
    # Expected binding: the first positional item is params1, the rest are *_args.
    assert box.position4_service.show_params1() == 99
    assert box.position4_service.show_args() == (1, 2, 3, 4)
    assert box.position4_service.show_kwargs() == dict(params2=88)
    # Mutate the live instance; the change is visible through the box.
    box.position4_service.params1 = 100
    assert box.position4_service.show_params1() == 100
    # Reset: the service is expected to return to its originally injected state.
    box.reset('position4_service')
    assert box.position4_service.show_params1() == 99
    assert box.position4_service.show_args() == (1, 2, 3, 4)
    assert box.position4_service.show_kwargs() == dict(params2=88)

示例八:

  • 服务中根据依赖服务的接口注入相关工厂
import abc
from BusyBox.ServiceBox import FactoryInjectAPI, factory_inject

class Child1API(metaclass=abc.ABCMeta):
    """Interface for the first child service.

    ``func1`` is marked abstract so that ABCMeta actually enforces the contract:
    the interface itself, and any subclass that does not override ``func1``,
    cannot be instantiated.
    """

    @abc.abstractmethod
    def func1(self) -> str:
        """Return the implementation's string result; must be overridden."""
        raise NotImplementedError


class Child2API(metaclass=abc.ABCMeta):
    """Interface for the second child service.

    ``func2`` is marked abstract so that ABCMeta actually enforces the contract:
    the interface itself, and any subclass that does not override ``func2``,
    cannot be instantiated.
    """

    @abc.abstractmethod
    def func2(self) -> str:
        """Return the implementation's string result; must be overridden."""
        raise NotImplementedError


class Child1Service(Child1API):
    """Concrete Child1API implementation used by the factory example."""

    def func1(self) -> str:
        """Return the fixed marker string ``"func1"``."""
        marker = "func1"
        return marker


class Child2Service(Child2API):
    """Concrete Child2API implementation used by the factory example."""

    def func2(self) -> str:
        """Return the fixed marker string ``"func2"``."""
        marker = "func2"
        return marker


class Child1ServiceFactory(FactoryInjectAPI):
    """Factory that builds the Child1API implementation.

    NOTE(review): ``construct`` is presumably the hook that ``factory_inject``
    calls to obtain the service — confirm against BusyBox.
    """

    def construct(self) -> Child1API:
        """Create and return a new Child1Service."""
        service = Child1Service()
        return service


class Child2ServiceFactory(FactoryInjectAPI):
    """Factory that builds the Child2API implementation.

    NOTE(review): ``construct`` is presumably the hook that ``factory_inject``
    calls to obtain the service — confirm against BusyBox.
    """

    def construct(self) -> Child2API:
        """Create and return a new Child2Service."""
        service = Child2Service()
        return service


# The two factories are handed to factory_inject, which decorates the host class.
@factory_inject(
    Child1ServiceFactory,
    Child2ServiceFactory
)
class FatherService(object):
    """Host service that uses two child services through their interfaces."""

    # Declared by annotation only and never assigned in this class; presumably
    # populated by @factory_inject from the factories above — confirm in BusyBox.
    child1_service: Child1API
    child2_service: Child2API

    def __init__(self):
        # Prints an empty line; no state is set up here.
        print('')

    def test(self):
        """Print the results of ``func1()`` and ``func2()`` from the two child services."""
        print('test:', self.child1_service.func1(), self.child2_service.func2())

示例九:

  • 依赖的服务共享宿主实例的属性
# -*- coding: utf-8 -*-
import abc
from typing import Any
from BusyBox.ServiceBox import FactoryInjectAPI, factory_inject


class Child1RefAPI(metaclass=abc.ABCMeta):
    """Abstract interface for the first context-aware child service."""

    @abc.abstractmethod
    def func1(self) -> str:
        """Return the implementation's string result; subclasses must override."""
        raise NotImplementedError


class Child2RefAPI(metaclass=abc.ABCMeta):
    """Abstract interface for the second context-aware child service."""

    @abc.abstractmethod
    def func2(self) -> str:
        """Return the implementation's string result; subclasses must override."""
        raise NotImplementedError


class Context:
    """Plain holder for a single ``session`` value of any type."""

    session: Any

    def __init__(self, session: Any):
        # Keep the caller's session object as-is.
        self.session = session


class Child1RefService(Child1RefAPI):
    """Child1RefAPI implementation that keeps a reference to the Context it is given."""

    context: Context

    def __init__(self, ctx: Context):
        # Store the caller's Context object itself; no copy is made.
        self.context = ctx

    def func1(self) -> str:
        """Return a string embedding the stored context and its ``id()``."""
        description = f"Child {self.context} {id(self.context)}"
        return description


class Child2RefService(Child2RefAPI):
    """Child2RefAPI implementation that keeps a reference to the Context it is given."""

    context: Context

    def __init__(self, ctx: Context):
        # Store the caller's Context object itself; no copy is made.
        self.context = ctx

    def func2(self) -> str:
        """Return a string embedding the stored context and its ``id()``."""
        description = f"Child {self.context} {id(self.context)}"
        return description


class Child1RefServiceFactory(FactoryInjectAPI):
    """Factory that builds a Child1RefService around a supplied Context.

    NOTE(review): the ``context`` argument is presumably provided by
    ``factory_inject`` via its ``refer=`` option — confirm against BusyBox.
    """

    def construct(self, context: Context) -> Child1RefAPI:
        """Create and return a Child1RefService bound to ``context``."""
        service = Child1RefService(context)
        return service


class Child2RefServiceFactory(FactoryInjectAPI):
    """Factory that builds a Child2RefService around a supplied Context.

    NOTE(review): the ``context`` argument is presumably provided by
    ``factory_inject`` via its ``refer=`` option — confirm against BusyBox.
    """

    def construct(self, context: Context) -> Child2RefAPI:
        """Create and return a Child2RefService bound to ``context``."""
        service = Child2RefService(context)
        return service


# The two factories plus `refer=Context` are handed to factory_inject.
# NOTE(review): `refer=Context` presumably tells the decorator to pass the host's
# Context-typed attribute to each factory's construct() — confirm in BusyBox.
@factory_inject(
    Child1RefServiceFactory,
    Child2RefServiceFactory,
    refer=Context
)
class FatherDeepService(object):
    """Host service whose child services are expected to share its Context object."""

    # Declared by annotation only and never assigned in this class; presumably
    # populated by @factory_inject — confirm in BusyBox.
    child1_service: Child1RefAPI
    child2_service: Child2RefAPI
    # Assigned in __init__ below.
    context_host: Context

    def __init__(self):
        # Create the host's own Context and print it together with its id().
        self.context_host = Context('context from host')
        print(f'Host {self.context_host} {id(self.context_host)}')

    def test(self):
        """Print the results of ``func1()`` and ``func2()`` from the two child services."""
        print('test:', self.child1_service.func1(), self.child2_service.func2())
if __name__ == '__main__':
    f_srv = FatherDeepService()
    # Both children format their stored context and its id(), so equal strings
    # mean both hold the same Context object.
    assert f_srv.child1_service.func1() == f_srv.child2_service.func2()
    # Context defines no __eq__, so this comparison is by identity: the child
    # holds the very object the host created.
    assert f_srv.context_host == f_srv.child1_service.context

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

BusyBox-1.0.3.tar.gz (8.4 kB view details)

Uploaded Source

File details

Details for the file BusyBox-1.0.3.tar.gz.

File metadata

  • Download URL: BusyBox-1.0.3.tar.gz
  • Upload date:
  • Size: 8.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.8.3

File hashes

Hashes for BusyBox-1.0.3.tar.gz
Algorithm Hash digest
SHA256 301b73ff2a2ff1fdfa88e2998b729c212f87304e3cc57a8da1eaf4a1a1f11113
MD5 8cc62e30e4f7db107cfb5579c481ab45
BLAKE2b-256 778d0b2999cabf13dfaae352814b8aa432206f48559d5f439ed96f544d16fa01

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page