Tiny Sync tool using Peewee Models for persistance
Project description
Peewee Syncer
Tiny tool to help syncronize data using peewee db model for state persistance.
Can work with uniquely id'd records (eg auto insert id's) or non unique (ie dates/timestamps)
Use is_unique_key=False
for non unique (see unit tests~)
If limit is reached offset
is used to get over the "hump" (ie bulk updates have been done on your table)
See (towards end) for AsyncIO Support
Install
pip install peewee-syncer
Usage
Note example below uses upsert_db_bulk()
helper.
This requires sqlite 3.25+ for Upsert support *
Eg
Download: https://packages.debian.org/search?keywords=libsqlite3-0
sudo dkpk -i libsqlite3-0_3.27.2-2~bpo9+1_amd64.deb
import os
import logging
from functools import partial
from peewee import SqliteDatabase, Model, CharField
from peewee_syncer.utils import upsert_db_bulk
from peewee_syncer import get_sync_manager, SyncManager, Processor, LastOffsetQueryIterator
log = logging.getLogger(__name__)
try:
os.remove('test.db')
except FileNotFoundError:
pass
db = SqliteDatabase('test.db')
SyncManager.init_db(db)
# Run once
SyncManager.create_table()
# A model to sync (could be anything, not peewee specific)
class MyModel(Model):
name = CharField()
# Method to compare/track
@classmethod
def get_key(cls, item):
return item.id
# Method to get records from last offset
@classmethod
def select_since_id(cls, since, limit, offset=None):
q = cls.select().where(cls.id > since)
if limit:
q = q.limit(limit)
return q
class Meta:
database = db
MyModel.create_table()
# Start at zero for first run (otherwise start=None to continue from previous position)
sync_manager = get_sync_manager(app="my-sync-service", start=0)
# A model to sync to (could be anything, not peewee specific)
class MySyncModel(Model):
some_name = CharField()
class Meta:
database = db
# A function to map the output to be synced
def row_output(model):
return {'id': model.id, 'some_name': model.name}
MySyncModel.create_table()
# Iterator Function
def it(since, limit):
q = MyModel.select_since_id(since, limit=limit)
return LastOffsetQueryIterator(q.iterator(),
# Function to convert to output
row_output_fun=row_output,
# Function to check the key of current record we are processing
key_fun=MyModel.get_key,
# The key is unique/atomic (use False if processing time based records as can have many for each key)
is_unique_key=True
)
# Processor
processor = Processor(
sync_manager=sync_manager,
it_function=it,
# A process function (iterates over the iterator)
process_function=partial(upsert_db_bulk, MySyncModel, preserve=['some_name'], conflict_target='id'),
# Pause up to 1 seconds on each iteration (percentage of records vs limit processed)
sleep_duration=1
)
# Add some records
for i in range(25):
MyModel.create(id=i, name="test_{}".format(i))
log.info("MySyncModel has {} records".format(MySyncModel.select().count()))
# Run (batch of ten, five iterations. set i=0 to run forever)
processor.process(limit=10, i=5)
log.info("MySyncModel has {} records".format(MySyncModel.select().count()))
Output
peewee DEBUG ('CREATE TABLE IF NOT EXISTS "sync_manager" ("app" VARCHAR(256) NOT NULL PRIMARY KEY, "meta" TEXT NOT NULL, "modified" DATETIME)', [])
peewee DEBUG ('CREATE TABLE IF NOT EXISTS "mymodel" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL)', [])
peewee DEBUG ('SELECT "t1"."app", "t1"."meta", "t1"."modified" FROM "sync_manager" AS "t1" WHERE ("t1"."app" = ?) LIMIT ? OFFSET ?', ['my-sync-service', 1, 0])
peewee DEBUG ('BEGIN', None)
peewee DEBUG ('INSERT INTO "sync_manager" ("app", "meta", "modified") VALUES (?, ?, ?)', ['my-sync-service', '{}', datetime.datetime(2019, 6, 4, 16, 10, 18, 603755)])
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 0, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 609370), 'my-sync-service'])
peewee DEBUG ('CREATE TABLE IF NOT EXISTS "mysyncmodel" ("id" INTEGER NOT NULL PRIMARY KEY, "some_name" VARCHAR(255) NOT NULL)', [])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [0, 'test_0'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [1, 'test_1'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [2, 'test_2'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [3, 'test_3'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [4, 'test_4'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [5, 'test_5'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [6, 'test_6'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [7, 'test_7'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [8, 'test_8'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [9, 'test_9'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [10, 'test_10'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [11, 'test_11'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [12, 'test_12'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [13, 'test_13'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [14, 'test_14'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [15, 'test_15'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [16, 'test_16'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [17, 'test_17'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [18, 'test_18'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [19, 'test_19'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [20, 'test_20'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [21, 'test_21'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [22, 'test_22'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [23, 'test_23'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [24, 'test_24'])
peewee DEBUG ('SELECT COUNT(1) FROM (SELECT 1 FROM "mysyncmodel" AS "t1") AS "_wrapped"', [])
__main__ INFO MySyncModel has 0 records
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [0, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [1, 'test_1', 2, 'test_2', 3, 'test_3', 4, 'test_4', 5, 'test_5', 6, 'test_6', 7, 'test_7', 8, 'test_8', 9, 'test_9', 10, 'test_10'])
peewee_syncer DEBUG Processed records n=10 offset=10
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 10, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 837385), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [10, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [11, 'test_11', 12, 'test_12', 13, 'test_13', 14, 'test_14', 15, 'test_15', 16, 'test_16', 17, 'test_17', 18, 'test_18', 19, 'test_19', 20, 'test_20'])
peewee_syncer DEBUG Processed records n=10 offset=20
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 20, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 855367), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [20, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [21, 'test_21', 22, 'test_22', 23, 'test_23', 24, 'test_24'])
peewee_syncer DEBUG Processed records n=4 offset=24
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 24, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 874314), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [24, 10])
peewee_syncer DEBUG Caught up, sleeping..
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [24, 10])
peewee_syncer DEBUG Caught up, sleeping..
peewee_syncer DEBUG Stopping after iteration 5
peewee_syncer INFO Completed processing
peewee DEBUG ('SELECT COUNT(1) FROM (SELECT 1 FROM "mysyncmodel" AS "t1") AS "_wrapped"', [])
__main__ INFO MySyncModel has 24 records
AsyncIO
Uses peewee-async (https://github.com/05bit/peewee-async) Note: SQLite not supported yet: see https://github.com/05bit/peewee-async/issues/126
pip install peewee-syncer[async]
or (includes aiopg
)
pip install peewee-syncer[async-pg]
or (includes aiomysql
)
pip install peewee-syncer[async-mysql]
from peewee_syncer import get_sync_manager, AsyncProcessor
db_object = Manager(db, loop=None)
def it(since=None, limit=None):
log.debug("Getting iterator since={} limit={}".format(since, limit))
def dummy():
for x in range(since+1, since+limit+1):
log.debug("yielded {}".format(x))
yield {"x": x}
return LastOffsetQueryIterator(dummy(), row_output_fun=lambda x:x, key_fun=lambda x:x['x'], is_unique_key=True)
output = []
async def process(it):
nonlocal output
for item in it:
output.append(item)
log.debug("process item: {}".format(item))
processor = AsyncProcessor(
sync_manager=sync_manager,
it_function=it,
process_function=process,
object=db_object
)
async def consume():
await processor.process(limit=10, i=3)
asyncio.get_event_loop().run_until_complete(consume())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file peewee-syncer-0.2.4.tar.gz
.
File metadata
- Download URL: peewee-syncer-0.2.4.tar.gz
- Upload date:
- Size: 8.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.32.2 CPython/3.7.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d4055c8bdd08d3d9ef3de6b812cbfea1eec86ce103a0cf8877797b7fd976ebad |
|
MD5 | 91f57cfa06d2f52d740ca24eacb05a46 |
|
BLAKE2b-256 | 20f6b67e934574b710a3aa1d46cd9b26425e7731cc2e5c1497c95f030726e0e1 |
File details
Details for the file peewee_syncer-0.2.4-py3-none-any.whl
.
File metadata
- Download URL: peewee_syncer-0.2.4-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.32.2 CPython/3.7.4
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c769a39cb9d83e3b5aaa1899d4672e52d0aebcb88e468605602cac1cdefc1f88 |
|
MD5 | 3cef5a930fcfa313655857c5b6e1e684 |
|
BLAKE2b-256 | 58911f6ad341185c9b2c0ad5db9fe8a5e24c2b1e77c4f81ffb074832cdfa605f |