TikTok Live Connection Client
Project description
TikTokLive
A python library to connect to and read events from TikTok's LIVE service
A python library to receive and decode livestream events such as comments and gifts in real-time from TikTok's LIVE service by connecting to TikTok's internal WebCast push service. This library includes a wrapper that
connects to the WebCast service using only a user's unique_id
and allows you to join your livestream as well as that of other streamers. No credentials are required to use TikTokLive.
This library a Python implementation of the Javascript TikTok-Live-Connector by @zerodytrash meant to serve as an alternative for users who feel more comfortable working in Python or require it for their specific dependancies.
This is not an official API. It's a reverse engineering and research project.
Join the support discord and visit the #support
channel for questions, contributions and ideas. Feel free to make pull requests with missing/new features, fixes, etc.
🖨 Thermal Printing (20+ SALES & COUNTING!) 🖨 ️
A ready-to-go thermal printing program is for sale. To purchase it, join the support discord and create a ticket, or e-mail info@isaackogan.com
.
Setup troubleshooting will be provided (but ultimately), hosting and setup is up to you and specific to your infrastructure.
Payments are available through PayPal/BTC/ETH and are in USD. Delivery time can vary. Items that have been paid for will be delivered on average within 3-4 hours, at maximum within 24 hours.
Item | Price (USD) | Description |
---|---|---|
Standard | $40 | Print events to a Thermal Printer. For example, printing a comment with the user's name, message, and profile picture. This is a HIGHLY advanced wrapper allowing you to print to a USB printer (must have ESCPOS) like it's nothing. You can also use a Network or Serial printer, if they are ESCPOS, however these are not recommended as they tend to drop connections more frequently than USB. |
Setup Help | $15 | Up to 1 hour of 1-on-1 setup time. I will conference with you in a voice call on Discord and remotely control your PC via AnyDesk. I will install the software on the spot, with you. Must have a working phone with Discord, so I can see the printer in case I need to. |
Getting started
The hyperlinked URL is a Video Tutorial produced by @Vyler on YouTube that makes use of this library. You can also continue reading for a more up-to-date, maintained guide.
- Install the module via pip
pip install TikTokLive
- Create your first chat connection
from TikTokLive import TikTokLiveClient
from TikTokLive.types.events import CommentEvent, ConnectEvent
# Instantiate the client with the user's username
client: TikTokLiveClient = TikTokLiveClient(unique_id="@isaackogz")
# Define how you want to handle specific events via decorator
@client.on("connect")
async def on_connect(_: ConnectEvent):
print("Connected to Room ID:", client.room_id)
# Notice no decorator?
async def on_comment(event: CommentEvent):
print(f"{event.user.nickname} -> {event.comment}")
# Define handling an event via "callback"
client.add_listener("comment", on_comment)
if __name__ == '__main__':
# Run the client and block the main thread
# await client.start() to run non-blocking
client.run()
For more examples, see the examples folder provided in the tree.
Params & Options
To create a new TikTokLiveClient
object the following parameter is required. You can optionally add configuration options to this via kwargs.
TikTokLiveClient(unique_id, **options)
Param Name | Required | Description |
---|---|---|
unique_id | Yes | The unique username of the broadcaster. You can find this name in the URL. Example: https://www.tiktok.com/@officialgeilegisela/live => officialgeilegisela |
debug | No | Whether to fire the "debug" event for receiving raw data |
**options | No | Here you can set the following optional connection properties. If you do not specify a value, the default value will be used.process_initial_data (default: true ) Define if you want to process the initial data which includes old messages of the last seconds. fetch_room_info_on_connect (default: true ) Define if you want to fetch all room information on start. If this option is enabled, the connection to offline rooms will be prevented. If enabled, the connect result contains the room info via the room_info attribute. You can also manually retrieve the room info (even in an unconnected state) using the retrieve_room_info() method.enable_extended_gift_info (default: false ) Define if you want to receive extended information about gifts like gift name, cost and images which you can retrieve via the available_gifts attribute. polling_interval_ms (default: 1000 ) Request polling interval. client_params (default: {} ) Custom client params for Webcast API. headers (default: {} ) Custom request headers passed to aiohttp. timeout_ms (default: 1000 )How long to wait before a request should fail loop (default: None )Optionally supply your own asyncio event loop for usage by the client. When set to None, the client pulls the current active loop or creates a new one. This option is mostly useful for people trying to nest asyncio. |
Example Options:
from TikTokLive import TikTokLiveClient
client: TikTokLiveClient = TikTokLiveClient(
unique_id="@oldskoldj", **(
{
# Whether to process initial data (cached chats, etc.)
"process_initial_data": True,
# Connect info (viewers, stream status, etc.)
"fetch_room_info_on_connect": True,
# Whether to get extended gift info (Image URLs, etc.)
"enable_extended_gift_info": True,
# How frequently to poll Webcast API
"polling_interval_ms": 1000,
# Custom Client params
"client_params": {},
# Custom request headers
"headers": {},
# Custom timeout for Webcast API requests
"timeout_ms": 1000,
# Custom Asyncio event loop
"loop": None,
# Whether to trust environment variables that provide proxies to be used in aiohttp requests
"trust_env": False,
# A ProxyContainer object for proxied requests
"proxy_container": None,
# Set the language for Webcast responses (Changes extended_gift's language)
"lang": "en-US"
}
)
)
client.run()
Methods
A TikTokLiveClient
object contains the following methods.
Method Name | Description |
---|---|
run | Starts a connection to the live chat while blocking the main thread (sync) |
start | Connects to the live chat without blocking the main thread (async) |
stop | Turns off the connection to the live chat. |
retrieve_room_info | Gets the current room info from TikTok API |
retrieve_available_gifts | Retrieves a list of the available gifts for the room and adds it to the extended_gift attribute of the Gift object on the gift event, when enabled. |
add_listener | Adds an asynchronous listener function (or, you can decorate a function with @client.on() ) and takes two parameters, an event name and the payload, an AbstractEvent |
add_proxies | Add proxies to the current list of proxies with a valid aiohttp proxy-url |
get_proxies | Get the current list of proxies by proxy-url |
remove_proxies | Remove proxies from the current list of proxies by proxy-url |
set_proxies_enabled | Set whether or not proxies are enabled (disabled by default) |
Events
A TikTokLiveClient
object has the following events. You can add events either by doing client.add_listener("event_name", callable)
or by decorating a function with @client.on("event_name")
that includes an event
payload parameter.
connect
Triggered when the connection gets successfully established.
@client.on("connect")
async def on_connect(event: ConnectEvent):
print("Connected")
disconnect
Triggered when the connection gets disconnected. You can call start()
to have reconnect . Note that you should wait a little bit before attempting a reconnect to to avoid being rate-limited.
@client.on("disconnect")
async def on_disconnect(event: DisconnectEvent):
print("Disconnected")
like
Triggered every time someone likes the stream.
@client.on("like")
async def on_like(event: LikeEvent):
print("Someone liked the stream!")
join
Triggered every time a new person joins the stream.
@client.on("join")
async def on_join(event: JoinEvent):
print("Someone joined the stream!")
gift
Triggered every time a gift arrives. Extra information can be gleamed off the available_gifts
client attribute.
NOTE: Users have the capability to send gifts in a streak. This increases the
data.gift.repeat_count
value until the user terminates the streak. During this time new gift events are triggered again and again with an increaseddata.gift.repeat_count
value. It should be noted that after the end of the streak, another gift event is triggered, which signals the end of the streak viadata.gift.repeat_end
:1
. This applies only to gifts withdata.gift.gift_type
:1
. This means that even if the user sends agift_type
:1
gift only once, you will receive the event twice. Once withrepeat_end
:0
and once withrepeat_end
:1
. Therefore, the event should be handled as follows in one of TWO ways:
@client.on("gift")
async def on_gift(event: GiftEvent):
# If it's type 1 and the streak is over
if event.gift.gift_type == 1:
if event.gift.repeat_end == 1:
print(f"{event.user.uniqueId} sent {event.gift.repeat_count}x \"{event.gift.extended_gift.name}\"")
# It's not type 1, which means it can't have a streak & is automatically over
elif event.gift.gift_type != 1:
print(f"{event.user.uniqueId} sent \"{event.gift.extended_gift.name}\"")
@client.on("gift")
async def on_gift(event: GiftEvent):
# If it's type 1 and the streak is over
if event.gift.streakable:
if not event.gift.streaking:
print(f"{event.user.uniqueId} sent {event.gift.repeat_count}x \"{event.gift.extended_gift.name}\"")
# It's not type 1, which means it can't have a streak & is automatically over
else:
print(f"{event.user.uniqueId} sent \"{event.gift.extended_gift.name}\"")
follow
Triggered every time someone follows the streamer.
@client.on("follow")
async def on_follow(event: FollowEvent):
print("Someone followed the streamer!")
share
Triggered every time someone shares the stream.
@client.on("share")
async def on_share(event: ShareEvent):
print("Someone shared the streamer!")
viewer_count_update
Triggered every time the viewer count is updated. This event also updates the cached viewer count by default.
@client.on("viewer_count_update")
async def on_connect(event: ViewerCountUpdateEvent):
print("Received a new viewer count:", event.viewCount)
comment
Triggered every time someone comments on the live
@client.on("comment")
async def on_connect(event: CommentEvent):
print(f"{event.user.nickname} -> {event.comment}")
live_end
Triggered when the live stream gets terminated by the host.
@client.on("live_end")
async def on_connect(event: LiveEndEvent):
print(f"Livestream ended :(")
unknown
Triggered when an unknown event is received that is not yet handled by this client
@client.on("live_end")
async def on_connect(event: UnknownEvent):
print(event.as_dict, "<- This is my data as a dict!")
Contributors
- Isaac Kogan - Initial work - isaackogan
- Zerody - Reverse-Engineering & README.md file - Zerody
See the full list of contributors who participated in this project.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.