Selenium WebDriver using thread pools. (Chrome only for now.)

threadingwebdriver

A headless Selenium WebDriver wrapper that uses two thread pools:
ThreadPool(1) controls the browser.
ThreadPool(custom_number) reads the page (for example, getting a WebElement).
Chrome only.
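
The two-pool layout described above can be illustrated with the standard library alone. This is a minimal sketch, assuming the pools behave like `multiprocessing.pool.ThreadPool`; `FakeBrowser` and its methods are hypothetical stand-ins, not part of threadingwebdriver, and the package's real internals may differ.

```python
from multiprocessing.pool import ThreadPool


class FakeBrowser:
    """Hypothetical stand-in for a browser; not part of threadingwebdriver."""

    def __init__(self):
        self.url = None

    def navigate(self, url):
        self.url = url
        return True

    def find(self, xpath):
        return f"{xpath} @ {self.url}"


browser = FakeBrowser()

# One worker for control commands, several workers for page reads.
with ThreadPool(1) as control_pool, ThreadPool(3) as read_pool:
    opened = control_pool.apply_async(browser.navigate, ("https://example.com/",))
    opened_ok = opened.get()  # wait for the navigation before reading

    body = read_pool.apply_async(browser.find, ("/html/body",))
    body_value = body.get()
```

With a single control worker, control commands run one at a time in submission order, while reads can overlap on the larger pool.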

Initialize

import threadingwebdriver
driver = threadingwebdriver.ChromeWebdriver()
driver.initialize()

Close

Close the driver. This waits for the tasks in both ThreadPools to finish.

driver.close()
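
How a thread pool can wait for its pending tasks on shutdown is sketched below with the standard library's `ThreadPool`. This only illustrates the general close-then-join pattern; whether `driver.close()` is implemented this way is an assumption, and `slow_task` is a hypothetical placeholder.

```python
import time
from multiprocessing.pool import ThreadPool

finished = []


def slow_task(name):
    # Hypothetical placeholder for a browser task that takes a while.
    time.sleep(0.1)
    finished.append(name)


pool = ThreadPool(2)
for name in ("a", "b", "c"):
    pool.apply_async(slow_task, (name,))

pool.close()  # stop accepting new tasks
pool.join()   # block until every queued task has completed
```

After `join()` returns, all three tasks have run, so nothing is cut off mid-task.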

Open URL (Async)

url = 'https://www.google.com/'
driver.open_async(url)

Open URL (Sync)

url = 'https://www.google.com/'
is_open: bool = driver.open(3, url)  # the first argument is the timeout

Get Element (Async)

url = 'https://www.google.com/'
driver.open_async(url)

timeout = 3
body_xpath = '/html/body'
body_xpath_result = driver.get_element_xpath_async(timeout, body_xpath)  # returns a WebElementAsyncResult
# ... other code can run here while the lookup is in progress ...
body = body_xpath_result.get()  # returns the WebElement

Concurrency

import threadingwebdriver
driver = threadingwebdriver.ChromeWebdriver()
driver.initialize(read_thread_count=3)
timeout = 2
url = "Input Your URL"
driver.open(timeout, url)

p_async = driver.get_element_xpath_async(timeout, '/html/body/div/p')
a_async = driver.get_element_xpath_async(timeout, '/html/body/div/a')
div_async = driver.get_element_xpath_async(timeout, '/html/body/div')

p = p_async.get()
a = a_async.get()
div = div_async.get()
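
The benefit of submitting all three lookups before collecting any result can be seen in a small timing sketch. It assumes a read pool that behaves like the standard library's `ThreadPool`; `fake_lookup` is a hypothetical stand-in that sleeps instead of querying a real page.

```python
import time
from multiprocessing.pool import ThreadPool


def fake_lookup(xpath):
    # Hypothetical stand-in for an element lookup that takes 0.2 s.
    time.sleep(0.2)
    return xpath


xpaths = ['/html/body/div/p', '/html/body/div/a', '/html/body/div']

with ThreadPool(3) as read_pool:
    start = time.perf_counter()
    # Submit everything first, then collect the results.
    pending = [read_pool.apply_async(fake_lookup, (xpath,)) for xpath in xpaths]
    results = [task.get() for task in pending]
    elapsed = time.perf_counter() - start
```

With three workers the lookups overlap, so the total time stays close to one lookup rather than the sum of three.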

Get Element (Sync)

timeout = 3
body_xpath = '/html/body'
body = driver.get_element_xpath(timeout, body_xpath)  # returns the WebElement

Initialize Websocket Listener

async def websocket_listener(listener):
    async for event in listener:
        payload_data = event.response.payload_data
        print(payload_data)

import threadingwebdriver
driver = threadingwebdriver.ChromeWebdriver()
driver.initialize(websocket_listening_function=websocket_listener)

url = 'https:// Input Your URL'
driver.open(3, url)
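
A listener of this shape can be tried without a browser by feeding it a fake event stream. In this sketch `fake_listener` is a hypothetical async generator whose events only mimic the `event.response.payload_data` attribute used above; the real events delivered by the driver may carry more fields.

```python
import asyncio
from types import SimpleNamespace

received = []


async def fake_listener():
    # Hypothetical event stream; real events come from the browser.
    for text in ("hello", "world"):
        yield SimpleNamespace(response=SimpleNamespace(payload_data=text))


async def websocket_listener(listener):
    # Same shape as the listener above, but collects instead of printing.
    async for event in listener:
        received.append(event.response.payload_data)


asyncio.run(websocket_listener(fake_listener()))
```

Collecting payloads into a list instead of printing them makes the listener easy to check in a unit test.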

Exceptions

Whether an exception is raised depends on the order in which the threads run the tasks.

url1 = 'https://www.google.com/'
url2 = 'https://www.github.com/'
driver.open_async(url1)
driver.open_async(url2)
timeout = 3
body_xpath = '/html/body'
body_xpath_result = driver.get_element_xpath_async(timeout, body_xpath)  # returns a WebElementAsyncResult
# Exception: raised if 'get_element_xpath_async()' runs before 'open_async(url2)'.

url1 = 'https://www.google.com/'
url2 = 'https://www.github.com/'
driver.open_async(url1)
timeout = 3
body_xpath = '/html/body'
body_xpath_result = driver.get_element_xpath_async(timeout, body_xpath)  # returns a WebElementAsyncResult
driver.open_async(url2)
# Exception: raised if 'open_async(url2)' runs before 'get_element_xpath_async()' has finished.
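
The ordering hazard can be reproduced with a stand-in browser and two standard-library pools. This is a sketch under stated assumptions: `FakeBrowser`, its `page_loaded` event, and the `RuntimeError` are all hypothetical, and the exception the real driver raises may be different. The point is only that a read submitted while a navigation is still in progress can fail, and that waiting on the navigation result first avoids it.

```python
import threading
from multiprocessing.pool import ThreadPool


class FakeBrowser:
    """Hypothetical stand-in for a browser; not part of threadingwebdriver."""

    def __init__(self):
        self.url = None
        self.page_loaded = threading.Event()  # lets the demo decide when loading ends

    def navigate(self, url):
        self.page_loaded.wait()  # simulate a slow page load
        self.url = url

    def find(self, xpath):
        if self.url is None:
            raise RuntimeError("no page loaded yet")
        return (self.url, xpath)


browser = FakeBrowser()

with ThreadPool(1) as control_pool, ThreadPool(3) as read_pool:
    opening = control_pool.apply_async(browser.navigate, ("https://example.com/",))

    # Unsafe: the read is submitted while the page is still loading.
    early = read_pool.apply_async(browser.find, ("/html/body",))
    try:
        early.get()
        early_error = None
    except RuntimeError as exc:
        early_error = str(exc)

    # Safe: let the load finish, wait for it, then read.
    browser.page_loaded.set()
    opening.get()
    late_value = read_pool.apply_async(browser.find, ("/html/body",)).get()
```

Applied to the driver, the equivalent precaution would be to use the synchronous `open()` or to call `.get()` on a pending lookup before opening the next URL.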

Download files

Source Distribution: threadingwebdriver-0.0.3.tar.gz (7.6 kB)
Built Distribution: threadingwebdriver-0.0.3-py3-none-any.whl (7.0 kB, Python 3)
