Proxy providers for web3py (Web3 python)
Project description
Proxy Providers for Web3Py
A library for connecting to Web3 RPC providers using a proxy
Tested with:
- Python 3.6+
Supports Proxy usage for:
- Http RPC (Sync, Async)
- Websocket RPC (Sync, Async)
- Websocket RPC with subscription (Sync, Async)
Use the following command to install using pip:
pip install web3-proxy-providers
To use the providers, you need web3 with version above 6.0.0, current version is 6.0.0b9 (beta)
pip install web3==6.0.09b
Usage example
Http Provider with Proxy
Use HttpWithProxyProvider
class which supports http and socks proxy
from web3 import Web3
from web3_proxy_providers import HttpWithProxyProvider
provider = HttpWithProxyProvider(
endpoint_uri='https://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
proxy_url='socks5h://localhost:1080'
)
web3 = Web3(
provider=provider,
)
print(web3.eth.block_number)
Async Http Provider with Proxy
Use AsyncHTTPWithProxyProvider
class to connect to an RPC with asyncio using a proxy. both http proxy and socks proxy are supported
import asyncio
from web3 import Web3
from web3.eth import AsyncEth
from python_socks import ProxyType
from web3_proxy_providers import AsyncHTTPWithProxyProvider
async def main():
provider = AsyncHTTPWithProxyProvider(
proxy_type=ProxyType.SOCKS5,
proxy_host='localhost',
proxy_port=1080,
endpoint_uri='https://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
)
web3 = Web3(
provider=provider,
modules={'eth': (AsyncEth,)},
)
print(await web3.eth.block_number)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Async Websocket Provider with Proxy
Use AsyncWebsocketWithProxyProvider
class to connect to a websocket RPC with asyncio using a proxy. both http proxy and socks proxy are supported
import asyncio
from web3 import Web3
from web3.eth import AsyncEth
from python_socks import ProxyType
from web3_proxy_providers import AsyncWebsocketWithProxyProvider
async def main(loop: asyncio.AbstractEventLoop):
provider = AsyncWebsocketWithProxyProvider(
loop=loop,
proxy_type=ProxyType.SOCKS5,
proxy_host='localhost',
proxy_port=1080,
endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
)
web3 = Web3(
provider=provider,
modules={'eth': (AsyncEth,)},
)
print(await web3.eth.block_number)
if __name__ == '__main__':
async_loop = asyncio.get_event_loop()
async_loop.run_until_complete(main(loop=async_loop))
Async Websocket Provider with Proxy with Subscription support
Use AsyncSubscriptionWebsocketWithProxyProvider
class to connect to a websocket RPC with asyncio using a proxy. both http proxy and socks proxy are supported
Learn more about realtime events and eth_subscribe:
import asyncio
from Crypto.Hash import keccak
from web3 import Web3
from python_socks import ProxyType
from web3_proxy_providers import AsyncSubscriptionWebsocketWithProxyProvider
async def callback(subs_id: str, json_result):
print(json_result)
async def main(loop: asyncio.AbstractEventLoop):
provider = AsyncSubscriptionWebsocketWithProxyProvider(
loop=loop,
proxy_type=ProxyType.SOCKS5,
proxy_host='localhost',
proxy_port=1080,
endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
)
# subscribe to Deposit and Withdrawal events for WETH contract
weth_contract_address = Web3.to_checksum_address('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2')
deposit_topic = "0x" + keccak.new(data=b'Deposit(address,uint256)', digest_bits=256).hexdigest()
withdrawal_topic = "0x" + keccak.new(data=b'Withdrawal(address,uint256)', digest_bits=256).hexdigest()
subscription_id = await provider.subscribe(
[
'logs',
{
"address": weth_contract_address,
"topics": [deposit_topic, withdrawal_topic]
}
],
callback
)
print(f'Subscribed with id {subscription_id}')
# unsubscribe after 30 seconds
await asyncio.sleep(30)
await provider.unsubscribe(subscription_id)
if __name__ == '__main__':
async_loop = asyncio.get_event_loop()
async_loop.run_until_complete(main(loop=async_loop))
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for web3_proxy_providers-0.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 88d0b37d8fd2f0ca3e3c6ba468f50258db7d6041e9b1fb6175130d6ae16d3515 |
|
MD5 | 7f6193fe8a687ab287d0d1dccfc4bba8 |
|
BLAKE2b-256 | 7291d1a78c438bf975e2816be4001fbc78f64f78895e4b47c26da783418bc5e7 |