Skip to main content

A scalable headless data fetching library written with python and message queue service to enable quickly and easily prasing web data in a distributive way.

Project description

pifetcher

A scalable headless data fetching library written with python and message queue service to enable quickly and easily parsing web data in a distributive way.

To install

pip install pifetcher

PYPI Link https://pypi.org/project/pifetcher/

dependencies:

  • selenium
  • BeautifulSoup4
  • boto3 (optional but by default)
  • ChromeDriver for chrome 76(by default)
  • Chrome executable v 76(by default)

features:

  • event-callback-based interaction between user defined logic and the pre-disigned fetch worker
  • process works in batches, library user will be able to capture the event of a batch of works have been finished
  • easy to use, only needs to inherit the FetchWorker class and implement the basic call back functions
  • it's design to use message queue, enbles more than just one worker to perform data fetching in order to scale the application

how to use:

  1. set up work queue component on the host computer(aws simple queue service by default), such as credentials, regions AWS BOTO3 initial set up docs

  2. configure a fetcher by creating a field mapping config file, for example: create a mapping config file for fetching amazon.com item pricing data

{
    "price": {
        "type": "text",
        "selector": "#priceblock_ourprice",
        "attribute":".text"
    },
    "id": {
        "type": "text",
        "selector": "#ASIN",
        "attribute": "value"
    },
    "title": {
        "type": "text",
        "selector": "#productTitle",
        "attribute":".text"
    }
}
  1. create a pifetcherConfig.json file, and add the fetcher mapping file that previously created to fetcher -> mappingConfigs with its name and file path

numWorksPerTime : defines the number of messages it try to fetch from the queue per work cycle pollingIntervalOnActive : time interval before fetching the next message when the worker status is active(meaning it fetched at least on message in the last worker cycle) pollingIntervalOnIdle : time interval before fetching the next message when the worker status is active(meaning it fetched no message in the last worker cycle)

{
    "browser":{
        "browser_options":["--window-size=1920,1080", "--disable-extensions", "--proxy-server='direct://'", "--proxy-bypass-list=*", "--start-maximized","--ignore-certificate-errors", "--headless"],
        "win-driver_path":"chromedriver-win-76.exe",
        "win-binary_location": "",
        "mac-driver_path":"chromedriver-mac-76",
        "mac-binary_location": ""

    },
    "queue":
    {
        "numWorksPerTime": 1,
        "queueType":"AWSSimpleQueueService",
        "queueName":"datafetch.fifo",
        "pollingIntervalOnActive": 0.2,
        "pollingIntervalOnIdle": 60
    },
    "logger":
    {
        "output":"console"
    },
    "fetcher":
    {
        "mappingConfigs":{
            "amazon":"amazon.json"
        }

    }
}
  1. to use the fetcher worker
  • import the fetcher worker class and config class
from pifetcher.core import Config
from pifetcher.core import FetchWorker
  • load the pifetherConfig.json to the Config class
Config.use('pifetcherConfig.json')
  • implement event function with your own logic on_save_result : this will be called when a data object has been successfully parsed on_empty_result_error: this will be called after parsing an empty object, you may want to stop/ pause the process to investigate the problem before continuing parsing on_batch_start: this will be called when the worker received a batch start signal , you may implement your logic of adding fetching tasks to the queue here on_batch_finish: this will be called when the worker received a batch finish signal example:
    def on_save_result(self, result, batch_id, work):
        print(result, batch_id, work)
    def on_empty_result_error(self):
        self.stop()
    def on_batch_start(self, batch_id):
        work = {}
        work['url'] = 'a amazon url'
        work['fetcherName'] = 'amazon'
        self.add_works([work])
    def on_batch_finish(self, batch_id):
        print(f"all works with the batchId {batch_id} have been processed")
  1. Run the worker and, send a StartProcess Signal to the queue to start the process
  • start the worker to receive and process works
tw = TestWorker()
tw.do_works()
  • to send a start signal to the queue If you want to send out the start signal from one of the worker, you can call this function
tw.send_start_signal()

But if you want to start the batch process from another system, you can use the code below

    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName='datafetch.fifo')
    content = {"type":"BatchStart","batchId": str(uuid.uuid4()),"content":{}}
    queue.send_message(MessageBody=json.dumps(content), MessageGroupId = "FetchWork", MessageDeduplicationId = str(time.time()).replace(".",""))

Command to exit all chromedriver in windows

taskkill /f /im chromedriver-win-76.exe

How to optimized the number of polls the worker has to send to the queue

When no message was fetched in a worker cycle, it would enter the idle state. Under the idle state, it's supposed to wait a longer time interval before trying to fetch the next message. This sleep interval is defined in the config file at the location:

        "pollingIntervalOnIdle": 60

After the worker received at least one mssage in a worker cycle, the worker status will be set as ACTIVE. Under this state, it's supposed to wait a shorter time interval before trying to fetch the next message. This sleep interval is defined in the config file at the location:

        "pollingIntervalOnActive": 0.2,

To do list items:

  • fix browser driver issues
  • simplify initial setup process

Completed items:

  • use better strategy to reduce number of requests a worker has to send
  • put all constants in config the config file (checked)
  • complete the type conversions for different data types (checked)
  • add message type (work initiation message type) (checked)
  • logging (checked)
  • data fetching with use of aws sqs

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pifetcher-0.0.3.0.tar.gz (21.7 MB view details)

Uploaded Source

Built Distribution

pifetcher-0.0.3.0-py3-none-any.whl (21.8 MB view details)

Uploaded Python 3

File details

Details for the file pifetcher-0.0.3.0.tar.gz.

File metadata

  • Download URL: pifetcher-0.0.3.0.tar.gz
  • Upload date:
  • Size: 21.7 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.4

File hashes

Hashes for pifetcher-0.0.3.0.tar.gz
Algorithm Hash digest
SHA256 ab445a7889a25d96de470a4d9c09e588134236b8808e5dc4e81f137d92259ee8
MD5 43712db4c8f152af7a52ca17079b07db
BLAKE2b-256 9ea4fe15bc96f0bbaf478609903a2b7707412ceef71ede2b3aacb7f81bbfa574

See more details on using hashes here.

File details

Details for the file pifetcher-0.0.3.0-py3-none-any.whl.

File metadata

  • Download URL: pifetcher-0.0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 21.8 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.4

File hashes

Hashes for pifetcher-0.0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ce7a6dbc9d5883de2367a60e36d7d4b02cb42afab8e22e33a41f0a1e038a9756
MD5 6a9587bfa5004e63e6f9dab864d6680b
BLAKE2b-256 ed44401153647df10b7c1e5f176530800e1400019b63b4919bf78ac34a4b35b4

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page