Skip to main content

A scalable headless data fetching library written with python and message queue service to enable quickly and easily prasing web data in a distributive way.

Project description

pifetcher

A scalable headless data fetching library written with python and message queue service to enable quickly and easily parsing web data in a distributive way.

To install

pip install pifetcher

PYPI Link https://pypi.org/project/pifetcher/

dependencies:

  • selenium
  • BeautifulSoup4
  • boto3 (optional but by default)
  • ChromeDriver for chrome 76(by default)
  • Chrome executable v 76(by default)

features:

  • event-callback-based interaction between user defined logic and the pre-disigned fetch worker
  • process works in batches, library user will be able to capture the event of a batch of works have been finished
  • easy to use, only needs to inherit the FetchWorker class and implement the basic call back functions
  • it's design to use message queue, enbles more than just one worker to perform data fetching in order to scale the application

how to use:

  1. set up work queue component on the host computer(aws simple queue service by default), such as credentials, regions AWS BOTO3 initial set up docs

  2. configure a fetcher by creating a field mapping config file, for example: create a mapping config file for fetching amazon.com item pricing data

{
    "price": {
        "type": "text",
        "selector": "#priceblock_ourprice",
        "attribute":".text"
    },
    "id": {
        "type": "text",
        "selector": "#ASIN",
        "attribute": "value"
    },
    "title": {
        "type": "text",
        "selector": "#productTitle",
        "attribute":".text"
    }
}
  1. create a pifetcherConfig.json file, and add the fetcher mapping file that previously created to fetcher -> mappingConfigs with its name and file path

numWorksPerTime : defines the number of messages it try to fetch from the queue per work cycle pollingIntervalOnActive : time interval before fetching the next message when the worker status is active(meaning it fetched at least on message in the last worker cycle) pollingIntervalOnIdle : time interval before fetching the next message when the worker status is active(meaning it fetched no message in the last worker cycle)

{
    "browser":{
        "browser_options":["--window-size=1920,1080", "--disable-extensions", "--proxy-server='direct://'", "--proxy-bypass-list=*", "--start-maximized","--ignore-certificate-errors", "--headless"],
        "win-driver_path":"chromedriver-win-76.exe",
        "win-binary_location": "",
        "mac-driver_path":"chromedriver-mac-76",
        "mac-binary_location": ""

    },
    "queue":
    {
        "numWorksPerTime": 1,
        "queueType":"AWSSimpleQueueService",
        "queueName":"datafetch.fifo",
        "pollingIntervalOnActive": 0.2,
        "pollingIntervalOnIdle": 60
    },
    "logger":
    {
        "output":"console"
    },
    "fetcher":
    {
        "mappingConfigs":{
            "amazon":"amazon.json"
        }

    }
}
  1. to use the fetcher worker
  • import the fetcher worker class and config class
from pifetcher.core import Config
from pifetcher.core import FetchWorker
  • load the pifetherConfig.json to the Config class
Config.use('pifetcherConfig.json')
  • implement event function with your own logic on_save_result : this will be called when a data object has been successfully parsed on_empty_result_error: this will be called after parsing an empty object, you may want to stop/ pause the process to investigate the problem before continuing parsing on_batch_start: this will be called when the worker received a batch start signal , you may implement your logic of adding fetching tasks to the queue here on_batch_finish: this will be called when the worker received a batch finish signal example:
    def on_save_result(self, result, batch_id, work):
        print(result, batch_id, work)
    def on_empty_result_error(self):
        self.stop()
    def on_batch_start(self, batch_id):
        work = {}
        work['url'] = 'a amazon url'
        work['fetcherName'] = 'amazon'
        self.add_works([work])
    def on_batch_finish(self, batch_id):
        print(f"all works with the batchId {batch_id} have been processed")
  1. Run the worker and, send a StartProcess Signal to the queue to start the process
  • start the worker to receive and process works
tw = TestWorker()
tw.do_works()
  • to send a start signal to the queue If you want to send out the start signal from one of the worker, you can call this function
tw.send_start_signal()

But if you want to start the batch process from another system, you can use the code below

    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName='datafetch.fifo')
    content = {"type":"BatchStart","batchId": str(uuid.uuid4()),"content":{}}
    queue.send_message(MessageBody=json.dumps(content), MessageGroupId = "FetchWork", MessageDeduplicationId = str(time.time()).replace(".",""))

Command to exit all chromedriver in windows

taskkill /f /im chromedriver-win-76.exe

How to optimized the number of polls the worker has to send to the queue

When no message was fetched in a worker cycle, it would enter the idle state. Under the idle state, it's supposed to wait a longer time interval before trying to fetch the next message. This sleep interval is defined in the config file at the location:

        "pollingIntervalOnIdle": 60

After the worker received at least one mssage in a worker cycle, the worker status will be set as ACTIVE. Under this state, it's supposed to wait a shorter time interval before trying to fetch the next message. This sleep interval is defined in the config file at the location:

        "pollingIntervalOnActive": 0.2,

To do list items:

  • fix browser driver issues
  • simplify initial setup process

Completed items:

  • use better strategy to reduce number of requests a worker has to send
  • put all constants in config the config file (checked)
  • complete the type conversions for different data types (checked)
  • add message type (work initiation message type) (checked)
  • logging (checked)
  • data fetching with use of aws sqs

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pifetcher-0.0.3.3.tar.gz (21.7 MB view details)

Uploaded Source

Built Distribution

pifetcher-0.0.3.3-py3-none-any.whl (21.8 MB view details)

Uploaded Python 3

File details

Details for the file pifetcher-0.0.3.3.tar.gz.

File metadata

  • Download URL: pifetcher-0.0.3.3.tar.gz
  • Upload date:
  • Size: 21.7 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.4

File hashes

Hashes for pifetcher-0.0.3.3.tar.gz
Algorithm Hash digest
SHA256 d4c75ed7206c813e5227abe993e8b5a783cf221b423462d196f623a6780a0552
MD5 ba61f624838cfa9d5dd49b42b3a8ccf7
BLAKE2b-256 c5328568c8fc1be2a0011734d37bc3364088d67001b01c7662ffc1dcc840aa1c

See more details on using hashes here.

File details

Details for the file pifetcher-0.0.3.3-py3-none-any.whl.

File metadata

  • Download URL: pifetcher-0.0.3.3-py3-none-any.whl
  • Upload date:
  • Size: 21.8 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.6.4

File hashes

Hashes for pifetcher-0.0.3.3-py3-none-any.whl
Algorithm Hash digest
SHA256 960e81d304e496e570cf5a87c642a02aef11e5082fd11b6f53fec40669d7b678
MD5 dcd2a2c73dc3a3aba04b51608be50fdd
BLAKE2b-256 e1eab03ae89d9939d5cb1be0bcf14122d69ad74b4b228131bebc99aeaf64440b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page