Skip to main content

Proxy providers for web3py (Web3 python)

Project description

alt text alt text

Proxy Providers for Web3Py

A library for connecting to Web3 RPC providers using a proxy

Tested with:

  • Python 3.6+

Supports Proxy usage for:

  • Http RPC (Sync, Async)
  • Websocket RPC (Sync, Async)
  • Websocket RPC with subscription (Sync, Async)

Use the following command to install using pip:

pip install web3-proxy-providers

To use the providers, you need web3 with version above 6.0.0, current version is 6.0.0b9 (beta)

pip install web3==6.0.09b

Usage example

Http Provider with Proxy

Use HttpWithProxyProvider class which supports http and socks proxy

from web3 import Web3
from web3_proxy_providers import HttpWithProxyProvider

provider = HttpWithProxyProvider(
    endpoint_uri='https://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
    proxy_url='socks5h://localhost:1080'
)
web3 = Web3(
    provider=provider,
)
print(web3.eth.block_number)

Async Http Provider with Proxy

Use AsyncHTTPWithProxyProvider class to connect to an RPC with asyncio using a proxy. both http proxy and socks proxy are supported

import asyncio
from web3 import Web3
from web3.eth import AsyncEth
from python_socks import ProxyType
from web3_proxy_providers import AsyncHTTPWithProxyProvider

async def main():
    provider = AsyncHTTPWithProxyProvider(
        proxy_type=ProxyType.SOCKS5,
        proxy_host='localhost',
        proxy_port=1080,
        endpoint_uri='https://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
    )
    web3 = Web3(
        provider=provider,
        modules={'eth': (AsyncEth,)},
    )
    print(await web3.eth.block_number)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Async Websocket Provider with Proxy

Use AsyncWebsocketWithProxyProvider class to connect to a websocket RPC with asyncio using a proxy. both http proxy and socks proxy are supported

import asyncio
from web3 import Web3
from web3.eth import AsyncEth
from python_socks import ProxyType
from web3_proxy_providers import AsyncWebsocketWithProxyProvider

async def main(loop: asyncio.AbstractEventLoop):
    provider = AsyncWebsocketWithProxyProvider(
        loop=loop,
        proxy_type=ProxyType.SOCKS5,
        proxy_host='localhost',
        proxy_port=1080,
        endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
    )
    web3 = Web3(
        provider=provider,
        modules={'eth': (AsyncEth,)},
    )
    print(await web3.eth.block_number)

if __name__ == '__main__':
    async_loop = asyncio.get_event_loop()
    async_loop.run_until_complete(main(loop=async_loop))

Async Websocket Provider with Proxy with Subscription support

Use AsyncSubscriptionWebsocketWithProxyProvider class to connect to a websocket RPC with asyncio using a proxy. both http proxy and socks proxy are supported

Learn more about realtime events and eth_subscribe:

import asyncio
from Crypto.Hash import keccak
from web3 import Web3
from python_socks import ProxyType
from web3_proxy_providers import AsyncSubscriptionWebsocketWithProxyProvider

async def callback(subs_id: str, json_result):
    print(json_result)

async def main(loop: asyncio.AbstractEventLoop):
    provider = AsyncSubscriptionWebsocketWithProxyProvider(
        loop=loop,
        proxy_type=ProxyType.SOCKS5,
        proxy_host='localhost',
        proxy_port=1080,
        endpoint_uri='wss://eth-mainnet.g.alchemy.com/v2/<YourAlchemyKey>',
    )
    
    # subscribe to Deposit and Withdrawal events for WETH contract
    weth_contract_address = Web3.to_checksum_address('0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2')
    deposit_topic = "0x" + keccak.new(data=b'Deposit(address,uint256)', digest_bits=256).hexdigest()
    withdrawal_topic = "0x" + keccak.new(data=b'Withdrawal(address,uint256)', digest_bits=256).hexdigest()
    subscription_id = await provider.subscribe(
        [
            'logs',
            {
                "address": weth_contract_address,
                "topics": [deposit_topic, withdrawal_topic]
            }
        ],
        callback
    )
    print(f'Subscribed with id {subscription_id}')
    
    # unsubscribe after 30 seconds
    await asyncio.sleep(30)
    await provider.unsubscribe(subscription_id)

if __name__ == '__main__':
    async_loop = asyncio.get_event_loop()
    async_loop.run_until_complete(main(loop=async_loop))

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

web3-proxy-providers-0.3.tar.gz (10.9 kB view hashes)

Uploaded Source

Built Distribution

web3_proxy_providers-0.3-py3-none-any.whl (14.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page