Project description

AGEFreighter

A Python package that helps you create a graph database using Azure Database for PostgreSQL.

Apache AGE™ is a graph database extension for PostgreSQL. It is compatible with PostgreSQL's existing features and leverages graph data structures to analyze and use relationships and patterns in data.

Azure Database for PostgreSQL is a managed database service that is based on the open-source Postgres database engine.

See the announcement blog post: Introducing support for Graph data in Azure Database for PostgreSQL (Preview).

Features

  • Asynchronous connection pool support for the psycopg PostgreSQL driver
  • 'direct_loading' option for loading data directly into the graph, for better performance
  • COPY protocol support ('use_copy' option) for loading data into the graph, for much better performance

Install

pip install agefreighter

Prerequisites

  • Python 3.11 or later
  • This module runs on psycopg and psycopg_pool
  • Enable the Apache AGE extension in your Azure Database for PostgreSQL instance: sign in to the Azure portal, open the 'Server parameters' blade, and turn 'AGE' on in both the 'azure.extensions' and 'shared_preload_libraries' parameters. See the blog post above for more information.
  • Load the AGE extension in your PostgreSQL database:
CREATE EXTENSION IF NOT EXISTS age CASCADE;
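
Before loading data, you can confirm that the extension is actually installed; a quick check using a standard PostgreSQL catalog query (not specific to AgeFreighter):

SELECT extname, extversion FROM pg_extension WHERE extname = 'age';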

Usage

import asyncio
import os
import time

import networkx as nx
import pandas as pd

from agefreighter import AgeFreighter

# file downloaded from https://www.kaggle.com/datasets/darinhawley/imdb-films-by-actor-for-10k-actors
# actorfilms.csv: Actor,ActorID,Film,Year,Votes,Rating,FilmID
# # of actors: 9,623, # of films: 44,456, # of edges: 191,873
async def test_loadFromSingleCSV(af: AgeFreighter, chunk_size: int = 96, direct_loading: bool = False) -> None:
    await af.loadFromSingleCSV(
        graph_name="actorfilms",
        csv="actorfilms.csv",
        start_vertex_type="Actor",
        start_id="ActorID",
        start_properties=["Actor"],
        edge_label="ACTED_IN",
        end_vertex_type="Film",
        end_id="FilmID",
        end_properties=["Film", "Year", "Votes", "Rating"],
        chunk_size=chunk_size,
        direct_loading=direct_loading,
        drop_graph=True,
    )

# cities.csv: id,name,state_id,state_code,country_id,country_code,latitude,longitude
# countries.csv: id,name,iso3,iso2,numeric_code,phone_code,capital,currency,currency_symbol,tld,native,region,subregion,latitude,longitude,emoji,emojiU
# edges.csv: start_id,start_vertex_type,end_id,end_vertex_type
# # of countries: 53, # of cities: 72,485, # of edges: 72,485
async def test_loadFromCSVs(af: AgeFreighter, chunk_size: int = 96, direct_loading: bool = False) -> None:
    await af.loadFromCSVs(
        graph_name="cities_countries",
        vertex_csvs=["countries.csv", "cities.csv"],
        vertex_labels=["Country", "City"],
        edge_csvs=["edges.csv"],
        edge_labels=["has_city"],
        chunk_size=chunk_size,
        direct_loading=direct_loading,
        drop_graph=True,
    )

async def test_copyFromSingleCSV(af: AgeFreighter, chunk_size: int = 96) -> None:
    await af.loadFromSingleCSV(
        graph_name="actorfilms",
        csv="actorfilms.csv",
        start_vertex_type="Actor",
        start_id="ActorID",
        start_properties=["Actor"],
        edge_label="ACTED_IN",
        end_vertex_type="Film",
        end_id="FilmID",
        end_properties=["Film", "Year", "Votes", "Rating"],
        chunk_size=chunk_size,
        drop_graph=True,
        use_copy=True,
    )

async def test_copyFromCSVs(af: AgeFreighter, chunk_size: int = 96) -> None:
    await af.loadFromCSVs(
        graph_name="cities_countries",
        vertex_csvs=["countries.csv", "cities.csv"],
        vertex_labels=["Country", "City"],
        edge_csvs=["edges.csv"],
        edge_labels=["has_city"],
        chunk_size=chunk_size,
        drop_graph=True,
        use_copy=True,
    )

# build a NetworkX DiGraph from the CSV, then load it via the COPY protocol
async def test_copyFromNetworkx(af: AgeFreighter, chunk_size: int = 96) -> None:
    df = pd.read_csv("actorfilms.csv")
    G = nx.DiGraph()

    for _, row in df.iterrows():
        # add_node is idempotent, so repeated ActorID / FilmID values update the same node
        G.add_node(row["ActorID"], label="Actor", name=row["Actor"])
        G.add_node(
            row["FilmID"],
            label="Film",
            name=row["Film"],
            year=row["Year"],
            votes=row["Votes"],
            rating=row["Rating"],
        )
        G.add_edge(row["ActorID"], row["FilmID"], label="ACTED_IN")

    start_time = time.time()
    await af.loadFromNetworkx(
        graph_name="actorfilms",
        networkx_graph=G,
        chunk_size=chunk_size,
        direct_loading=False,
        drop_graph=True,
        use_copy=True,
    )
    print(
        f"test_copyFromNetworkx : time, {time.time() - start_time:.2f}, chunk_size: {chunk_size}"
    )

async def main() -> None:
    # export PG_CONNECTION_STRING="host=your_server.postgres.database.azure.com port=5432 dbname=postgres user=account password=your_password"
    try:
        connection_string = os.environ["PG_CONNECTION_STRING"]
    except KeyError:
        print("Please set the environment variable PG_CONNECTION_STRING")
        return

    af = await AgeFreighter.connect(dsn=connection_string, max_connections=64)
    try:
        # It is strongly recommended to tune chunk_size against your data and server before loading a large amount of data.
        # In particular, the number of properties on each vertex affects the complexity of the generated query.
        # Because the library loads asynchronously, the load duration is not linear in the number of rows.
        #
        # In addition to chunk_size, max_wal_size and checkpoint_timeout in postgresql.conf should be considered (see the example after this script).
        chunk_size = 64
        await test_loadFromSingleCSV(af, chunk_size=chunk_size, direct_loading=False)
        await asyncio.sleep(10)
        await test_loadFromSingleCSV(af, chunk_size=chunk_size, direct_loading=True)
        await asyncio.sleep(10)
        await test_copyFromSingleCSV(af, chunk_size=chunk_size)
        await asyncio.sleep(10)

        await test_loadFromCSVs(af, chunk_size=chunk_size, direct_loading=False)
        await asyncio.sleep(10)
        await test_loadFromCSVs(af, chunk_size=chunk_size, direct_loading=True)
        await asyncio.sleep(10)
        await test_copyFromCSVs(af, chunk_size=chunk_size)
        await asyncio.sleep(10)

        await test_copyFromNetworkx(af, chunk_size=chunk_size)

    finally:
        await af.pool.close()

if __name__ == "__main__":
    asyncio.run(main())
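
After a load completes, the graph can be queried with openCypher through AGE. A minimal sketch against the 'actorfilms' graph created above (the AS-clause column names are illustrative; the vertex property names follow the start_properties and end_properties arguments):

SET search_path = ag_catalog, "$user", public;
SELECT * FROM cypher('actorfilms', $$
    MATCH (a:Actor)-[:ACTED_IN]->(f:Film)
    RETURN a.Actor, f.Film
    LIMIT 5
$$) AS (actor agtype, film agtype);

The comments in main() also point at max_wal_size and checkpoint_timeout. On a self-managed PostgreSQL server they can be raised for bulk loads roughly as follows (the values are illustrative; on Azure Database for PostgreSQL, change these in the 'Server parameters' blade instead):

ALTER SYSTEM SET max_wal_size = '4GB';
ALTER SYSTEM SET checkpoint_timeout = '15min';
SELECT pg_reload_conf();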

Test & Samples

export PG_CONNECTION_STRING="host=your_server.postgres.database.azure.com port=5432 dbname=postgres user=account password=your_password"
python3 tests/test_agefreighter.py

For more information about Apache AGE, see the Apache AGE website (https://age.apache.org/) and the GitHub repository (https://github.com/apache/age).

License

MIT License
