tgdata
A production-grade Python library for extracting and processing Telegram group and channel messages. Designed for ETL pipelines, data analysis, and archival purposes.
Features
- 🚀 Production-Ready: Built for reliability and scale in ETL pipelines
- 📊 Efficient Data Extraction: Fetch messages from groups and channels with rate limit handling
- 🔄 Incremental Updates: Fetch only new messages with the after_id parameter
- 📈 Progress Tracking: Monitor long-running operations with real-time progress
- 🔌 Clean Architecture: Focused on data extraction with minimal dependencies
- 🛡️ Robust Error Handling: Automatic retries with exponential backoff
- 📁 Multiple Export Formats: Export to CSV, JSON, or integrate with your data pipeline
- 🔔 Real-time Updates: Listen for new messages with event handlers
- ⏱️ Polling Support: Poll for new messages at configurable intervals
Installation
pip install tgdata
Quick Start
import asyncio

from tgdata import TgData

async def main():
    # Initialize the client
    tg = TgData()

    # List available groups and channels
    groups = await tg.list_groups()
    print(f"Found {len(groups)} groups/channels")

    # Fetch messages from a specific group
    messages = await tg.get_messages(
        group_id=-1001234567890,  # Your group ID
        limit=1000,
        with_progress=True
    )

    # Export to CSV
    tg.export_messages(messages, "messages.csv")

    # Get message statistics
    stats = tg.get_statistics(messages)
    print(f"Total messages: {stats['total_messages']}")
    print(f"Date range: {stats['date_range']['first']} to {stats['date_range']['last']}")

asyncio.run(main())
Configuration
Create a config.ini file with your Telegram API credentials:
[telegram]
api_id = YOUR_API_ID
api_hash = YOUR_API_HASH
session_file = telegram_session
Get your API credentials from https://my.telegram.org/apps
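A file in this layout can be read with the standard library's configparser, which is handy for checking the credentials before a long run. The sketch below is not tgdata's own loader: the helper name load_telegram_config and its validation rules are assumptions made for illustration.

```python
import configparser


def load_telegram_config(path="config.ini"):
    """Read the [telegram] section and return its values as a dict.

    Hypothetical helper: tgdata may load and validate its config differently.
    """
    parser = configparser.ConfigParser()
    # read() returns the list of files it parsed; empty means nothing was found
    if not parser.read(path):
        raise FileNotFoundError(f"Config file not found: {path}")
    if not parser.has_section("telegram"):
        raise KeyError("Missing [telegram] section")

    section = parser["telegram"]
    return {
        # api_id is numeric; getint raises ValueError if a placeholder was left in
        "api_id": section.getint("api_id"),
        "api_hash": section["api_hash"],
        # Fall back to the session name used in the example config
        "session_file": section.get("session_file", "telegram_session"),
    }
```

Calling load_telegram_config("config.ini") before starting a pipeline fails fast if the placeholders YOUR_API_ID and YOUR_API_HASH were never replaced.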
Advanced Usage
ETL Pipeline Integration
import asyncio
from datetime import datetime, timedelta

from tgdata import TgData

async def etl_pipeline():
    tg = TgData()

    # Fetch recent messages for ETL processing
    yesterday = datetime.now() - timedelta(days=1)
    messages = await tg.get_messages(
        group_id=-1001234567890,
        start_date=yesterday,
        with_progress=True
    )

    # Filter messages
    filtered = tg.filter_messages(
        messages,
        keyword="important",
        start_date=yesterday
    )

    # Export for data pipeline
    tg.export_messages(filtered, "daily_extract.json", format="json")

    # Get metrics
    metrics = await tg.get_metrics()
    print(f"Processed {len(filtered)} messages")

asyncio.run(etl_pipeline())
Incremental Message Fetching
import asyncio

from tgdata import TgData

async def incremental_fetch():
    tg = TgData()

    # Get the latest message ID from your storage
    last_processed_id = load_checkpoint()  # Your implementation

    # Fetch only messages after that ID
    new_messages = await tg.get_messages(
        group_id=-1001234567890,
        after_id=last_processed_id
    )

    if not new_messages.empty:
        # Process new messages
        process_messages(new_messages)  # Your implementation

        # Save new checkpoint
        save_checkpoint(new_messages['MessageId'].max())  # Your implementation

    print(f"New messages: {len(new_messages)}")

asyncio.run(incremental_fetch())
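The load_checkpoint and save_checkpoint helpers above are left to the reader. One minimal way to fill them in is a small JSON file, sketched below. The file name, the JSON key, and the choice of 0 as the "no checkpoint yet" value are all assumptions; in particular, this assumes after_id=0 makes get_messages return the full history, which is worth verifying against the library.

```python
import json
import os

CHECKPOINT_PATH = "checkpoint.json"  # hypothetical location


def load_checkpoint(path=CHECKPOINT_PATH):
    """Return the last processed message ID, or 0 if no checkpoint exists."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return int(json.load(f)["last_message_id"])
    except FileNotFoundError:
        return 0


def save_checkpoint(message_id, path=CHECKPOINT_PATH):
    """Persist the last processed message ID."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        # int() also converts NumPy integers, which json cannot serialize
        json.dump({"last_message_id": int(message_id)}, f)
    # Swap the finished file into place so readers never see a partial write
    os.replace(tmp_path, path)
```

Writing to a temporary file and renaming it keeps the previous checkpoint intact if the process dies mid-write, at the cost of one extra file operation per run.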
Progress Monitoring
from tgdata import TgData

async def monitor_extraction():
    tg = TgData()

    def progress_callback(current, total, rate):
        # total may be 0 or None when the message count is unknown
        percent = (current / total * 100) if total else 0
        print(f"Progress: {current}/{total} ({percent:.1f}%) - {rate:.1f} msg/s")

    messages = await tg.get_messages(
        group_id=-1001234567890,
        limit=10000,
        progress_callback=progress_callback
    )
Real-time Message Monitoring
from tgdata import TgData

async def monitor_real_time():
    tg = TgData()

    # Register handler for new messages
    @tg.on_new_message(group_id=-1001234567890)
    async def handle_message(event):
        print(f"New message from {event.sender_id}: {event.message.text}")

        # React to commands
        if event.message.text == "!ping":
            await event.reply("Pong!")

    # Run event loop
    await tg.run_with_event_loop()
Polling for New Messages
from tgdata import TgData

async def poll_messages():
    tg = TgData()

    # Define callback for new messages
    async def process_batch(messages_df):
        print(f"Got {len(messages_df)} new messages")
        # Process messages here

    # Poll every 30 seconds
    await tg.poll_for_messages(
        group_id=-1001234567890,
        interval=30,
        callback=process_batch,
        max_iterations=10  # Stop after 10 polls
    )
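To make the polling parameters concrete, here is a rough, library-independent sketch of what such a loop does: fetch anything newer than the last seen ID, hand non-empty batches to the callback, sleep, and repeat. It does not reproduce poll_for_messages itself; the poll function, the fetch_new coroutine, and the (id, text) tuple shape are hypothetical.

```python
import asyncio


async def poll(fetch_new, callback, interval=30, max_iterations=None):
    """Call fetch_new(last_id) repeatedly and pass non-empty batches to callback.

    fetch_new is a hypothetical coroutine returning a list of (id, text)
    tuples whose id is greater than last_id.
    """
    last_id = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        batch = await fetch_new(last_id)
        if batch:
            await callback(batch)
            # Advance the cursor so the next poll skips what was just handled
            last_id = max(msg_id for msg_id, _ in batch)
        iteration += 1
        # Skip the final sleep once the iteration budget is used up
        if max_iterations is None or iteration < max_iterations:
            await asyncio.sleep(interval)
    return last_id
```

The cursor moves only after the callback returns, so a callback that raises leaves last_id untouched and the same batch is fetched again on the next attempt.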
API Reference
TgData
Main class for interacting with Telegram groups and channels.
Methods
Core Methods:
- list_groups() - List all accessible groups and channels
- get_messages() - Fetch messages with various filters
- search_messages() - Search for specific content
- filter_messages() - Filter messages by criteria
- export_messages() - Export to CSV or JSON
- get_statistics() - Get message statistics
- get_metrics() - Get session metrics
- get_message_count() - Get total message count without fetching all
Real-time & Polling:
- on_new_message() - Decorator to register real-time message handlers
- poll_for_messages() - Poll for new messages at intervals
- run_with_event_loop() - Run client with event loop for real-time events
Checkpoint Management
For production ETL pipelines, implement your own checkpoint logic:
# Save last processed message ID to your database
last_id = messages['MessageId'].max()
await save_to_database(group_id, last_id)
# On next run, fetch only new messages
last_id = await load_from_database(group_id)
new_messages = await tg.get_messages(group_id=group_id, after_id=last_id)
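The save_to_database and load_from_database calls above are placeholders. As one possible shape for them, the sketch below keeps one row per group in SQLite and runs the blocking database work in a thread so the event loop stays free. The table name, column names, database path, and the default of 0 for an unseen group are assumptions, not part of tgdata.

```python
import asyncio
import sqlite3
from contextlib import closing

DB_PATH = "checkpoints.db"  # hypothetical location


def _connect(db_path):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints "
        "(group_id INTEGER PRIMARY KEY, last_id INTEGER NOT NULL)"
    )
    return conn


def _save(db_path, group_id, last_id):
    # closing() closes the connection; the inner "conn" context commits
    with closing(_connect(db_path)) as conn, conn:
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints (group_id, last_id) VALUES (?, ?)",
            (int(group_id), int(last_id)),
        )


def _load(db_path, group_id):
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT last_id FROM checkpoints WHERE group_id = ?", (int(group_id),)
        ).fetchone()
    return row[0] if row else 0


async def save_to_database(group_id, last_id, db_path=DB_PATH):
    """Store the last processed message ID for a group."""
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _save, db_path, group_id, last_id)


async def load_from_database(group_id, db_path=DB_PATH):
    """Return the stored message ID for a group, or 0 if none is stored."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _load, db_path, group_id)
```

Each call opens its own connection inside the worker thread, which sidesteps sqlite3's rule that a connection is used only by the thread that created it.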
Production Deployment
Best Practices
- Use an environment variable to locate the credentials file:
  import os
  config_path = os.getenv("TELEGRAM_CONFIG", "config.ini")
  tg = TgData(config_path=config_path)
- Implement error handling:
  try:
      messages = await tg.get_messages(group_id=group_id)
  except Exception as e:
      logger.error(f"Failed to fetch messages: {e}")
      # Implement retry logic or alerting
- Monitor connection health:
  health = await tg.health_check()
  if not health['primary_connection']:
      # Handle connection issues
      pass
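For the "retry logic" mentioned in the error-handling example, a caller-side wrapper might look like the sketch below. It is separate from whatever retries tgdata performs internally; the retry_async name, its defaults, and the jitter range are illustrative assumptions.

```python
import asyncio
import random


async def retry_async(operation, max_attempts=5, base_delay=1.0, max_delay=60.0):
    """Await operation() and retry on failure with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            # Delay doubles each attempt (base, 2*base, 4*base, ...), capped at max_delay
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            # Jitter spreads out retries from concurrent workers
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

It could be used as messages = await retry_async(lambda: tg.get_messages(group_id=group_id)). In real pipelines it is usually better to catch only the exception types known to be transient rather than Exception.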
Performance Tips
- Use connection pooling for parallel operations
- Implement checkpoint logic for incremental processing
- Implement progress callbacks for visibility
- Export data incrementally for large datasets
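As an illustration of the last tip, the sketch below appends each batch to a CSV file as it arrives instead of holding the whole dataset in memory. It uses only the standard library; append_batch_to_csv and the Message column are hypothetical (MessageId is the column name used in the examples above), and it is independent of export_messages.

```python
import csv
import os


def append_batch_to_csv(rows, path, fieldnames):
    """Append a batch of dict rows to a CSV file, writing the header only once."""
    rows = list(rows)
    # A missing or empty file still needs its header row
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(rows)
    return len(rows)
```

Called once per fetched batch, this keeps memory use proportional to the batch size rather than to the full history.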
Requirements
- Python 3.7+
- Telegram API credentials (not bot tokens)
- Group/channel membership
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
- Documentation: Full documentation
- Issues: GitHub Issues
- Examples: See the examples/ directory
Disclaimer
This tool is for legitimate data collection and analysis only. Users must comply with Telegram's Terms of Service and applicable laws. Always respect user privacy and platform rate limits.