Skip to main content

ETL tools

Project description

中文介绍

Introduction

The goal of spparser is to provide a concise and efficient way to read, write, and process text data. At the same time, it supports synchronous and asynchronous reading and writing files, and supports regular, xpath, css selector to extract data. In the future, read and write support for the database will be implemented, and NLP will be introduced to provide more flexible processing methods. The architecture diagram is as follows:
jiagou

The AsyncReader and AsyncWriter is inspired by @zpoint's idataapi_transform

Installation

pip3 install spparser

Quick Start

from spparser import Reader, Writer, Extractor

def main():
    data = Reader.read_csv(file_path="./example.csv", each_line_type="dict", max_read_lines=10)
    '''
    example.csv:
    field1,field2
    1,2
    3,4
    5,6
    '''
    '''
    read_csv result: data = [{'a': '122github', 'b': '2'}, {'a': '-8spparser999', 'b': '4'}]
    '''
    alist = []
    for item in data:
        res = Extractor.regex(r"[a-zA-Z]+", item["a"], flags=0, trim_mode=True, return_all=False)
        alist.append(res)
    '''
    alist = ["github","spparser"]
    '''
    Writer.write(alist, "result.json")

if __name__ == "__main__":
    main()

Use Extractor.xpath() to extract html text

from spparser import Reader, Writer, Extractor

def main():
    '''
    demo.html
    <html lang="en">
    <head>
        <title>spparser</title>
    </head>
    <body>
        <ul id="container">
            <li class="object-1" tag="1"/>
            <li class="object-2"/>
            <li class="object-3"/>
        </ul>
    </body>
    </html>
    '''
    '''
    read_csv result: data = [{'a': '122github', 'b': '2'}, {'a': '-8spparser999', 'b': '4'}]
    '''
    html_text = Reader.read_anyfile("demo.html",line_by_line=False)
    res = Extractor.xpath("//title/text()",html_text)
    print(res)

if __name__ == "__main__":
    main()

Reading files asynchronously

from spparser import Reader,Writer, AsyncReader, AsyncWriter
import asyncio

async def main():
    reader = AsyncReader.async_csv_reader("./src.csv",batch_size=10,each_line_type="dict",max_read_lines=100, debug=True)
    with AsyncWriter.async_csv_writer("./dest.csv") as writer:
        async for items in reader:
            #for item in items:
                # Parser process
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

When debug is set to True, output logs:

[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
[2020-07-17  14:54:04] AsyncReader.py[line:70] INFO: from source: ./src.csv, this batch get 10 lines
[2020-07-17  14:54:04] AsyncWriter.py[line:63] INFO: to destination: ./dest.csv, write 10 lines.
...

For mongodb asynchronous read and write:

async def main():
    reader = AsyncReader.async_mongo_reader(query={},collection="src_col", host="my_address",port=27017, database="my_db",username="my_name", password="my_pwd", batch_size=100,max_read_lines=1000)
    with AsyncWriter.async_mongo_writer(collection="dest_col", host="my_address",port=27017, database="my_db",username="my_name", password="my_pwd") as writer:
        async for items in getter:
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Version 0.4.10 added support for MySQL asynchronous read and write

async def main():
    sql = "CREATE TABLE IF NOT EXISTS TARGET_TABLE (field1 type1, field2 type2) DEFAULT CHARSET=utf8;"
    getter = AsyncReader.async_mysql_reader(query_sql="SELECT * FROM SRC_TABLE",host="localhost", port=None, database="test", username="username", password="password",batch_size=100,max_read_lines=1000)
    with AsyncWriter.async_mysql_writer(create_table_sql=sql,host="localhost", port=None, database="test", username="username", password="password") as writer:
        async for items in getter:
            await writer.write(items)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

History

0.2.10

  • async_anyfile_reader, async_anyfile_writer, async_csv_reader, async_csv_writer support.
  • xpath, css, regex selectors in Extractor support.

0.3.30

  • async_mongo_reader, async_mongo_writer support

0.4.10

  • async_mysql_reader, async_mysql_writer support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

spparser-0.5.10.tar.gz (11.4 kB view hashes)

Uploaded Source

Built Distribution

spparser-0.5.10-py3-none-any.whl (13.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page