Skip to main content

Add your description here

Project description

SPARQLx ✨🦋

tests coverage License: GPL v3 Ruff uv

Python library for httpx-based SPARQL Query and Update Operations according to the SPARQL 1.2 Protocol.

WARNING: This project is in an early stage of development and should be used with caution.

Features

  • Async Interface: asyncio support with aquery() and AsyncContextManager API.
  • Query Response Streaming: Streaming iterators for large result sets available with query_stream() and aquery_stream()
  • Synchronous Concurrency Wrapper: Support for concurrent execution of multiple queries from synchronous code with queries()
  • RDFLib Integration: Direct conversion to RDFLib SPARQL result representations
  • Context Managers: Synchronous and asynchronous context managers for lexical resource management
  • Client Sharing: Support for sharing and re-using httpx clients for HTTP connection pooling

Installation

sparqlx is a PEP 621-compliant package and available on PyPI.

pip install sparqlx

Usage

Also see the Recipes section below.

SPARQLWrapper.query

To run a query against an endpoint, instantiate a SPARQLWrapper object and call its query method:

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

The default response formats are JSON for SELECT and ASK queries and Turtle for CONSTRUCT and DESCRIBE queries.

SPARQLWrapper.query features a response_format parameter that takes

  • "json", "xml", "csv", "tsv" for SELECT and ASK queries
  • "turtle", "xml", "ntriples", "json-ld" for CONSTRUCT and DESCRIBE queries
  • any other string; the supplied value will be passed as MIME Type to the Accept header.

If the convert parameter is set to True, SPARQLWrapper.query returns

  • a list of Python dictionaries with dict-values cast to RDFLib objects for SELECT queries
  • a Python bool for ASK queries
  • an rdflib.Graph instance for CONSTRUCT and DESCRIBE queries.

Note that only JSON is supported as a response format for convert=True on SELECT and ASK query results.

Converted Result Typing

The return type for calls to SPARQLWrapper.query with convert=True is a union type list[sparqlx.types.SPARQLResultBinding] | bool | rdflib.Graph - the actual runtime return type depends on the query type (SELECT, ASK, CONSTRUCT or DESCRIBE).

Since the query type is not known at static time, type checkers are not able to narrow the union type without explicit annotation.

sparqlx defines typing.overloads for the simple str/sparqlx.types.SPARQLQueryType subclasses

  • sparqlx.SelectQuery,
  • sparqlx.AskQery,
  • sparqlx.ConstructQuery and
  • sparqlx.DescribeQuery

which can be used to inform static checkers about the type of query and ergo allow static analysis to narrow the return type.

from typing import reveal_type
from sparqlx import ConstructQuery, SPARQLWrapper


sparqlwrapper = SPARQLWrapper(sparql_endpoint=wikidata_sparql_endpoint)
query = "construct {?s ?p ?p} where {?s ?p ?o} limit 10"

result_1 = sparqlwrapper.query(query, convert=True)
result_2 = sparqlwrapper.query(ConstructQuery(query), convert=True)

reveal_type(result_1)  # list[_SPARQLBinding] | bool | rdflib.Graph
reveal_type(result_2)  # rdflib.Graph

Note that fully typing SPARQLWrapper.queries is currently not possible since Python does not support variadic type mappings.

Client Sharing and Configuration

By default, SPARQLWrapper creates and manages httpx.Client instances internally.

An httpx.Client can also be supplied by user code; this provides a configuration interface and allows for HTTP connection pooling.

Note that if an httpx.Client is supplied to SPARQLWrapper, user code is responsible for managing (closing) the client.

import httpx
from sparqlx import SPARQLWrapper

client = httpx.Client(timeout=10.0)

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

print(client.is_closed)  # False
client.close()
print(client.is_closed)  # True

It is also possible to configure SPARQLWrapper-managed clients by passing a dict holding httpx.Client kwargs to the client_config parameter:

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
	client_config={"timeout": 10.0},
)

result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")

In that case, SPARQLWrapper will internally create and manage httpx.Client instances (the default behavior if no client is provided), but will instantiate clients based on the supplied client_config kwargs.


SPARQLWrapper.aquery

SPARQLWrapper.aquery is an asynchronous version of SPARQLWrapper.query.

import asyncio
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

async def run_queries(*queries: str) -> list[httpx.Response]:
	return await asyncio.gather(*[sparql_wrapper.aquery(query) for query in queries])

results: list[httpx.Response] = asyncio.run(
	run_queries(*["select * where {?s ?p ?o} limit 10" for _ in range(10)])
)

For client sharing or configuration of internal client instances, pass an httpx.AsyncClient instance to aclient or kwargs to aclient_config respectively (see SPARQLWrapper.query).


SPARQLWrapper.queries

SPARQLWrapper.queries is a synchronous wrapper around asynchronous code and allows to run multiple queries concurrently from synchronous code.

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)

results: Iterator[httpx.Response] = sparql_wrapper.queries(
	*["select * where {?s ?p ?o} limit 100" for _ in range(10)]
)

Note that since SPARQLWrapper.queries runs async code under the hood, httpx client sharing or configuration requires setting aclient or aclient_config in the respective SPARQLWrapper. Also, SPARQLWrapper.queries creates an event loop and therefore cannot be called from asynchronous code.

If an httpx.AsyncClient is supplied, the client will be closed after the first call to SPARQLWrapper.queries.

User code that wants to run multiple calls to queries can still exert control over the client by using aclient_config. For finer control over concurrent query execution, use the async interface.


Response Streaming

HTTP Responses can be streamed using the SPARQLWrapper.query_stream and SPARQLWrapper.aquery_stream Iterators.

from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)

stream: Iterator[bytes] = sparql_wrapper.query_stream(
	"select * where {?s ?p ?o} limit 10000"
)

astream: AsyncIterator = sparql_wrapper.aquery_stream(
	"select * where {?s ?p ?o} limit 10000"
)

The streaming method and chunk size (for chunked responses) can be controlled with the streaming_method and chunk_size parameters respectively.


Context Managers

SPARQLWrapper also implements the context manager protocol. This can be useful in two ways:

  • Managed Client: Unless an httpx client is passed, SPARQLWrapper creates and manages clients internally. In that case, the context manager uses a single client per context and enables connection pooling within the context.
  • Supplied Client: If an httpx client is passed, SPARQLWrapper will use that client instance and calling code is responsible for client management. In that case, the context manager will manage the supplied client.
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)

with sparql_wrapper as context_wrapper:
	result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")
import httpx
from sparqlx import SPARQLWrapper

client = httpx.Client()

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)

with sparql_wrapper as context_wrapper:
	result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")

	print(client.is_closed)  # False
print(client.is_closed)  # True

Update Operations

sparqlx supports Update Operations according to the SPARQL 1.2 Protocol.

The following methods implement SPARQL Update:

  • SPARQLWrapper.update
  • SPARQLWrapper.aupdate
  • SPARQLWrapper.updates

Given an initially empty Triplestore with SPARQL and SPARQL Update endpoints, one could e.g. insert data like so:

import httpx
from sparqlx import SPARQLWrapper

sparql_wrapper = SPARQLWrapper(
	sparql_endpoint="https://triplestore/query",
	update_endpoint="https://triplestore/update",
	aclient_config = {
		"auth": httpx.BasicAuth(username="admin", password="supersecret123")
	}
)

with sparql_wrapper as wrapper:
	store_empty: bool = not wrapper.query(
		"ask where {{?s ?p ?o} union {graph ?g {?s ?p ?o}}}", convert=True
	)
	assert store_empty, "Expected store to be empty."

	wrapper.updates(
		"insert data {<urn:s> <urn:p> <urn:o>}",
		"insert data {graph <urn:ng1> {<urn:s> <urn:p> <urn:o>}}",
		"insert data {graph <urn:ng2> {<urn:s> <urn:p> <urn:o>}}",
	)

	result = wrapper.query(
		"select ?g ?s ?p ?o where { {?s ?p ?o} union { graph ?g {?s ?p ?o} }}",
		convert=True,
	)

This will run the specified update operations asynchronously with an internally managed event loop; the query then returns the following Python conversion:

[
	{
		"g": rdflib.term.URIRef("urn:ng2"),
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
	{
		"g": rdflib.term.URIRef("urn:ng1"),
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
	{
		"g": None,
		"s": rdflib.term.URIRef("urn:s"),
		"p": rdflib.term.URIRef("urn:p"),
		"o": rdflib.term.URIRef("urn:o"),
	},
]

SPARQL 1.2 Protocol Client Implementation

sparqlx aims to provide a convenient Python interface for interacting with SPARQL endpoints according to the SPARQL 1.2 Protocol.

The SPARQL Protocol provides a specification for HTTP operations targeting SPARQL Query and Update endpoints.

"[The SPARQL 1.2 Protocol] describes a means for conveying SPARQL queries and updates to a SPARQL processing service and returning the results via HTTP to the entity that requested them." (SPARQL 1.2 Protocol - Abstract)

Generally, the SPARQL 1.2 Protocol defines the following HTTP operations for SPARQL operations:

  • GET (query)
  • URL-encoded POST (query and update)
  • POST directly (query and update)

See 2.2 Query Operation and 2.3 Update Operation.

sparqlx implements URL-encoded POST for both Query and Update operations.

This allows to send a Request Content Type in the Accept Header and both the Query/Update Request strings and Query Parameters in the Request Message Body.

SPARQL Protocol Request Parameters

The SPARQL Protocol also specifies the following request parameters:

  • version (0 or 1)
  • default-graph-uri (0 or more)
  • named-graph-uri (0 or more)

for Query Operations, where default-graph-uri and named-graph-uri correspond to SPARQL FROM and FROM NAMED respectively, and, if present, take precedence over SPARQL clauses.

  • version (0 or 1)
  • using-graph-uri (0 or more)
  • using-named-graph-uri (0 or more)

for Update Operations, where using-graph-uri and using-named-graph-uri correspond to SPARQL USING and USING NAMED, and likewise take precedence over SPARQL clauses.

SPARQL Protocol request parameters are reflected in the sparqlx API:

  • Methods implementing query operations take default_graph_uri and named_graph_uri parameters.
  • Methods implementing udpate operations take using_graph_uri and using_named_graph_uri parameters.
  • Both query and update methods take a version parameter.

Recipes

The following is a loose collection of sparqlx recipes.

Some of those recipes might become sparqlx features in the future.

JSON Response Streaming

The example below uses ijson to process a sparqlx.SPARQLWrapper.query_stream byte stream.

Note that ijson currently requires an adapter for Iterator input, see issue #58.

from collections.abc import Iterator

import ijson
from sparqlx import SPARQLWrapper


qlever_wikidata_endpoint = "https://qlever.cs.uni-freiburg.de/api//wikidata"
sparql_wrapper = SPARQLWrapper(sparql_endpoint=qlever_wikidata_endpoint)

json_result_stream: Iterator[bytes] = sparql_wrapper.query_stream(
	query="select ?s ?p ?o where {?s ?p ?o} limit 100000"
)

class IJSONIteratorAdapter:
	def __init__(self, byte_stream: Iterator[bytes]):
		self.byte_stream = byte_stream

	def read(self, n):
		if n == 0:
			return b""
		return next(self.byte_stream, b"")

adapter = IJSONIteratorAdapter(byte_stream=json_result_stream)
json_result_iterator: Iterator[dict] = ijson.items(adapter, "results.bindings.item")

print(next(json_result_iterator))

The json_result_iterator generator yields Python dictionaries holding SPARQL JSON response bindings coming from a byte stream. Buffering and incremental parsing is done by ijson.

Graph Response Streaming

The following example processes a stream of RDF graph data coming from a SPARQL CONSTRUCT response.

It uses an Iterator chunking facility ichunk to implement a generator that yields sized sub-graphs from a streamed graph response. To avoid incremental RDF parsing and possibly skolemization, ntriples are requested with line-based streaming.

from collections.abc import Iterator
from itertools import chain, islice
from typing import cast

import httpx
from rdflib import Graph
from sparqlx import SPARQLWrapper


def ichunk[T](iterator: Iterator[T], size: int) -> Iterator[Iterator[T]]:
	_missing = object()
	chunk = islice(iterator, size)

	if (first := next(chunk, _missing)) is _missing:
		return

	yield chain[T]([cast(T, first)], chunk)
	yield from ichunk(iterator, size=size)


releven_sparql_endpoint = "https://graphdb.r11.eu/repositories/RELEVEN"
sparql_wrapper = SPARQLWrapper(sparql_endpoint=releven_sparql_endpoint)

graph_result_stream: Iterator[bytes] = sparql_wrapper.query_stream(
	query="construct {?s ?p ?o} where {?s ?p ?o} limit 100000",
	response_format="ntriples",
	streaming_method=httpx.Response.iter_lines,
)

def graph_result_iterator(size: int = 1000) -> Iterator[Graph]:
	for chunk in ichunk(graph_result_stream, size=size):
		graph = Graph()
		for ntriple in chunk:
			graph.parse(data=ntriple, format="ntriples")

		yield graph

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sparqlx-0.6.0.tar.gz (86.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sparqlx-0.6.0-py3-none-any.whl (26.4 kB view details)

Uploaded Python 3

File details

Details for the file sparqlx-0.6.0.tar.gz.

File metadata

  • Download URL: sparqlx-0.6.0.tar.gz
  • Upload date:
  • Size: 86.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.22

File hashes

Hashes for sparqlx-0.6.0.tar.gz
Algorithm Hash digest
SHA256 f8031c0c872ac1ce61caa6e71e243d77121823038d257455335068e6c0403f6f
MD5 437a429bb6093a6505a9a8740bb1cc17
BLAKE2b-256 1e3819ca90fbd303f7d24dca42c1555a3b267f5d94a73e22280e637140086fca

See more details on using hashes here.

File details

Details for the file sparqlx-0.6.0-py3-none-any.whl.

File metadata

  • Download URL: sparqlx-0.6.0-py3-none-any.whl
  • Upload date:
  • Size: 26.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.22

File hashes

Hashes for sparqlx-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 04d887b53e25ad5901be50d50f90a32504f0202cf8b619366ea3af2f8e9d2981
MD5 1c860d391f3c3f6746a9561e8780bad5
BLAKE2b-256 0bda6c8b3f38a34be4b4591cd59de9cecf9bb3f1011f192396460daac58480e5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page