Python Concurrent (thread-safe) collections

tl;dr

Despite what many people think, Python's built-in list, dict, and deque are NOT fully thread-safe.
They are thread-safe for some operations, but not all.
This has caused a lot of confusion in the Python community.
The Google Python style guide recommends not relying on the atomicity of built-in types.

concurrent_collections provides thread-safe alternatives by using locks internally to ensure safe concurrent access and mutation from multiple threads.

Inspired by C#'s amazing concurrent collections.

Why use these collections?

There is a lot of confusion about whether Python collections are thread-safe or not [1, 2, 3].

The bottom line is that Python's built-in collections are not fully thread-safe for all operations.
While some simple operations (like list.append() or dict[key] = value) are thread-safe due to the Global Interpreter Lock (GIL), compound operations and iteration with mutation are not. This can lead to subtle bugs, race conditions, or even crashes in multi-threaded programs.
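To make the distinction concrete, here is a minimal sketch that uses only the standard library. It guards a read-modify-write on a plain dict with a threading.Lock. The names counts, counts_lock, and increment are illustrative and are not part of concurrent_collections.

```python
import threading

counts = {"hits": 0}
counts_lock = threading.Lock()  # one lock guards every access to `counts`


def increment(times: int) -> None:
    """Increment counts["hits"] `times` times, one locked step at a time."""
    for _ in range(times):
        # `counts["hits"] += 1` is a read, an add, and a write. Without the
        # lock, another thread may run between those steps and an update
        # can be lost.
        with counts_lock:
            counts["hits"] += 1


threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counts["hits"])  # 40000
```

Removing the `with counts_lock:` line turns the increment back into an unguarded compound operation, and the final count is then no longer guaranteed.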

See the Python FAQ: "What kinds of global value mutation are thread-safe?" for details. The FAQ explains that only some (if common) operations are guaranteed to be atomic and thread-safe, but for anything more complex, you must use your own locking.
The docs even go as far as to say:

When in doubt, use a mutex!

Which is telling.

Even Google recommends not relying on the atomicity of built-in collections.

This concurrent_collections library provides drop-in replacements that handle locking for you.
Suggestions and feedback are welcome.

  1. Are lists thread-safe?

  2. Google style guide advises against relying on Python's assignment atomicity

  3. What kind of "thread safe" are deque's actually?

Installation

Pip:

pip install concurrent_collections

My recommendation is to always use uv instead of pip – I personally think it's the best package and environment manager for Python.

uv add concurrent_collections

Collections

ConcurrentBag

A thread-safe, list-like collection.

from concurrent_collections import ConcurrentBag

bag = ConcurrentBag([1, 2, 3])
bag.append(4)
print(list(bag))  # [1, 2, 3, 4]

ConcurrentDictionary

A thread-safe dictionary. It has several atomic methods for safe concurrent operations:

  • assign_atomic() - Atomically assign a value to a key
  • update_atomic() - Atomically update a value using a function
  • remove_atomic() - Atomically remove a key and return its value
  • put_if_absent() - Atomically put a value only if the key doesn't exist
  • replace_if_present() - Atomically replace a value only if the key exists
  • replace_if_equal() - Atomically replace a value only if it equals the expected value
  • remove_if_exists() - Atomically remove a key if it exists
  • get_and_remove() - Atomically get and remove a value
  • get_locked() - Context manager for safe read-modify-write operations

ConcurrentDictionary's assign_atomic()

Assigns a value to a key in a thread-safe way. While dict["somekey"] = value is allowed, it's best to use assign_atomic() for clarity of intent. Normal assignment still works, but it emits a UserWarning.
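The following sketch shows how such a warning surfaces through the standard warnings module. WarningDict is a hypothetical stand-in written for this example, not the library's implementation, and the assign_atomic(key, value) signature and the warning message are assumptions.

```python
import threading
import warnings


class WarningDict:
    """Hypothetical stand-in: item assignment works but warns."""

    def __init__(self):
        self._data = {}
        self._lock = threading.RLock()

    def assign_atomic(self, key, value):
        # Explicit, lock-protected assignment.
        with self._lock:
            self._data[key] = value

    def __setitem__(self, key, value):
        # Plain assignment is allowed, but the caller is nudged towards
        # the explicit method.
        warnings.warn(
            "prefer assign_atomic() over item assignment",
            UserWarning,
            stacklevel=2,
        )
        self.assign_atomic(key, value)

    def __getitem__(self, key):
        with self._lock:
            return self._data[key]


d = WarningDict()
d.assign_atomic("x", 1)  # explicit call, no warning

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # record every warning in this block
    d["x"] = 2  # the value is stored and a UserWarning is recorded

print(d["x"])                     # 2
print(caught[0].category.__name__)  # UserWarning
```

In a test suite, warnings.simplefilter("error", UserWarning) would instead turn the warning into an exception, which is one way to find leftover plain assignments.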

ConcurrentDictionary's remove_atomic()

Atomically removes a key from the dictionary and returns its value, or None if the key doesn't exist.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': 1, 'y': 2})
value = d.remove_atomic('x')  # Returns 1, removes 'x'

ConcurrentDictionary's put_if_absent()

Atomically puts a value for a key only if the key is not already present. Returns the existing value if the key exists, None if the key was added.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': 1})
existing = d.put_if_absent('x', 2)  # Returns 1, no change
existing = d.put_if_absent('y', 3)  # Returns None, adds 'y': 3

ConcurrentDictionary's replace_if_present()

Atomically replaces the value for a key only if the key exists. Returns True if the key was replaced, False if the key doesn't exist.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': 1})
replaced = d.replace_if_present('x', 2)  # Returns True
replaced = d.replace_if_present('y', 3)  # Returns False

ConcurrentDictionary's replace_if_equal()

Atomically replaces the value for a key only if the current value equals the expected value. Returns True if the value was replaced, False otherwise.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': 1})
replaced = d.replace_if_equal('x', 1, 2)  # Returns True
replaced = d.replace_if_equal('x', 1, 3)  # Returns False (current value is 2)
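A compare-and-replace method like this is typically used in a retry loop (optimistic concurrency). The sketch below illustrates that pattern with CasDict, a hypothetical stand-in built on a plain lock. It is not the library's code, and the helper add_one is invented for the example.

```python
import threading


class CasDict:
    """Hypothetical stand-in offering a compare-and-replace method."""

    def __init__(self, initial):
        self._data = dict(initial)
        self._lock = threading.Lock()

    def get(self, key):
        with self._lock:
            return self._data[key]

    def replace_if_equal(self, key, expected, new_value):
        # Compare and write under one lock so that no other thread can
        # change the value between the two steps.
        with self._lock:
            if key in self._data and self._data[key] == expected:
                self._data[key] = new_value
                return True
            return False


def add_one(d, key):
    """Retry until the value was not changed by another thread in between."""
    while True:
        current = d.get(key)
        if d.replace_if_equal(key, current, current + 1):
            return


d = CasDict({"x": 0})


def worker():
    for _ in range(1_000):
        add_one(d, "x")


workers = [threading.Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(d.get("x"))  # 4000
```

Each call to add_one succeeds exactly once, so no increment is lost even though the read and the write are separate calls.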

ConcurrentDictionary's get_locked()

When working with ConcurrentDictionary, you should use the get_locked method to safely read or update the value for a specific key in a multi-threaded environment. This ensures that only one thread can access or modify the value for a given key at a time, preventing race conditions.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': "some value"})

# Safely read and update the value for 'x'
with d.get_locked('x') as value:
    # value is locked for this thread
    d['x'] = "new value"
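One way a per-key locking context manager could work is sketched below. KeyLockedDict is a hypothetical stand-in, not the library's implementation. It assumes one reentrant lock per key, so that assigning to the key inside the with block (as in the example above) does not deadlock.

```python
import threading
from collections import defaultdict
from contextlib import contextmanager


class KeyLockedDict:
    """Hypothetical stand-in with one reentrant lock per key."""

    def __init__(self, initial):
        self._data = dict(initial)
        self._guard = threading.Lock()  # protects the lock table itself
        self._key_locks = defaultdict(threading.RLock)

    def _lock_for(self, key):
        with self._guard:
            return self._key_locks[key]

    @contextmanager
    def get_locked(self, key):
        # The key's lock is held for the whole body of the `with` block.
        with self._lock_for(key):
            yield self._data[key]

    def __setitem__(self, key, value):
        # RLock lets the thread that already holds the key's lock assign here.
        with self._lock_for(key):
            self._data[key] = value

    def __getitem__(self, key):
        with self._lock_for(key):
            return self._data[key]


d = KeyLockedDict({"x": 0})


def bump():
    for _ in range(1_000):
        with d.get_locked("x") as value:
            d["x"] = value + 1  # read and write happen under the same lock


threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(d["x"])  # 4000
```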

ConcurrentDictionary's update_atomic()

Performs a thread-safe, in-place update to an existing value under a key.

from concurrent_collections import ConcurrentDictionary

d = ConcurrentDictionary({'x': 1})
d.update_atomic('x', lambda v: v + 1)  # d now contains 2 under the 'x' key

ConcurrentQueue

For thread-safe queues, Python already offers a lot of alternatives, arguably too many, so I'm not going to add another. Please refer to the following.

In the queue module, there are the following thread-safe queue classes:

  • Queue
  • SimpleQueue
  • LifoQueue
  • PriorityQueue

Warning: these queue classes are thread-safe, although their names do not make that explicit. This is dangerously confusing, because people may wrongly assume that the same guarantee extends to e.g. collections.deque, which is not thread-safe for all operations.
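As a quick illustration of the standard-library option, here is a minimal producer/consumer sketch using queue.Queue. The sentinel STOP, the worker function, and the results list are illustrative names chosen for this example.

```python
import queue
import threading

tasks = queue.Queue()  # thread-safe FIFO queue from the standard library
results = []
results_lock = threading.Lock()  # guards `results`, a plain list
STOP = object()  # sentinel telling a worker to exit


def worker():
    while True:
        item = tasks.get()  # blocks until an item is available
        if item is STOP:
            break
        with results_lock:
            results.append(item * item)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    tasks.put(n)
for _ in threads:
    tasks.put(STOP)  # one sentinel per worker

for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```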

Additionally, the multiprocessing module defines its own queue classes, which is even more confusing because their names overlap with the queue classes above:

  • JoinableQueue
  • Queue (again)
  • SimpleQueue (again)

Equality and Identity Semantics

ConcurrentBag Equality

ConcurrentBag compares as a multiset. Order doesn't matter, but element frequency does:

from concurrent_collections import ConcurrentBag

# These are equal (same elements, same frequencies)
bag1 = ConcurrentBag([1, 2, 2, 3])
bag2 = ConcurrentBag([2, 1, 3, 2])
assert bag1 == bag2  # True

# These are not equal (different frequencies)
bag3 = ConcurrentBag([1, 2, 3, 3])
assert bag1 != bag3  # True
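Multiset equality can be expressed with collections.Counter from the standard library. The sketch below only illustrates the semantics. It is not necessarily how ConcurrentBag implements the comparison, and it assumes hashable elements. The helper name multiset_equal is made up for the example.

```python
from collections import Counter


def multiset_equal(a, b):
    """Return True if both iterables hold the same elements with the same counts."""
    return Counter(a) == Counter(b)


print(multiset_equal([1, 2, 2, 3], [2, 1, 3, 2]))  # True: same counts, different order
print(multiset_equal([1, 2, 2, 3], [1, 2, 3, 3]))  # False: 2 and 3 occur a different number of times
```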

ConcurrentQueue Equality

ConcurrentQueue compares elements in order, taking snapshots for consistency during concurrent operations:

from concurrent_collections import ConcurrentQueue

# These are equal (same elements, same order)
queue1 = ConcurrentQueue([1, 2, 3])
queue2 = ConcurrentQueue([1, 2, 3])
assert queue1 == queue2  # True

# These are not equal (different order)
queue3 = ConcurrentQueue([3, 2, 1])
assert queue1 != queue3  # True

ConcurrentDictionary Equality

ConcurrentDictionary compares key-value pairs; order doesn't matter:

from concurrent_collections import ConcurrentDictionary

# These are equal (same key-value pairs)
dict1 = ConcurrentDictionary({'a': 1, 'b': 2})
dict2 = ConcurrentDictionary({'b': 2, 'a': 1})
assert dict1 == dict2  # True

# These are not equal (different values)
dict3 = ConcurrentDictionary({'a': 1, 'b': 3})
assert dict1 != dict3  # True

Thread Safety Guarantees

All collections provide the following guarantees:

  1. Atomic Operations: All individual operations (append, remove, get, set) are atomic
  2. Consistent Snapshots: Iteration and equality comparisons take consistent snapshots
  3. No Race Conditions: Multiple threads can safely access and modify the collections
  4. Identity Consistency: Hash values and equality comparisons are consistent within a single operation
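To illustrate what a consistent snapshot means for iteration, here is a minimal sketch. SnapshotList is a hypothetical stand-in, not the library's implementation: it copies its contents under a lock and iterates over the copy, so mutations made during the loop do not affect the loop.

```python
import threading


class SnapshotList:
    """Hypothetical stand-in: iteration runs over a copy taken under a lock."""

    def __init__(self, items=()):
        self._items = list(items)
        self._lock = threading.Lock()

    def append(self, item):
        with self._lock:
            self._items.append(item)

    def __iter__(self):
        # Copy while holding the lock, then release it before iterating,
        # so callers may mutate the collection inside their loop.
        with self._lock:
            snapshot = list(self._items)
        return iter(snapshot)


items = SnapshotList([1, 2, 3])
seen = []
for value in items:
    items.append(value * 10)  # does not change the running loop
    seen.append(value)

print(seen)         # [1, 2, 3]
print(list(items))  # [1, 2, 3, 10, 20, 30]
```

The same loop over a plain list would keep visiting the newly appended elements and never finish.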

Note: While individual operations are thread-safe, compound operations (like checking length then conditionally modifying) should use the provided atomic methods or context managers to ensure consistency.

License

MIT License
