
Caching library for asynchronous Python applications (both based on asyncio and Tornado) that handles dogpiling properly and provides a configurable & extensible API.



Extended docs (including API docs) available at memoize.readthedocs.io.

What & Why

What: Caching library for asyncio Python applications.

Why: Python deserves a library that works in the async world (for instance, handles dog-piling) and has a proper, extensible API.

Etymology

In computing, memoization or memoisation is an optimization technique used primarily to speed up computer programs by storing the results of expensive function calls and returning the cached result when the same inputs occur again. (…) The term “memoization” was coined by Donald Michie in 1968 and is derived from the Latin word “memorandum” (“to be remembered”), usually truncated as “memo” in the English language, and thus carries the meaning of “turning [the results of] a function into something to be remembered.” ~ Wikipedia

Getting Started

Installation

Basic Installation

To get up & running, all you need to do is install:

pip install py-memoize

Installation of Extras

To harness the power of ujson (if the JSON SerDe is used), install the extra:

pip install py-memoize[ujson]

Usage

The provided examples use the default configuration to cache results in memory. For configuration options, see Configurability.

asyncio

To apply the default caching configuration, use:

import asyncio
import random
from memoize.wrapper import memoize


@memoize()
async def expensive_computation():
    return 'expensive-computation-' + str(random.randint(1, 100))


async def main():
    print(await expensive_computation())
    print(await expensive_computation())
    print(await expensive_computation())


if __name__ == "__main__":
    asyncio.run(main())

Features

Async-first

Asynchronous programming is often seen as a huge performance boost in Python. But along with its benefits come new concurrency-related caveats, such as dog-piling.

This library is async-oriented from the ground up, which manifests, for example, in dog-piling proofness and async cache storage.

Configurability

With memoize you control:

  • timeout applied to the cached method;

  • key generation strategy (see memoize.key.KeyExtractor); already provided strategies use arguments (both positional & keyword) and method name (or reference);

  • storage for cached entries/items (see memoize.storage.CacheStorage); in-memory storage is already provided; for convenience of implementing new storage adapters some SerDe (memoize.serde.SerDe) are provided;

  • eviction strategy (see memoize.eviction.EvictionStrategy); least-recently-updated strategy is already provided;

  • entry builder (see memoize.entrybuilder.CacheEntryBuilder) which has control over update_after & expires_after described in Tunable eviction & async refreshing;

  • value post-processing (see memoize.postprocessing.Postprocessing); noop is the default one; deep-copy post-processing is also provided (be wary of deep-copy cost & limitations, but deep-copying allows callers to safely modify values retrieved from an in-memory cache).

All of these elements are open for extension (you can implement and plug-in your own). Please contribute!
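For instance, the provided key-extraction strategies build keys from the method name (or reference) plus its positional & keyword arguments. Here is a stdlib-only sketch of that idea, purely illustrative and not the library's actual implementation (make_cache_key and fetch_user are made-up names):

```python
import hashlib
import pickle


def make_cache_key(method, args, kwargs) -> str:
    """Build a deterministic key from the method name and its arguments."""
    payload = pickle.dumps((method.__name__, args, sorted(kwargs.items())))
    return hashlib.sha256(payload).hexdigest()


async def fetch_user(user_id, *, region="eu"):
    ...


# Same call signature -> same key; different arguments -> different key.
k1 = make_cache_key(fetch_user, (42,), {"region": "eu"})
k2 = make_cache_key(fetch_user, (42,), {"region": "eu"})
k3 = make_cache_key(fetch_user, (43,), {"region": "eu"})
print(k1 == k2, k1 == k3)  # True False
```

A real key extractor plugs into the configuration via set_key_extractor and must also handle unhashable or unpicklable arguments according to its own policy.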

An example of how to customize the default config (everything gets overridden):

from datetime import timedelta

from memoize.configuration import MutableCacheConfiguration, DefaultInMemoryCacheConfiguration
from memoize.entrybuilder import ProvidedLifeSpanCacheEntryBuilder
from memoize.eviction import LeastRecentlyUpdatedEvictionStrategy
from memoize.key import EncodedMethodNameAndArgsKeyExtractor
from memoize.postprocessing import DeepcopyPostprocessing
from memoize.statuses import InMemoryLocks
from memoize.storage import LocalInMemoryCacheStorage
from memoize.wrapper import memoize

@memoize(
    configuration=MutableCacheConfiguration
    .initialized_with(DefaultInMemoryCacheConfiguration())
    .set_method_timeout(value=timedelta(minutes=2))
    .set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
                                                         expire_after=timedelta(minutes=5)))
    .set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
    .set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
    .set_storage(LocalInMemoryCacheStorage())
    .set_postprocessing(DeepcopyPostprocessing()),
    update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5))
)
async def cached():
    return 'dummy'

Still, you can use the default configuration, which:

  • sets timeout for underlying method to 2 minutes;

  • uses in-memory storage;

  • uses method instance & arguments to infer cache key;

  • stores up to 4096 elements in cache and evicts entries according to least recently updated policy;

  • refreshes elements after 10 minutes & ignores unrefreshed elements after 30 minutes;

  • does not post-process cached values.

If that satisfies you, just use the default config:

from memoize.configuration import DefaultInMemoryCacheConfiguration
from memoize.wrapper import memoize


@memoize(configuration=DefaultInMemoryCacheConfiguration())
async def cached():
    return 'dummy'

Also, if you want to stick to the building blocks of the default configuration, but need to adjust some basic params:

from datetime import timedelta

from memoize.configuration import DefaultInMemoryCacheConfiguration
from memoize.wrapper import memoize


@memoize(configuration=DefaultInMemoryCacheConfiguration(capacity=4096, method_timeout=timedelta(minutes=2),
                                                         update_after=timedelta(minutes=10),
                                                         expire_after=timedelta(minutes=30)))
async def cached():
    return 'dummy'

Tunable eviction & async refreshing

Sometimes caching libraries only allow providing a TTL. This may result in a scenario where, when the cache entry expires, latency increases because a new value needs to be recomputed. To mitigate this periodic extra latency, multiple delays are often used. In the case of memoize there are two (see memoize.entrybuilder.ProvidedLifeSpanCacheEntryBuilder):

  • update_after defines the delay after which a background/async update is executed;

  • expire_after defines the delay after which an entry is considered outdated and invalid.

This allows refreshing the cached value in the background without any observable latency. Moreover, if some background refreshes fail, they are retried, still in the background. Due to this beneficial feature, it is recommended that update_after be significantly shorter than expire_after.
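The interplay of the two delays can be sketched as a small, stdlib-only freshness classifier (the function and its labels are illustrative, not part of the library's API):

```python
from datetime import datetime, timedelta, timezone


def classify(created: datetime, update_after: timedelta,
             expire_after: timedelta, now: datetime) -> str:
    """Classify a cache entry's state relative to the two delays."""
    age = now - created
    if age >= expire_after:
        return "expired"  # entry is invalid; a caller has to wait for a fresh value
    if age >= update_after:
        return "stale"    # entry is still served, but a background refresh kicks in
    return "fresh"        # entry is served as-is


created = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
update_after = timedelta(minutes=10)
expire_after = timedelta(minutes=30)

print(classify(created, update_after, expire_after, created + timedelta(minutes=5)))   # fresh
print(classify(created, update_after, expire_after, created + timedelta(minutes=15)))  # stale
print(classify(created, update_after, expire_after, created + timedelta(minutes=45)))  # expired
```

The "stale" window between the two delays is where background refreshing hides recomputation latency from callers.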

Dog-piling proofness

If some resource is accessed asynchronously, dog-piling may occur. Caches designed for synchronous Python code (like the built-in LRU cache) allow multiple concurrent tasks to observe a miss for the same resource and then flood the underlying/cached backend with requests for that same resource.

As this defeats the purpose of caching (the backend is effectively left unprotected at times), memoize has built-in dog-piling protection.

Under the hood, concurrent requests for the same resource (cache key) are collapsed into a single request to the backend. When the resource is fetched, all requesters obtain the result. On failure, all requesters get an exception (the same happens on timeout).
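The collapsing mechanism can be sketched with plain asyncio futures. This is a simplified illustration of the technique, not the library's actual implementation (collapsed_fetch and fetch_via_backend are made-up names, and the sketch deliberately omits caching of the fetched value):

```python
import asyncio

_inflight: dict[str, asyncio.Future] = {}
backend_calls = 0


async def fetch_via_backend(key: str) -> str:
    global backend_calls
    backend_calls += 1
    await asyncio.sleep(0.01)  # simulated slow backend call
    return f"value-for-{key}"


async def collapsed_fetch(key: str) -> str:
    # The first caller for a key registers a future and performs the fetch;
    # concurrent callers for the same key just await that future.
    if key in _inflight:
        return await _inflight[key]
    future = asyncio.get_running_loop().create_future()
    _inflight[key] = future
    try:
        result = await fetch_via_backend(key)
        future.set_result(result)
        return result
    except Exception as e:
        future.set_exception(e)  # failure propagates to every waiter
        raise
    finally:
        del _inflight[key]


async def main():
    results = await asyncio.gather(*[collapsed_fetch("resource") for _ in range(5)])
    print(results)        # five identical values
    print(backend_calls)  # 1 -- only one real backend call was made


asyncio.run(main())
```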

An example of what this is all about:

import asyncio
from datetime import timedelta

from aiocache import cached, SimpleMemoryCache  # version 0.11.1 (latest at the time) used as an example of another cache implementation

from memoize.configuration import DefaultInMemoryCacheConfiguration
from memoize.wrapper import memoize

# scenario configuration
concurrent_requests = 5
request_batches_execution_count = 50
cached_value_ttl_ms = 200
delay_between_request_batches_ms = 70

# results/statistics
unique_calls_under_memoize = 0
unique_calls_under_different_cache = 0


@memoize(configuration=DefaultInMemoryCacheConfiguration(update_after=timedelta(milliseconds=cached_value_ttl_ms)))
async def cached_with_memoize():
    global unique_calls_under_memoize
    unique_calls_under_memoize += 1
    await asyncio.sleep(0.01)
    return unique_calls_under_memoize


@cached(ttl=cached_value_ttl_ms / 1000, cache=SimpleMemoryCache)
async def cached_with_different_cache():
    global unique_calls_under_different_cache
    unique_calls_under_different_cache += 1
    await asyncio.sleep(0.01)
    return unique_calls_under_different_cache


async def main():
    for i in range(request_batches_execution_count):
        await asyncio.gather(*[x() for x in [cached_with_memoize] * concurrent_requests])
        await asyncio.gather(*[x() for x in [cached_with_different_cache] * concurrent_requests])
        await asyncio.sleep(delay_between_request_batches_ms / 1000)

    print("Memoize generated {} unique backend calls".format(unique_calls_under_memoize))
    print("Other cache generated {} unique backend calls".format(unique_calls_under_different_cache))
    predicted = (delay_between_request_batches_ms * request_batches_execution_count) // cached_value_ttl_ms
    print("Predicted (according to TTL) {} unique backend calls".format(predicted))

    # Printed:
    # Memoize generated 17 unique backend calls
    # Other cache generated 85 unique backend calls
    # Predicted (according to TTL) 17 unique backend calls

if __name__ == "__main__":
    asyncio.run(main())

Async cache storage

The cache storage interface lets you fully harness the benefits of asynchronous programming (see the interface of memoize.storage.CacheStorage).

Currently, memoize provides only in-memory storage for cache values (internally at RASP we have others). If you want, for instance, Redis integration, you need to implement it (please contribute!), but memoize will make optimal use of your async implementation from the start.
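To illustrate the shape of such an adapter, here is a stdlib-only, dict-backed sketch. The method names mirror the in-memory storage but are assumptions rather than the exact CacheStorage contract; a real Redis adapter would subclass memoize.storage.CacheStorage and await an async Redis client instead of touching a local dict:

```python
import asyncio
from typing import Any, Optional


class DictBackedAsyncStorage:
    """Illustrative async storage: every operation is awaitable, so a
    network-backed implementation (e.g. Redis) can drop in without
    blocking the event loop."""

    def __init__(self):
        self._data: dict[str, Any] = {}

    async def save(self, key: str, entry: Any) -> None:
        self._data[key] = entry  # a Redis adapter would await client.set(...)

    async def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)  # ... and await client.get(...) here

    async def remove(self, key: str) -> None:
        self._data.pop(key, None)


async def demo():
    storage = DictBackedAsyncStorage()
    await storage.save("k", "v")
    print(await storage.get("k"))  # v
    await storage.remove("k")
    print(await storage.get("k"))  # None


asyncio.run(demo())
```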

Manual Invalidation

You can also invalidate entries manually. To do so, create an instance of memoize.invalidation.InvalidationSupport and pass it alongside the cache configuration. Then simply pass the args and kwargs for which you want to invalidate an entry.

from memoize.configuration import DefaultInMemoryCacheConfiguration
from memoize.invalidation import InvalidationSupport


import asyncio
import random
from memoize.wrapper import memoize

invalidation = InvalidationSupport()


@memoize(configuration=DefaultInMemoryCacheConfiguration(), invalidation=invalidation)
async def expensive_computation(*args, **kwargs):
    return 'expensive-computation-' + str(random.randint(1, 100))


async def main():
    print(await expensive_computation('arg1', kwarg='kwarg1'))
    print(await expensive_computation('arg1', kwarg='kwarg1'))

    print("Invalidation #1")
    await invalidation.invalidate_for_arguments(('arg1',), {'kwarg': 'kwarg1'})

    print(await expensive_computation('arg1', kwarg='kwarg1'))
    print(await expensive_computation('arg1', kwarg='kwarg1'))

    print("Invalidation #2")
    await invalidation.invalidate_for_arguments(('arg1',), {'kwarg': 'kwarg1'})

    print(await expensive_computation('arg1', kwarg='kwarg1'))

    # Sample output:
    #
    # expensive-computation-98
    # expensive-computation-98
    # Invalidation #1
    # expensive-computation-73
    # expensive-computation-73
    # Invalidation #2
    # expensive-computation-59

if __name__ == "__main__":
    asyncio.run(main())

Openness to granular TTL

The default configuration sets update and expiry based on fixed values that are the same for all entries. If you need different TTLs for different entries, you can do so by providing a custom memoize.entrybuilder.CacheEntryBuilder.

import datetime
import asyncio
import random
from dataclasses import dataclass

from memoize.wrapper import memoize
from memoize.configuration import DefaultInMemoryCacheConfiguration, MutableCacheConfiguration
from memoize.entry import CacheKey, CacheEntry
from memoize.entrybuilder import CacheEntryBuilder
from memoize.storage import LocalInMemoryCacheStorage


@dataclass
class ValueWithTTL:
    value: str
    ttl_seconds: int  # for instance, it could be derived from Cache-Control response header


class TtlRespectingCacheEntryBuilder(CacheEntryBuilder):
    def build(self, key: CacheKey, value: ValueWithTTL):
        now = datetime.datetime.now(datetime.timezone.utc)
        ttl_ends_at = now + datetime.timedelta(seconds=value.ttl_seconds)
        return CacheEntry(
            created=now,
            update_after=ttl_ends_at,
            # allowing stale data for 10% of TTL
            expires_after=ttl_ends_at + datetime.timedelta(seconds=value.ttl_seconds // 10),
            value=value
        )


storage = LocalInMemoryCacheStorage()  # overridden & extracted for demonstration purposes only


@memoize(configuration=MutableCacheConfiguration
         .initialized_with(DefaultInMemoryCacheConfiguration())
         .set_entry_builder(TtlRespectingCacheEntryBuilder())
         .set_storage(storage))
async def external_call(key: str):
    return ValueWithTTL(
        value=f'{key}-result-{random.randint(1, 100)}',
        ttl_seconds=random.randint(60, 300)
    )


async def main():
    await external_call('a')
    await external_call('b')
    await external_call('b')

    print("Entries persisted in the cache:")
    for entry in storage._data.values():
        print('Entry: ', entry.value)
        print('Effective TTL: ', (entry.update_after - entry.created).total_seconds())

    # Entries persisted in the cache:
    # Entry: ValueWithTTL(value='a-result-79', ttl_seconds=148)
    # Effective TTL: 148.0
    # Entry: ValueWithTTL(value='b-result-27', ttl_seconds=192)
    # Effective TTL: 192.0


if __name__ == "__main__":
    asyncio.run(main())

