Skip to main content

Add your description here

Project description

SPARQLx ✨🦋

tests coverage License: GPL v3 Ruff uv

Python library for httpx-based SPARQL Query and Update Operations according to the SPARQL 1.2 Protocol.

WARNING: This project is in an early stage of development and should be used with caution.

Features

  • Async Interface: asyncio support with aquery() and AsyncContextManager API.
  • Query Response Streaming: Streaming iterators for large result sets available with query_stream() and aquery_stream()
  • Synchronous Concurrency Wrapper: Support for concurrent execution of multiple queries from synchronous code with queries()
  • RDFLib Integration: Direct conversion to RDFLib SPARQL result representations
  • Context Managers: Synchronous and asynchronous context managers for lexical resource management
  • Client Sharing: Support for sharing and re-using httpx clients for HTTP connection pooling

Installation

sparqlx is a PEP 621-compliant package and available on PyPI.

pip install sparqlx

Usage

Also see the Recipes section below.

SPARQLWrapper.query

To run a query against an endpoint, instantiate a SPARQLWrapper object and call its query method:

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

The default response formats are JSON for SELECT and ASK queries and Turtle for CONSTRUCT and DESCRIBE queries.

SPARQLWrapper.query features a response_format parameter that takes

  • "json", "xml", "csv", "tsv" for SELECT and ASK queries
  • "turtle", "xml", "ntriples", "json-ld" for CONSTRUCT and DESCRIBE queries
  • any other string; the supplied value will be passed as MIME Type to the Accept header.

If the convert parameter is set to True, SPARQLWrapper.query returns

  • a list of Python dictionaries with dict-values cast to RDFLib objects for SELECT queries
  • a Python bool for ASK queries
  • an rdflib.Graph instance for CONSTRUCT and DESCRIBE queries.

Note that only JSON is supported as a response format for convert=True on SELECT and ASK query results.

Converted Result Typing

The return type for calls to SPARQLWrapper.query with convert=True is a union type list[sparqlx.types.SPARQLResultBinding] | bool | rdflib.Graph - the actual runtime return type depends on the query type (SELECT, ASK, CONSTRUCT or DESCRIBE).

Since the query type is not known at static time, type checkers are not able to narrow the union type without explicit annotation.

sparqlx defines typing.overloads for the simple str/sparqlx.types.SPARQLQueryType subclasses

  • sparqlx.SelectQuery,
  • sparqlx.AskQery,
  • sparqlx.ConstructQuery and
  • sparqlx.DescribeQuery

which can be used to inform static checkers about the type of query and ergo allow static analysis to narrow the return type.

from typing import reveal_type
from sparqlx import ConstructQuery, SPARQLWrapper


sparqlwrapper = SPARQLWrapper(sparql_endpoint=wikidata_sparql_endpoint)
query = "construct {?s ?p ?p} where {?s ?p ?o} limit 10"

result_1 = sparqlwrapper.query(query, convert=True)
result_2 = sparqlwrapper.query(ConstructQuery(query), convert=True)

reveal_type(result_1)  # list[_SPARQLBinding] | bool | rdflib.Graph
reveal_type(result_2)  # rdflib.Graph

Note that fully typing SPARQLWrapper.queries is currently not possible since Python does not support variadic type mappings.

Client Sharing and Configuration

By default, SPARQLWrapper creates and manages httpx.Client instances internally.

An httpx.Client can also be supplied by user code; this provides a configuration interface and allows for HTTP connection pooling.

Note that if an httpx.Client is supplied to SPARQLWrapper, user code is responsible for managing (closing) the client.

import httpx
from sparqlx import SPARQLWrapper

client = httpx.Client(timeout=10.0)

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

print(client.is_closed)  # False
client.close()
print(client.is_closed)  # True

It is also possible to configure SPARQLWrapper-managed clients by passing a dict holding httpx.Client kwargs to the client_config parameter:

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
	client_config={"timeout": 10.0},
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

In that case, SPARQLWrapper will internally create and manage httpx.Client instances (the default behavior if no client is provided), but will instantiate clients based on the supplied client_config kwargs.


SPARQLWrapper.aquery

SPARQLWrapper.aquery is an asynchronous version of SPARQLWrapper.query.

import asyncio
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

async def run_queries(*queries: str) -> list[httpx.Response]:
	return await asyncio.gather(*[sparql_wrapper.aquery(query) for query in queries])

results: list[httpx.Response] = asyncio.run(
	run_queries(*["select * where {?s ?p ?o} limit 10" for _ in range(10)])
)

For client sharing or configuration of internal client instances, pass an httpx.AsyncClient instance to aclient or kwargs to aclient_config respectively (see SPARQLWrapper.query).


SPARQLWrapper.queries

SPARQLWrapper.queries is a synchronous wrapper around asynchronous code and allows to run multiple queries concurrently from synchronous code.

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

results: Iterator[httpx.Response] = sparql_wrapper.queries(
	*["select * where {?s ?p ?o} limit 100" for _ in range(10)]
)

Note that since SPARQLWrapper.queries runs async code under the hood, httpx client sharing or configuration requires setting aclient or aclient_config in the respective SPARQLWrapper. Also, SPARQLWrapper.queries creates an event loop and therefore cannot be called from asynchronous code.

If an httpx.AsyncClient is supplied, the client will be closed after the first call to SPARQLWrapper.queries.

User code that wants to run multiple calls to queries can still exert control over the client by using aclient_config. For finer control over concurrent query execution, use the async interface.


Response Streaming

HTTP Responses can be streamed using the SPARQLWrapper.query_stream and SPARQLWrapper.aquery_stream Iterators.

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)

stream: Iterator[bytes] = sparql_wrapper.query_stream(
	"select * where {?s ?p ?o} limit 10000"
)

astream: AsyncIterator = sparql_wrapper.aquery_stream(
	"select * where {?s ?p ?o} limit 10000"
)

The streaming method and chunk size (for chunked responses) can be controlled with the streaming_method and chunk_size parameters respectively.


Context Managers

SPARQLWrapper also implements the context manager protocol. This can be useful in two ways:

  • Managed Client: Unless an httpx client is passed, SPARQLWrapper creates and manages clients internally. In that case, the context manager uses a single client per context and enables connection pooling within the context.
  • Supplied Client: If an httpx client is passed, SPARQLWrapper will use that client instance and calling code is responsible for client management. In that case, the context manager will manage the supplied client.
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)

with sparql_wrapper as context_wrapper:
	result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")
import httpx
from sparqlx import SPARQLWrapper

client = httpx.Client()

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)

with sparql_wrapper as context_wrapper:
	result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")

	print(client.is_closed)  # False
print(client.is_closed)  # True

Update Operations

sparqlx supports Update Operations according to the SPARQL 1.2 Protocol.

The following methods implement SPARQL Update:

  • SPARQLWrapper.update
  • SPARQLWrapper.aupdate
  • SPARQLWrapper.updates

Given an initially empty Triplestore with SPARQL and SPARQL Update endpoints, one could e.g. insert data like so:

import httpx
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://triplestore/query",
	update_endpoint="https://triplestore/update",
	aclient_config = {
		"auth": httpx.BasicAuth(username="admin", password="supersecret123")
	}
)

with sparql_wrapper as wrapper:
	store_empty: bool = not wrapper.query(
		"ask where {{?s ?p ?o} union {graph ?g {?s ?p ?o}}}", convert=True
	)
	assert store_empty, "Expected store to be empty."

	wrapper.updates(
		"insert data {<urn:s> <urn:p> <urn:o>}",
		"insert data {graph <urn:ng1> {<urn:s> <urn:p> <urn:o>}}",
		"insert data {graph <urn:ng2> {<urn:s> <urn:p> <urn:o>}}",
	)

	result = wrapper.query(
		"select ?g ?s ?p ?o where { {?s ?p ?o} union { graph ?g {?s ?p ?o} }}",
		convert=True,
	)

This will run the specified update operations asynchronously with an internally managed event loop; the query then returns the following Python conversion:

[
	{
		"g": rdflib.term.URIRef("urn:ng2"),
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
	{
		"g": rdflib.term.URIRef("urn:ng1"),
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
	{
		"g": None,
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
]

SPARQL 1.2 Protocol Client Implementation

sparqlx aims to provide a convenient Python interface for interacting with SPARQL endpoints according to the SPARQL 1.2 Protocol.

The SPARQL Protocol provides a specification for HTTP operations targeting SPARQL Query and Update endpoints.

"[The SPARQL 1.2 Protocol] describes a means for conveying SPARQL queries and updates to a SPARQL processing service and returning the results via HTTP to the entity that requested them." (SPARQL 1.2 Protocol - Abstract)

Generally, the SPARQL 1.2 Protocol defines the following HTTP operations for SPARQL operations:

  • GET (query)
  • URL-encoded POST (query and update)
  • POST directly (query and update)

See 2.2 Query Operation and 2.3 Update Operation.

sparqlx uses URL-encoded POST for both Query and Update operations by default.

This allows to send a Request Content Type in the Accept Header and both the Query/Update Request strings and Query/Update Parameters in the Request Message Body.

sparqlx also implements GET (for query operations) and POST-direct (for query and update operations); the SPARQL Operation method can be set via the query_method: typing.Literal["GET", "POST", "POST-direct"] and update_method: typing.Literal["POST", "POST-direct"] parameters in the SPARQLWrapper class.

SPARQL Protocol Request Parameters

The SPARQL Protocol also specifies the following request parameters:

  • version (0 or 1)
  • default-graph-uri (0 or more)
  • named-graph-uri (0 or more)

for Query Operations, where default-graph-uri and named-graph-uri correspond to SPARQL FROM and FROM NAMED respectively, and, if present, take precedence over SPARQL clauses.

  • version (0 or 1)
  • using-graph-uri (0 or more)
  • using-named-graph-uri (0 or more)

for Update Operations, where using-graph-uri and using-named-graph-uri correspond to SPARQL USING and USING NAMED, and likewise take precedence over SPARQL clauses.

SPARQL Protocol request parameters are reflected in the sparqlx API:

  • Methods implementing query operations take default_graph_uri and named_graph_uri parameters.
  • Methods implementing udpate operations take using_graph_uri and using_named_graph_uri parameters.
  • Both query and update methods take a version parameter.

Recipes

The following is a loose collection of sparqlx recipes.

Some of those recipes might become sparqlx features in the future.

JSON Response Streaming

The example below uses ijson to process a sparqlx.SPARQLWrapper.query_stream byte stream.

Note that ijson currently requires an adapter for Iterator input, see issue #58.

from collections.abc import Iterator

import ijson
from sparqlx import SPARQLWrapper


qlever_wikidata_endpoint = "https://qlever.cs.uni-freiburg.de/api//wikidata"
sparql_wrapper = SPARQLWrapper(sparql_endpoint=qlever_wikidata_endpoint)

json_result_stream: Iterator[bytes] = sparql_wrapper.query_stream(
	query="select ?s ?p ?o where {?s ?p ?o} limit 100000"
)

class IJSONIteratorAdapter:
	def __init__(self, byte_stream: Iterator[bytes]):
		self.byte_stream = byte_stream

	def read(self, n):
		if n == 0:
			return b""
		return next(self.byte_stream, b"")

adapter = IJSONIteratorAdapter(byte_stream=json_result_stream)
json_result_iterator: Iterator[dict] = ijson.items(adapter, "results.bindings.item")

print(next(json_result_iterator))

The json_result_iterator generator yields Python dictionaries holding SPARQL JSON response bindings coming from a byte stream. Buffering and incremental parsing is done by ijson.

Graph Response Streaming

The following example processes a stream of RDF graph data coming from a SPARQL CONSTRUCT response.

It uses an Iterator chunking facility ichunk to implement a generator that yields sized sub-graphs from a streamed graph response. To avoid incremental RDF parsing and possibly skolemization, ntriples are requested with line-based streaming.

from collections.abc import Iterator
from itertools import chain, islice
from typing import cast

import httpx
from rdflib import Graph
from sparqlx import SPARQLWrapper


def ichunk[T](iterator: Iterator[T], size: int) -> Iterator[Iterator[T]]:
	_missing = object()
	chunk = islice(iterator, size)

	if (first := next(chunk, _missing)) is _missing:
		return

	yield chain[T]([cast(T, first)], chunk)
	yield from ichunk(iterator, size=size)


releven_sparql_endpoint = "https://graphdb.r11.eu/repositories/RELEVEN"
sparql_wrapper = SPARQLWrapper(sparql_endpoint=releven_sparql_endpoint)

graph_result_stream: Iterator[bytes] = sparql_wrapper.query_stream(
	query="construct {?s ?p ?o} where {?s ?p ?o} limit 100000",
	response_format="ntriples",
	streaming_method=httpx.Response.iter_lines,
)

def graph_result_iterator(size: int = 1000) -> Iterator[Graph]:
	for chunk in ichunk(graph_result_stream, size=size):
		graph = Graph()
		for ntriple in chunk:
			graph.parse(data=ntriple, format="ntriples")

		yield graph

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sparqlx-0.7.0.tar.gz (87.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sparqlx-0.7.0-py3-none-any.whl (26.9 kB view details)

Uploaded Python 3

File details

Details for the file sparqlx-0.7.0.tar.gz.

File metadata

  • Download URL: sparqlx-0.7.0.tar.gz
  • Upload date:
  • Size: 87.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.22

File hashes

Hashes for sparqlx-0.7.0.tar.gz
Algorithm Hash digest
SHA256 4c68557d90ce0c332790adbea56fa562a3796d2a6ccf8b8125b2507b74fee959
MD5 6a927ccdbc7b4ba149bec4b797a813a8
BLAKE2b-256 531f807d0b991ddf03462d1203d9ca67568bbec7b40f719de22ea7d3ab497a15

See more details on using hashes here.

File details

Details for the file sparqlx-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: sparqlx-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 26.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.22

File hashes

Hashes for sparqlx-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 146060796307703157f51238386be1c3906d0a09ff49b961f14542919dd4c7f3
MD5 ae0d5dc09a563b69a9c31ca44c6d4a47
BLAKE2b-256 8bdc27560ae078b8679f129aacaf79367c2e8d55a34a3d8c5902a66853ea96d8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page