
Python library to parse XML data directly from HTTP, HTTPS, or FTP sources without loading the entire file into memory, supporting decompression, encoding detection, tag-based itemization, and optional filtering.

xmlstreamer

xmlstreamer is a Python library designed for efficient, memory-friendly streaming and parsing of large XML feeds from various sources, including compressed formats. It supports decompression, character encoding detection, tag-based itemization, and optional item filtering, making it ideal for handling real-time XML data feeds, large datasets, and complex structured content.

Features

  • Streamed Data Parsing: Parses XML data directly from HTTP, HTTPS, or FTP sources without loading the entire file into memory.
  • GZIP Decompression Support: Seamlessly handles GZIP-compressed XML feeds for bandwidth efficiency.
  • Encoding Detection: Automatically detects and decodes character encodings, ensuring compatibility with varied XML data formats.
  • Customizable Item Tokenization: Uses SAX-based parsing with user-defined tags to parse only relevant items from XML streams, reducing parsing overhead.
  • Configurable Runtime and Buffering: Includes configurable buffer sizes and runtime limits, allowing you to tailor performance to your application’s needs.
  • Flexible Filtering: Supports custom item filtering based on tag attributes and content, ideal for targeted data extraction.
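To make tag-based itemization concrete, here is a minimal sketch that uses only the standard library. It is not xmlstreamer's SAX implementation: it swaps in xml.etree.ElementTree.XMLPullParser as a stand-in, and the iter_items helper and the sample feed are hypothetical names introduced for illustration.

```python
import xml.etree.ElementTree as ET
from typing import Dict, Iterable, Iterator


def iter_items(chunks: Iterable[bytes], separator_tag: str) -> Iterator[Dict[str, str]]:
    """Yield one dict per <separator_tag> element as chunks arrive."""
    parser = ET.XMLPullParser(events=("end",))
    for chunk in chunks:
        parser.feed(chunk)  # Chunks may split tags anywhere; the parser buffers them.
        for _event, elem in parser.read_events():
            if elem.tag == separator_tag:
                # Map each direct child tag to its stripped text content.
                yield {child.tag: (child.text or "").strip() for child in elem}
                elem.clear()  # Drop the element's children once it has been emitted.


feed = b"<rss><channel><item><title>A</title></item><item><title>B</title></item></channel></rss>"
# Simulate a network stream by slicing the document into 16-byte chunks.
chunks = [feed[i:i + 16] for i in range(0, len(feed), 16)]
items = list(iter_items(chunks, "item"))
print(items)
```

Each item is emitted as soon as its closing tag has been fed to the parser, so the consumer never needs the whole document at once.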

Example Use Cases

  • Parsing large RSS/Atom feeds in real-time, with item-by-item streaming and filtering.
  • Handling compressed XML datasets for applications in web scraping, data aggregation, and news syndication.
  • Memory-efficient parsing of large or continuous XML data streams without requiring the entire document in memory.
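For the compressed-feed use case, the sketch below shows one common way to decompress GZIP data incrementally with the standard library's zlib module. It illustrates the general technique under the assumption that the feed arrives as a sequence of byte chunks; iter_gunzip is a hypothetical helper and not part of xmlstreamer.

```python
import gzip
import zlib
from typing import Iterable, Iterator


def iter_gunzip(chunks: Iterable[bytes]) -> Iterator[bytes]:
    """Decompress a gzip stream chunk by chunk."""
    # MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    for chunk in chunks:
        data = decompressor.decompress(chunk)
        if data:
            yield data
    tail = decompressor.flush()
    if tail:
        yield tail


payload = b"<feed>" + b"<item>x</item>" * 1000 + b"</feed>"
compressed = gzip.compress(payload)
# Simulate a download by slicing the compressed bytes into 64-byte chunks.
chunks = [compressed[i:i + 64] for i in range(0, len(compressed), 64)]
restored = b"".join(iter_gunzip(chunks))
print(len(compressed), len(restored))
```

The decompressed chunks can be fed straight into an incremental XML parser, so neither the compressed nor the decompressed document has to be held in memory in full.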

DeepWiki Docs: https://deepwiki.com/carlosplanchon/xmlstreamer

Installation

To install xmlstreamer, use uv:

uv add xmlstreamer

Usage

from xmlstreamer import StreamInterpreter

import pprint

url = "https://example.com/large-feed.xml"
separator_tag = "item"

interpreter = StreamInterpreter(
    url=url,
    separator_tag=separator_tag,
    buffer_size=1024 * 128,
    max_running_time=600  # 10 minutes
)

for item in interpreter:
    pprint.pprint(item)  # Process each parsed item as a dictionary

Define custom filters, encoding mappings, or buffer sizes as needed for optimal performance.
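As a rough illustration of what a buffer size and a runtime limit typically control, the sketch below reads a stream in fixed-size chunks until either the data ends or a time budget is used up. This is an assumption about how such options commonly behave, not a description of xmlstreamer's internals; read_chunks is a hypothetical helper.

```python
import io
import time
from typing import BinaryIO, Iterator


def read_chunks(stream: BinaryIO, buffer_size: int, max_running_time: float) -> Iterator[bytes]:
    """Yield chunks of up to buffer_size bytes until EOF or the time budget runs out."""
    deadline = time.monotonic() + max_running_time
    while time.monotonic() < deadline:
        chunk = stream.read(buffer_size)
        if not chunk:
            break  # End of stream.
        yield chunk


stream = io.BytesIO(b"x" * 300_000)
sizes = [len(c) for c in read_chunks(stream, buffer_size=1024 * 128, max_running_time=600)]
print(sizes)  # → [131072, 131072, 37856]
```

A larger buffer means fewer reads per megabyte at the cost of more memory per read, and the time budget bounds how long a never-ending feed is consumed.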

Filtering Usage

To enable item filtering and alerts, create a subclass of xmlstreamer.StreamInterpreter with custom methods for filtering and alerting. Below is an example of setting up an item filter based on date and creating a zero-items alert.

Step 1: Define an ItemFilter

The ItemFilter class specifies which items to keep based on date filtering criteria:

import attrs
from attrs_strict import type_validator
from typing import Optional

@attrs.define
class ItemFilter:
    attrib: str = attrs.field(
        kw_only=True,
        validator=type_validator()
    )
    fmt: Optional[str] = attrs.field(
        validator=type_validator(),
        default=None
    )
    max_item_age_in_days: int = attrs.field(
        kw_only=True,
        validator=type_validator()
    )

  • attrib: The XML tag or attribute to filter by.
  • fmt: Optional date format for parsing dates within the attribute. If not provided, dateparser will be used for parsing.
  • max_item_age_in_days: The maximum allowable age of items in days.

Step 2: Define Helper Functions for Parsing and Filtering Dates

The following functions parse dates and decide whether an item should be kept based on the specified date limit:

from datetime import datetime, timedelta
from typing import Optional
import dateparser

def parse_date(string: str, fmt: str) -> Optional[datetime]:
    try:
        return datetime.strptime(string, fmt)
    except ValueError:
        return None

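def eval_keep_date_item(string: str, fmt: Optional[str], limit_date: datetime) -> Optional[bool]:
    string = string.strip()
    # Use the explicit format when one is given; otherwise let dateparser guess.
    parsed_date = parse_date(string, fmt) if fmt else dateparser.parse(string)
    # True or False when the date parses, None when it does not.
    return parsed_date.timestamp() > limit_date.timestamp() if parsed_date else None

  • parse_date: Parses the date string with the specified format and returns None if the string does not match it.
  • eval_keep_date_item: Returns True if the item’s date is newer than limit_date, False if it is older, and None if the date cannot be parsed.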
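The three possible outcomes of the date check can be seen in a small standalone example. This sketch uses only datetime.strptime (no dateparser fallback) and an RFC 822 style format typical of RSS pubDate values; is_recent and RSS_FMT are hypothetical names introduced here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RSS_FMT = "%a, %d %b %Y %H:%M:%S %z"


def is_recent(string: str, fmt: str, limit_date: datetime) -> Optional[bool]:
    """Return True/False for a parseable date, None when parsing fails."""
    try:
        parsed = datetime.strptime(string.strip(), fmt)
    except ValueError:
        return None
    return parsed.timestamp() > limit_date.timestamp()


now = datetime.now(timezone.utc)
limit_date = now - timedelta(days=7)
fresh = (now - timedelta(days=1)).strftime(RSS_FMT)
stale = (now - timedelta(days=30)).strftime(RSS_FMT)

print(is_recent(fresh, RSS_FMT, limit_date))         # → True
print(is_recent(stale, RSS_FMT, limit_date))         # → False
print(is_recent("not a date", RSS_FMT, limit_date))  # → None
```

Comparing timestamps rather than datetime objects avoids errors when one value is timezone-aware and the other is naive.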

Step 3: Define the Filtering Function

The filter_parsed_item function applies the date filter to each parsed item:

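from datetime import datetime, timedelta
from typing import Optional, Dict, Any

# ParsedItem is the parsed-item type used by xmlstreamer. Its import is not
# shown in this README and must be added before this snippet will run.
def filter_parsed_item(ITEM_FILTER: ItemFilter, parsed_item: ParsedItem) -> Optional[ParsedItem]:
    attrib = ITEM_FILTER.attrib
    fmt = ITEM_FILTER.fmt
    max_item_age_in_days = ITEM_FILTER.max_item_age_in_days
    # Items dated before this moment are considered too old.
    limit_date = datetime.now() - timedelta(days=max_item_age_in_days)
    item_content: Dict[str, Any] = parsed_item.parsed_content

    if attrib in item_content:
        dt = item_content[attrib]
        keep_item = eval_keep_date_item(dt, fmt, limit_date) if isinstance(dt, str) else None
        if keep_item is None:
            # Date is not a string or cannot be parsed: keep the item but blank the field.
            item_content[attrib] = None
            return parsed_item
        elif keep_item:
            return parsed_item

    # Attribute absent or item too old: drop it.
    return None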
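The decision table of the filtering function can be exercised without the library. The sketch below mirrors its four outcomes using stand-in objects: types.SimpleNamespace replaces both ItemFilter and the parsed item, only strptime is used for parsing, and keep_by_date and make_item are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Optional

FMT = "%a, %d %b %Y %H:%M:%S %z"


def keep_by_date(item_filter, parsed_item) -> Optional[SimpleNamespace]:
    """Return the item to keep it, or None to drop it."""
    limit = datetime.now(timezone.utc) - timedelta(days=item_filter.max_item_age_in_days)
    content = parsed_item.parsed_content
    if item_filter.attrib not in content:
        return None  # No date field at all: drop the item.
    raw = content[item_filter.attrib]
    try:
        parsed = datetime.strptime(raw.strip(), item_filter.fmt)
    except (AttributeError, ValueError):
        content[item_filter.attrib] = None  # Unparseable date: keep, but blank it.
        return parsed_item
    return parsed_item if parsed.timestamp() > limit.timestamp() else None


def make_item(**content):
    # Stand-in for a parsed item: only the parsed_content attribute is modeled.
    return SimpleNamespace(parsed_content=content)


now = datetime.now(timezone.utc)
item_filter = SimpleNamespace(attrib="pubDate", fmt=FMT, max_item_age_in_days=7)

recent = make_item(pubDate=(now - timedelta(days=1)).strftime(FMT))
old = make_item(pubDate=(now - timedelta(days=30)).strftime(FMT))
broken = make_item(pubDate="sometime last week")
undated = make_item(title="no date here")

for name, item in [("recent", recent), ("old", old), ("broken", broken), ("undated", undated)]:
    print(name, "kept" if keep_by_date(item_filter, item) is not None else "dropped")
```

Note the asymmetry: an item whose date cannot be parsed is kept with the field set to None, while an item that lacks the field entirely is dropped.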

Step 4: Extend StreamInterpreter for Filtering and Alerts

Create a subclass that enables filtering with filter_parsed_item and alerts if no items are parsed.

from xmlstreamer import StreamInterpreter
from datetime import datetime
from pathlib import Path
import inspect

class CustomStreamInterpreter(StreamInterpreter):
    def __init__(self, **kwargs):
        kwargs["max_running_time"] = 3600  # Set max runtime to 1 hour
        super().__init__(**kwargs)

        stack = inspect.stack()
        fname = stack[1].filename
        fname_path = Path(fname)
        self.called_from = fname_path.stem

        self.alerts_enabled = True
        self.filter_parsed_item_func = filter_parsed_item

    def raise_stop_iteration(self):
        print(f"XMLSTREAMER STATS ITEMS PARSED: {self.stats_parsed_items}")
        if self.stats_parsed_items == 0:
            self.raise_zero_items_alert()
        raise StopIteration

    def raise_zero_items_alert(self):
        print("--- ZERO ITEMS ALERT ---")
        actual_date = datetime.now()
        running_time = actual_date - self.start_date
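        print(f"Running time exceeded with no items parsed: {running_time}")

  • CustomStreamInterpreter: Forces max_running_time to 3600 seconds, records the stem of the calling file in called_from, and sets filter_parsed_item_func for item filtering.
  • raise_stop_iteration: Prints the number of parsed items when iteration ends and calls raise_zero_items_alert if that number is zero.
  • raise_zero_items_alert: Triggered if no items are parsed, printing a warning with the elapsed running time.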
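The end-of-iteration hook pattern used above can be tried in isolation. In this sketch, BaseInterpreter is a hypothetical stand-in for StreamInterpreter that models only the item counter and the raise_stop_iteration hook; it says nothing about how the real class fetches or parses data.

```python
from typing import Iterable, Iterator, List


class BaseInterpreter:
    """Hypothetical stand-in: counts items and calls a hook when the source is exhausted."""

    def __init__(self, items: Iterable[dict]):
        self._source: Iterator[dict] = iter(items)
        self.stats_parsed_items = 0

    def __iter__(self):
        return self

    def __next__(self) -> dict:
        try:
            item = next(self._source)
        except StopIteration:
            self.raise_stop_iteration()  # The hook is expected to raise StopIteration.
            raise  # Safety net in case an override forgets to raise.
        self.stats_parsed_items += 1
        return item

    def raise_stop_iteration(self):
        raise StopIteration


class AlertingInterpreter(BaseInterpreter):
    def __init__(self, items: Iterable[dict]):
        super().__init__(items)
        self.alerts: List[str] = []

    def raise_stop_iteration(self):
        # Record an alert only when the run produced nothing.
        if self.stats_parsed_items == 0:
            self.alerts.append("zero items parsed")
        raise StopIteration


empty = AlertingInterpreter([])
print(list(empty), empty.alerts)

busy = AlertingInterpreter([{"title": "A"}])
print(list(busy), busy.alerts)
```

Overriding the hook keeps alerting logic out of the iteration loop, so callers can still write a plain for loop over the interpreter.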

Step 5: Run with Custom Filtering

To use filtering and alerting with your subclass:

url = "https://example.com/large-feed.xml"
separator_tag = "item"
item_filter = ItemFilter(
    attrib="pubDate",
    fmt="%a, %d %b %Y %H:%M:%S %z",
    max_item_age_in_days=7
)

interpreter = CustomStreamInterpreter(
    url=url,
    separator_tag=separator_tag,
    item_filter=item_filter,
    buffer_size=1024 * 128,
)

for item in interpreter:
    print(item)  # Process each filtered item as a dictionary

This example sets a pubDate filter that drops items older than 7 days. CustomStreamInterpreter also prints an alert if iteration ends with no items parsed. Because its constructor sets max_running_time to 3600, the run is limited to one hour regardless of any value passed in.
