pifetcher
A scalable, headless data-fetching library written in Python. It uses a message queue service to make parsing web data quick and easy in a distributed way.
To install
pip install pifetcher
PyPI link: https://pypi.org/project/pifetcher/
dependencies:
- selenium
- BeautifulSoup4
- boto3 (optional, but used by default)
- ChromeDriver for Chrome 76 (default)
- Chrome executable v76 (default)
new feature:
Implemented a new strategy for fetching messages more efficiently. When no message is fetched in a worker cycle, the worker enters the IDLE state. In this state, it waits a longer interval before trying to fetch the next message. This sleep interval is defined in the config file as:
"polling_interval_on_idle": 60
After the worker receives at least one message in a worker cycle, its status is set to ACTIVE. In this state, it waits a shorter interval before trying to fetch the next message. This sleep interval is defined in the config file as:
"polling_interval_on_active": 0.2,
how to use:
- set up the work queue component on the host computer (AWS Simple Queue Service by default), including credentials and region; see the AWS boto3 initial setup docs
- configure a fetcher by creating a field mapping config file; for example, create a mapping config file for fetching amazon.com item pricing data:
{
    "price": {
        "type": "text",
        "selector": "#priceblock_ourprice",
        "attribute": ".text"
    },
    "id": {
        "type": "text",
        "selector": "#ASIN",
        "attribute": "value"
    },
    "title": {
        "type": "text",
        "selector": "#productTitle",
        "attribute": ".text"
    }
}
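Before handing a mapping file to the worker, it may help to check that every field is complete. The sketch below is a hypothetical helper, not part of pifetcher; it assumes that the three keys shown in the sample mapping above (type, selector, attribute) are all required.

```python
import json

# Keys that every field entry has in the sample mapping; treating them as
# required is an assumption of this sketch.
REQUIRED_KEYS = ("type", "selector", "attribute")


def find_mapping_problems(mapping):
    """Return a list of human-readable problems found in a field mapping."""
    problems = []
    for field, spec in mapping.items():
        if not isinstance(spec, dict):
            problems.append(f"{field}: expected an object")
            continue
        for key in REQUIRED_KEYS:
            if key not in spec:
                problems.append(f"{field}: missing '{key}'")
    return problems


sample = json.loads("""
{
    "price": {"type": "text", "selector": "#priceblock_ourprice", "attribute": ".text"},
    "id": {"type": "text", "selector": "#ASIN"}
}
""")
print(find_mapping_problems(sample))  # ["id: missing 'attribute'"]
```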
- create a pifetcherConfig.json file, and add the fetcher mapping file created in the previous step to fetcher -> mappingConfigs with its name and file path
num_works_per_time: the number of messages the worker tries to fetch from the queue per work cycle
polling_interval_on_active: the interval before fetching the next message when the worker status is ACTIVE (it fetched at least one message in the last worker cycle)
polling_interval_on_idle: the interval before fetching the next message when the worker status is IDLE (it fetched no message in the last worker cycle)
{
    "browser": {
        "browser_options": ["--window-size=1920,1080", "--disable-extensions", "--proxy-server='direct://'", "--proxy-bypass-list=*", "--start-maximized", "--ignore-certificate-errors", "--headless"],
        "win-driver_path": "chromedriver-win-76.exe",
        "win-binary_location": "",
        "mac-driver_path": "chromedriver-mac-76",
        "mac-binary_location": ""
    },
    "queue": {
        "num_works_per_time": 1,
        "queue_type": "AWSSimpleQueueService",
        "queue_name": "datafetch.fifo",
        "polling_interval_on_active": 0.2,
        "polling_interval_on_idle": 60
    },
    "logger": {
        "output": "console"
    },
    "fetcher": {
        "mappingConfigs": {
            "amazon": "amazon.json"
        }
    }
}
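The browser section keeps separate driver and binary settings per operating system, using win- and mac- key prefixes. How pifetcher itself chooses between them is not described here; the sketch below shows one plausible way to resolve them, with the helper name and the platform-to-prefix mapping being assumptions.

```python
import json
import platform

# Maps platform.system() values to the key prefixes used in the sample config.
# This mapping is an assumption, not documented pifetcher behavior.
PREFIX_BY_SYSTEM = {"Windows": "win", "Darwin": "mac"}


def browser_paths(config, system=None):
    """Return (driver_path, binary_location) for the given operating system."""
    system = system or platform.system()
    prefix = PREFIX_BY_SYSTEM.get(system)
    if prefix is None:
        raise ValueError(f"no browser settings for platform {system!r}")
    browser = config["browser"]
    return browser[f"{prefix}-driver_path"], browser[f"{prefix}-binary_location"]


config = json.loads("""
{
    "browser": {
        "win-driver_path": "chromedriver-win-76.exe",
        "win-binary_location": "",
        "mac-driver_path": "chromedriver-mac-76",
        "mac-binary_location": ""
    }
}
""")
print(browser_paths(config, system="Darwin"))  # ('chromedriver-mac-76', '')
```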
- to use the fetcher worker
- import the fetcher worker class and config class
from pifetcher.core import Config
from pifetcher.core import FetchWorker
- load pifetcherConfig.json into the Config class
Config.use('pifetcherConfig.json')
- implement the event functions with your own logic:
on_save_result: called when a data object has been successfully parsed
on_empty_result_error: called after parsing an empty object; you may want to stop or pause the process to investigate the problem before continuing
on_start_process_signal: called when the worker receives a start process signal; you may implement your logic for adding fetching tasks to the queue here
example:
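class TestWorker(FetchWorker):
    def on_save_result(self, results):
        # Called with each successfully parsed data object
        print(results)

    def on_empty_result_error(self):
        # Stop the worker so the empty result can be investigated
        self.stop()

    def on_start_process_signal(self):
        # Queue one fetch task for the 'amazon' fetcher
        work = {}
        work['url'] = 'an amazon url'
        work['fetcher_name'] = 'amazon'
        self.add_works([work])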
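When on_start_process_signal needs to queue many pages, building the work items in a small helper keeps the handler short. The helper below is a hypothetical sketch, not part of pifetcher; it only reuses the two keys shown in the example above (url and fetcher_name), and the URLs are placeholders.

```python
def build_works(urls, fetcher_name):
    """Turn a list of URLs into work items for a single fetcher."""
    return [{"url": url, "fetcher_name": fetcher_name} for url in urls]


# Placeholder URLs; replace them with real product pages.
works = build_works(
    ["https://example.com/item/1", "https://example.com/item/2"],
    "amazon",
)
print(works[0])  # {'url': 'https://example.com/item/1', 'fetcher_name': 'amazon'}
```

Inside on_start_process_signal, the resulting list could then be passed to self.add_works(works).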
- Run the worker, then send a StartProcess signal to the queue to start the process
- start the worker to receive and process works
tw = TestWorker()
tw.do_works()
- to send a start signal to the queue
import json
import time
import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName='datafetch.fifo')
content = {"type": "StartProcess", "content": {}}
queue.send_message(MessageBody=json.dumps(content), MessageGroupId="FetchWork", MessageDeduplicationId=str(time.time()))
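The message parameters can also be built separately from the boto3 call, which makes them easy to inspect without AWS credentials. This is a sketch with a hypothetical helper name. It swaps the timestamp used above for a random UUID as the deduplication ID, which is a design choice of this example rather than something pifetcher requires.

```python
import json
import uuid


def build_start_signal(group_id="FetchWork"):
    """Build keyword arguments for an SQS FIFO send_message call."""
    content = {"type": "StartProcess", "content": {}}
    return {
        "MessageBody": json.dumps(content),
        "MessageGroupId": group_id,
        # A random UUID stands in for the timestamp, so two signals built in
        # quick succession are very unlikely to share a deduplication ID.
        "MessageDeduplicationId": str(uuid.uuid4()),
    }


params = build_start_signal()
print(json.loads(params["MessageBody"])["type"])  # StartProcess
```

With a queue object obtained as in the snippet above, the signal could be sent with queue.send_message(**build_start_signal()).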
Command to kill all ChromeDriver processes on Windows:
taskkill /f /im chromedriver-win-76.exe
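The same cleanup can be scripted so that it also covers the Mac driver named in the sample config. The helper names below are hypothetical, and the non-Windows branch assumes that pkill is available, as it is on macOS and most Linux systems.

```python
import subprocess


def kill_command(system, driver_name):
    """Return the command that force-kills every process of the given driver."""
    if system == "Windows":
        return ["taskkill", "/f", "/im", driver_name]
    # Assumption: pkill is installed; -f matches against the full command line.
    return ["pkill", "-f", driver_name]


def kill_drivers(system, driver_name):
    """Run the kill command; a non-zero exit code usually means nothing matched."""
    return subprocess.run(kill_command(system, driver_name), check=False).returncode


print(kill_command("Windows", "chromedriver-win-76.exe"))
# ['taskkill', '/f', '/im', 'chromedriver-win-76.exe']
```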
To do list items:
- fix browser driver issues
- simplify initial setup process
Completed items:
- use a better strategy to reduce the number of requests a worker has to send
- put all constants in the config file (checked)
- complete the type conversions for different data types (checked)
- add message type (work initiation message type) (checked)
- logging (checked)
- data fetching with use of AWS SQS