
Tiny sync tool using Peewee models for persistence


Peewee Syncer

Python 3.6

Tiny tool to help synchronize data, using a peewee DB model for state persistence.

Works with records that have unique ids (e.g. auto-increment ids) or non-unique keys (e.g. dates/timestamps). Use is_unique_key=False for non-unique keys (see the unit tests). If the limit is reached, an offset is used to get over the "hump" (e.g. when bulk updates have been done on your table and many rows share the same key).
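The offset trick for non-unique keys can be illustrated without a database. This is a minimal, hypothetical sketch (`next_batch` and the `(key, payload)` rows are illustrative only, not peewee-syncer's API), assuming rows are read in key order. With `key > value` alone, a batch that ends part-way through a run of equal keys would skip the rest of that run; with `key >= value` alone, a run longer than the limit would be re-read forever. Tracking how many rows with the current key were already processed avoids both.

```python
from typing import List, Tuple

Row = Tuple[int, str]  # (key, payload); keys may repeat, e.g. timestamps


def next_batch(rows: List[Row], value: int, offset: int, limit: int):
    """Hypothetical helper: return (batch, new_value, new_offset).

    `rows` must be sorted by key. `offset` counts rows whose key == `value`
    that were already processed, so they are skipped on the next query.
    """
    candidates = [r for r in rows if r[0] >= value]  # like WHERE key >= value
    batch = candidates[offset:offset + limit]        # like LIMIT ... OFFSET ...
    if not batch:
        return batch, value, offset                  # caught up
    last_key = batch[-1][0]
    if last_key == value:
        # Whole batch shared the old key (the "hump"): only the offset moves
        return batch, value, offset + len(batch)
    # Key advanced: next time, skip the rows with the new key already processed
    same = sum(1 for k, _ in batch if k == last_key)
    return batch, last_key, same


rows = [(1, "a"), (1, "b"), (1, "c"), (1, "d"), (2, "e"), (3, "f")]
value, offset, seen = 0, 0, []
while True:
    batch, value, offset = next_batch(rows, value, offset, limit=2)
    if not batch:
        break
    seen.extend(payload for _, payload in batch)
print(seen)  # ['a', 'b', 'c', 'd', 'e', 'f']
```

Here four rows share key 1 and the limit is 2, so the second batch does not advance the key; only the offset moves, and every row is processed exactly once.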

See the AsyncIO section near the end for asyncio support.

Install

pip install peewee-syncer

Usage

Note: the example below uses the upsert_db_bulk() helper, which requires SQLite 3.25+ for upsert support.

For example, on Debian a newer libsqlite3 package can be downloaded from https://packages.debian.org/search?keywords=libsqlite3-0 and installed with:

sudo dpkg -i libsqlite3-0_3.27.2-2~bpo9+1_amd64.deb

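After upgrading, it is worth checking which SQLite version Python actually loads, since that library (through the standard sqlite3 module) is what peewee's SqliteDatabase normally uses, and it can differ from the system package depending on how Python was built. A small standard-library sketch; the 3.25.0 threshold is taken from the note above:

```python
import sqlite3

# Version of the SQLite library that Python's sqlite3 module is linked against;
# this can differ from the `sqlite3` command-line tool on your PATH.
REQUIRED = (3, 25, 0)  # minimum stated in the upsert_db_bulk() note
has_upsert = sqlite3.sqlite_version_info >= REQUIRED

if has_upsert:
    print("SQLite", sqlite3.sqlite_version, "is new enough for upsert_db_bulk()")
else:
    print("SQLite", sqlite3.sqlite_version, "is too old; upgrade libsqlite3")
```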
import os
import logging
from functools import partial
from peewee import SqliteDatabase, Model, CharField
from peewee_syncer.utils import upsert_db_bulk
from peewee_syncer import get_sync_manager, SyncManager, Processor, LastOffsetQueryIterator


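# Show DEBUG logs in the "name LEVEL message" format used in the output below
logging.basicConfig(level=logging.DEBUG, format="%(name)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)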


try:
    os.remove('test.db')
except FileNotFoundError:
    pass

db = SqliteDatabase('test.db')


SyncManager.init_db(db)

# Run once
SyncManager.create_table()

# A model to sync (could be anything, not peewee specific)
class MyModel(Model):

    name = CharField()

    # Return the key used to track sync progress
    @classmethod
    def get_key(cls, item):
        return item.id

    # Method to get records from last offset
    @classmethod
    def select_since_id(cls, since, limit, offset=None):
        q = cls.select().where(cls.id > since)

        if limit:
            q = q.limit(limit)

        return q

    class Meta:
        database = db


MyModel.create_table()


# Start at zero for first run (otherwise start=None to continue from previous position)
sync_manager = get_sync_manager(app="my-sync-service", start=0)


# A model to sync to (could be anything, not peewee specific)
class MySyncModel(Model):

    some_name = CharField()

    class Meta:
        database = db


# A function to map the output to be synced
def row_output(model):
    return {'id': model.id, 'some_name': model.name}


MySyncModel.create_table()


# Iterator Function
def it(since, limit):
    q = MyModel.select_since_id(since, limit=limit)
    return LastOffsetQueryIterator(q.iterator(),
                                   # Function to convert to output
                                   row_output_fun=row_output,
                                   # Function to check the key of current record we are processing
                                   key_fun=MyModel.get_key,
                                   # The key is unique/atomic (use False if processing time based records as can have many for each key)
                                   is_unique_key=True
                                   )


# Processor
processor = Processor(
    sync_manager=sync_manager,
    it_function=it,
    # A process function (iterates over the iterator)
    process_function=partial(upsert_db_bulk, MySyncModel, preserve=['some_name'], conflict_target='id'),
    # Pause up to 1 second on each iteration (scaled by the proportion of records processed vs. limit)
    sleep_duration=1
)


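# Add some records (ids 0-24). Record 0 is never synced, because the query
# uses id > since and the sync starts at 0, so 24 records end up in MySyncModel.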
for i in range(25):
    MyModel.create(id=i, name="test_{}".format(i))

log.info("MySyncModel has {} records".format(MySyncModel.select().count()))

# Run in batches of 10 for 5 iterations (set i=0 to run forever)
processor.process(limit=10, i=5)

log.info("MySyncModel has {} records".format(MySyncModel.select().count()))
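To make the moving parts easier to follow, here is a minimal, hypothetical sketch of the checkpointed loop the example relies on. It is not peewee-syncer's implementation: `run_sync`, `fetch`, `upsert` and the in-memory `state` dict are illustrative stand-ins for `Processor`, `it_function`, `process_function` and the `sync_manager` row, and the sleep formula is only one reading of the `sleep_duration` comment. It reproduces the example's numbers: ids 0-24, start at 0, batches of 10, five iterations.

```python
import time
from typing import Callable, Dict, List


def run_sync(state: Dict[str, int], fetch: Callable[[int, int], List[dict]],
             key: Callable[[dict], int], process: Callable[[List[dict]], None],
             limit: int, iterations: int, sleep_duration: float = 0.0) -> None:
    """Hypothetical checkpointed sync loop; iterations=0 means run forever."""
    i = 0
    while iterations == 0 or i < iterations:
        i += 1
        batch = fetch(state["value"], limit)
        if batch:
            process(batch)
            # Checkpoint only after the batch has been processed
            state["value"] = key(batch[-1])
        # Assumed throttle: sleep longer the emptier the batch was
        time.sleep(sleep_duration * (1 - len(batch) / limit))


source = [{"id": i, "name": "test_{}".format(i)} for i in range(25)]
target = {}
state = {"value": 0}  # like get_sync_manager(..., start=0)


def fetch(since, limit):
    # Like MyModel.select_since_id(): WHERE id > since LIMIT limit
    return [r for r in source if r["id"] > since][:limit]


def upsert(batch):
    for r in batch:
        target[r["id"]] = r["name"]


run_sync(state, fetch, key=lambda r: r["id"], process=upsert, limit=10, iterations=5)
print(len(target), state["value"])  # 24 24
```

Saving the checkpoint only after `process` returns means a crash mid-batch re-sends that batch on restart, which is harmless when the process step is an upsert.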

Output

peewee DEBUG ('CREATE TABLE IF NOT EXISTS "sync_manager" ("app" VARCHAR(256) NOT NULL PRIMARY KEY, "meta" TEXT NOT NULL, "modified" DATETIME)', [])
peewee DEBUG ('CREATE TABLE IF NOT EXISTS "mymodel" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL)', [])
peewee DEBUG ('SELECT "t1"."app", "t1"."meta", "t1"."modified" FROM "sync_manager" AS "t1" WHERE ("t1"."app" = ?) LIMIT ? OFFSET ?', ['my-sync-service', 1, 0])
peewee DEBUG ('BEGIN', None)
peewee DEBUG ('INSERT INTO "sync_manager" ("app", "meta", "modified") VALUES (?, ?, ?)', ['my-sync-service', '{}', datetime.datetime(2019, 6, 4, 16, 10, 18, 603755)])
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 0, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 609370), 'my-sync-service'])
peewee DEBUG ('CREATE TABLE IF NOT EXISTS "mysyncmodel" ("id" INTEGER NOT NULL PRIMARY KEY, "some_name" VARCHAR(255) NOT NULL)', [])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [0, 'test_0'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [1, 'test_1'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [2, 'test_2'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [3, 'test_3'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [4, 'test_4'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [5, 'test_5'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [6, 'test_6'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [7, 'test_7'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [8, 'test_8'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [9, 'test_9'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [10, 'test_10'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [11, 'test_11'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [12, 'test_12'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [13, 'test_13'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [14, 'test_14'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [15, 'test_15'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [16, 'test_16'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [17, 'test_17'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [18, 'test_18'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [19, 'test_19'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [20, 'test_20'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [21, 'test_21'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [22, 'test_22'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [23, 'test_23'])
peewee DEBUG ('INSERT INTO "mymodel" ("id", "name") VALUES (?, ?)', [24, 'test_24'])
peewee DEBUG ('SELECT COUNT(1) FROM (SELECT 1 FROM "mysyncmodel" AS "t1") AS "_wrapped"', [])
__main__ INFO MySyncModel has 0 records
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [0, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [1, 'test_1', 2, 'test_2', 3, 'test_3', 4, 'test_4', 5, 'test_5', 6, 'test_6', 7, 'test_7', 8, 'test_8', 9, 'test_9', 10, 'test_10'])
peewee_syncer DEBUG Processed records n=10 offset=10
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 10, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 837385), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [10, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [11, 'test_11', 12, 'test_12', 13, 'test_13', 14, 'test_14', 15, 'test_15', 16, 'test_16', 17, 'test_17', 18, 'test_18', 19, 'test_19', 20, 'test_20'])
peewee_syncer DEBUG Processed records n=10 offset=20
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 20, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 855367), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [20, 10])
peewee DEBUG ('INSERT INTO "mysyncmodel" ("id", "some_name") VALUES (?, ?), (?, ?), (?, ?), (?, ?) ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"', [21, 'test_21', 22, 'test_22', 23, 'test_23', 24, 'test_24'])
peewee_syncer DEBUG Processed records n=4 offset=24
peewee DEBUG ('UPDATE "sync_manager" SET "meta" = ?, "modified" = ? WHERE ("sync_manager"."app" = ?)', ['{"value": 24, "type": null, "offset": 0}', datetime.datetime(2019, 6, 4, 16, 10, 18, 874314), 'my-sync-service'])
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [24, 10])
peewee_syncer DEBUG Caught up, sleeping..
peewee DEBUG ('SELECT "t1"."id", "t1"."name" FROM "mymodel" AS "t1" WHERE ("t1"."id" > ?) LIMIT ?', [24, 10])
peewee_syncer DEBUG Caught up, sleeping..
peewee_syncer DEBUG Stopping after iteration 5
peewee_syncer INFO Completed processing
peewee DEBUG ('SELECT COUNT(1) FROM (SELECT 1 FROM "mysyncmodel" AS "t1") AS "_wrapped"', [])
__main__ INFO MySyncModel has 24 records
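The `ON CONFLICT ("id") DO UPDATE SET "some_name" = EXCLUDED."some_name"` statements in the log are what make re-running the sync safe: rows that were already copied are updated rather than inserted twice. The sketch below reproduces that statement with the standard library's sqlite3 module instead of peewee; it sends one row per statement via `executemany` rather than the multi-row VALUES list peewee builds, and it needs an SQLite build with UPSERT support (see the version note above).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE mysyncmodel ('
             'id INTEGER NOT NULL PRIMARY KEY, some_name VARCHAR(255) NOT NULL)')

UPSERT = ('INSERT INTO mysyncmodel (id, some_name) VALUES (?, ?) '
          'ON CONFLICT (id) DO UPDATE SET some_name = excluded.some_name')

conn.executemany(UPSERT, [(1, "test_1"), (2, "test_2")])
# Re-sending id 2 updates the existing row instead of raising IntegrityError
conn.executemany(UPSERT, [(2, "renamed"), (3, "test_3")])

rows = conn.execute("SELECT id, some_name FROM mysyncmodel ORDER BY id").fetchall()
print(rows)  # [(1, 'test_1'), (2, 'renamed'), (3, 'test_3')]
```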


AsyncIO

Uses peewee-async (https://github.com/05bit/peewee-async). Note: SQLite is not supported yet; see https://github.com/05bit/peewee-async/issues/126.

pip install peewee-syncer[async]

or (includes aiopg)

pip install peewee-syncer[async-pg]

or (includes aiomysql)

pip install peewee-syncer[async-mysql]
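
import asyncio
import logging

from peewee_async import Manager
from peewee_syncer import get_sync_manager, AsyncProcessor, LastOffsetQueryIterator

log = logging.getLogger(__name__)

# `db` must be a peewee-async compatible database (e.g. PostgreSQL or MySQL);
# SQLite is not supported. SyncManager.init_db(db) and SyncManager.create_table()
# are assumed to have been run as in the synchronous example above.
db_object = Manager(db, loop=None)

sync_manager = get_sync_manager(app="my-sync-service", start=0)


def it(since=None, limit=None):

    log.debug("Getting iterator since={} limit={}".format(since, limit))

    # Dummy source: yields `limit` records with keys after `since`
    def dummy():
        for x in range(since + 1, since + limit + 1):
            log.debug("yielded {}".format(x))
            yield {"x": x}

    return LastOffsetQueryIterator(dummy(), row_output_fun=lambda x: x, key_fun=lambda x: x['x'], is_unique_key=True)


output = []


async def process(it):
    # `nonlocal` is invalid at module level; appending to the list needs no declaration
    for item in it:
        output.append(item)
        log.debug("process item: {}".format(item))


processor = AsyncProcessor(
    sync_manager=sync_manager,
    it_function=it,
    process_function=process,
    object=db_object
)


async def consume():
    await processor.process(limit=10, i=3)


asyncio.get_event_loop().run_until_complete(consume())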
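The async variant follows the same pattern, except that the process function is a coroutine that gets awaited for each batch. A minimal, hypothetical sketch with no database and no peewee-async (`run_batches` is illustrative, not `AsyncProcessor`'s code; the `it` generator is a simplified version of the dummy source above without `LastOffsetQueryIterator`). It uses `asyncio.run`, which needs Python 3.7+.

```python
import asyncio
from typing import Iterator


def it(since: int, limit: int) -> Iterator[dict]:
    # Same dummy source as above: always yields a full batch after `since`
    for x in range(since + 1, since + limit + 1):
        yield {"x": x}


output = []


async def process(items):
    # No `nonlocal`/`global` needed: the list is mutated, never rebound
    for item in items:
        output.append(item)


async def run_batches(start: int, limit: int, iterations: int) -> int:
    """Hypothetical async loop: await `process` per batch, then advance `since`."""
    since = start
    for _ in range(iterations):
        batch = list(it(since, limit))
        await process(batch)
        if batch:
            since = batch[-1]["x"]
    return since


last = asyncio.run(run_batches(start=0, limit=10, iterations=3))
print(last, len(output))  # 30 30
```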

Download files


Source Distribution

peewee-syncer-0.2.4.tar.gz (8.5 kB)

Uploaded Source

Built Distribution

peewee_syncer-0.2.4-py3-none-any.whl (7.9 kB)

Uploaded Python 3

File details

Details for the file peewee-syncer-0.2.4.tar.gz.

File metadata

  • Download URL: peewee-syncer-0.2.4.tar.gz
  • Size: 8.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.32.2 CPython/3.7.4

File hashes

Hashes for peewee-syncer-0.2.4.tar.gz
Algorithm Hash digest
SHA256 d4055c8bdd08d3d9ef3de6b812cbfea1eec86ce103a0cf8877797b7fd976ebad
MD5 91f57cfa06d2f52d740ca24eacb05a46
BLAKE2b-256 20f6b67e934574b710a3aa1d46cd9b26425e7731cc2e5c1497c95f030726e0e1


File details

Details for the file peewee_syncer-0.2.4-py3-none-any.whl.

File metadata

  • Download URL: peewee_syncer-0.2.4-py3-none-any.whl
  • Size: 7.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.0.1 requests-toolbelt/0.9.1 tqdm/4.32.2 CPython/3.7.4

File hashes

Hashes for peewee_syncer-0.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 c769a39cb9d83e3b5aaa1899d4672e52d0aebcb88e468605602cac1cdefc1f88
MD5 3cef5a930fcfa313655857c5b6e1e684
BLAKE2b-256 58911f6ad341185c9b2c0ad5db9fe8a5e24c2b1e77c4f81ffb074832cdfa605f

