Async Casbin role watcher to be used for monitoring updates to policies for PyCasbin

casbin_async_redis_watcher

async casbin redis watcher

async-redis-watcher

async-redis-watcher is the Redis watcher for pycasbin. With this library, Casbin can synchronize the policy with the database in multiple enforcer instances.

What is the problem with redis-watcher?

When you use a web framework such as FastAPI, the event loop runs in the main thread, but redis-watcher starts a new thread to monitor policy changes and synchronize the policy.

So when you use SQLAlchemy with an AsyncEnforcer, there are two event loops in different threads. This is how redis-watcher creates its watcher:

def new_watcher(option: WatcherOptions):
    option.init_config()
    w = RedisWatcher()
    rds = Redis(host=option.host, port=option.port, password=option.password, ssl=option.ssl)
    if rds.ping() is False:
        raise Exception("Redis server is not available.")
    w.sub_client = rds.client().pubsub()
    w.pub_client = rds.client()
    w.init_config(option)
    w.close = False
    w.subscribe_thread.start()
    w.subscribe_event.wait(timeout=5)
    return w

It calls w.subscribe_thread.start() to start a new thread, which can lead to a lot of errors.

You will see the following error:

cb=[_run_until_complete_cb() at C:\Python310\lib\asyncio\base_events.py:184]> got Future <Future pending> attached to a different loop
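
This class of error can be reproduced with plain asyncio, without Redis or Casbin. The sketch below is only an illustration: the function name reproduce_cross_loop_error is made up for this example, and it does not use redis-watcher's actual code. It creates a Future on one event loop and awaits it from a second loop running in another thread.

```python
import asyncio
import threading


def reproduce_cross_loop_error() -> str:
    """Await a Future owned by one loop from a task running on another loop."""
    main_loop = asyncio.new_event_loop()
    # This future is bound to main_loop.
    future_on_main_loop = main_loop.create_future()
    captured = {}

    def worker():
        # A second event loop that lives in a separate thread.
        thread_loop = asyncio.new_event_loop()

        async def wait_for_future():
            await future_on_main_loop

        try:
            thread_loop.run_until_complete(wait_for_future())
        except RuntimeError as exc:
            captured["error"] = str(exc)
        finally:
            thread_loop.close()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    main_loop.close()
    return captured.get("error", "")


error_message = reproduce_cross_loop_error()
print(error_message)
```

The printed message ends with "attached to a different loop", the same wording as in the traceback above.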

I turned redis-watcher into async-redis-watcher to solve these problems once and for all and to support asynchronous calls.

Installation

pip install casbin_async_redis_watcher

Simple Example

import asyncio
import os
import casbin
from casbin_async_redis_watcher import new_watcher, WatcherOptions
from casbin import AsyncEnforcer
from functools import partial


async def update_policy(e: AsyncEnforcer, event):
    print("receive a new update policy....")
    print(event)
    await e.load_policy()


def get_examples(path):
    examples_path = os.path.split(os.path.realpath(__file__))[0] + "/../examples/"
    return os.path.abspath(examples_path + path)


async def make_async_casbin_watcher():
    test_option = WatcherOptions()
    test_option.host = "localhost"
    test_option.port = "6379"
    test_option.password = "password"
    test_option.channel = "test"
    test_option.db = 4
    test_option.ssl = False
    e = casbin.AsyncEnforcer(get_examples("rbac_model.conf"), get_examples("rbac_policy.csv"))
    func = partial(update_policy, e)
    test_option.optional_update_callback = func
    watcher = await new_watcher(test_option)
    e.set_watcher(watcher)
    await e.load_policy()
    return e


async def main():
    e = await make_async_casbin_watcher()
    await e.save_policy()
    await asyncio.sleep(2)
    await e.add_role_for_user("yangyanxing", "admin")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()
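
The example binds the enforcer to the callback with functools.partial, so the watcher only needs to pass the event. A minimal sketch of that binding, assuming the watcher awaits the callback with the event as its single argument (the string "enforcer-1" stands in for a real AsyncEnforcer):

```python
import asyncio
from functools import partial


async def update_policy(enforcer_name, event):
    # A real callback would call `await e.load_policy()` here.
    return f"{enforcer_name} reloads after {event}"


# Bind the first argument now; the caller supplies only the event later.
callback = partial(update_policy, "enforcer-1")

message = asyncio.run(callback("Update"))
print(message)
```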

In pycasbin, casbin/async_internal_enforcer.py line 114,

async def _add_policy(self, sec, ptype, rule):
    """async adds a rule to the current policy."""
    rule_added = self.model.add_policy(sec, ptype, rule)
    if not rule_added:
        return rule_added

    if self.adapter and self.auto_save:
        result = await self.adapter.add_policy(sec, ptype, rule)
        if result is False:
            return False

        if self.watcher and self.auto_notify_watcher:
            if callable(getattr(self.watcher, "update_for_add_policy", None)):
                self.watcher.update_for_add_policy(sec, ptype, rule)
            else:
                self.watcher.update()

    return rule_added

When _add_policy, _add_policies, save_policy, and similar functions are called and a watcher is set, the enforcer calls watcher.update_for_add_policy or watcher.update().

But these calls are NOT awaited, so in this project the update function of class RedisWatcher (in watcher.py) cannot be an async function.

So I use loop.create_task inside the update function:
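
To see why an async update would not work with an un-awaited call, here is a small sketch. AsyncUpdateWatcher and notify_like_enforcer are hypothetical names used only for illustration. Calling an async function without await only creates a coroutine object, and its body never runs.

```python
import asyncio


class AsyncUpdateWatcher:
    """Hypothetical watcher whose update() is (wrongly) a coroutine function."""

    def __init__(self):
        self.calls = 0

    async def update(self):
        self.calls += 1


async def notify_like_enforcer(watcher):
    # Mirrors the enforcer's `self.watcher.update()` call, which has no await.
    result = watcher.update()
    is_coroutine = asyncio.iscoroutine(result)
    result.close()  # discard the coroutine to avoid a "never awaited" warning
    return is_coroutine, watcher.calls


is_coroutine, calls = asyncio.run(notify_like_enforcer(AsyncUpdateWatcher()))
print(is_coroutine, calls)
```

The call returns a coroutine and the counter stays at 0, so nothing would be published.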

def update(self):
    async def func():
        async with self.mutex:
            msg = MSG("Update", self.options.local_ID, "", "", "")
            return await self.pub_client.publish(self.options.channel, msg.marshal_binary())

    return self.loop.create_task(self.log_record(func))
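
The same pattern can be tried without Redis. The sketch below is an assumption-laden stand-in, not the real RedisWatcher: SketchWatcher and _publish are hypothetical, and asyncio.sleep(0) replaces the Redis publish call. It shows a synchronous update() that schedules asynchronous work on the running loop and returns the task.

```python
import asyncio


class SketchWatcher:
    """Hypothetical stand-in for RedisWatcher; stores messages in a list."""

    def __init__(self):
        # Must be created while the event loop is running.
        self.loop = asyncio.get_running_loop()
        self.mutex = asyncio.Lock()
        self.published = []

    async def _publish(self, message):
        async with self.mutex:
            await asyncio.sleep(0)  # placeholder for the Redis publish call
            self.published.append(message)

    def update(self):
        # Synchronous entry point: schedule the coroutine and return at once.
        return self.loop.create_task(self._publish("Update"))


async def demo():
    watcher = SketchWatcher()
    task = watcher.update()  # called without await, as the enforcer does
    scheduled_only = not task.done()
    await task  # awaited here only so the demo can inspect the result
    return scheduled_only, watcher.published


scheduled_only, published = asyncio.run(demo())
print(scheduled_only, published)
```

Because update() returns the task, a caller can keep a reference to it. The asyncio documentation recommends holding such references so that a pending task is not garbage collected before it finishes.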

Getting Help

License

This project is under Apache 2.0 License. See the LICENSE file for the full license text.
