

NebulaGraph Data Intelligence (ngdi) Suite



NebulaGraph Data Intelligence Suite for Python (ngdi) is a Python library for running data intelligence workloads on NebulaGraph.

ngdi provides a set of APIs for data scientists to read, write, and analyze/compute data in NebulaGraph, either on a single machine (backed by NetworkX for now) or on a distributed cluster (backed by Spark for now).
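The engine is chosen with a single constructor argument, so the same reader code runs on either backend; a minimal sketch of the switch (both forms appear in the examples below):

from ngdi import NebulaReader

# One API, two backends: select the engine at construction time.
reader = NebulaReader(engine="spark")    # distributed, on a Spark cluster
# reader = NebulaReader(engine="nebula")  # single machine, NebulaGraph engine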

        ┌───────────────────────────────────────────────────┐            
        │   Spark Cluster                                   │            
        │    .─────.    .─────.    .─────.    .─────.       │            
     ┌─▶│   :       ;  :       ;  :       ;  :       ;      │            
     │  │     `───'      `───'      `───'      `───'        │            
Algorithm                                                   │            
  Spark └───────────────────────────────────────────────────┘            
 Engine ┌───────────────────────────────────────────────────────────────┐
     └──┤                                                               │
        │   NebulaGraph Data Intelligence Suite(ngdi)                   │
        │     ┌────────┐    ┌──────┐    ┌────────┐   ┌─────┐            │
        │     │ Reader │    │ Algo │    │ Writer │   │ GNN │            │
        │     └────────┘    └──────┘    └────────┘   └─────┘            │
        │          ├────────────┴───┬────────┴─────┐    └──────┐        │
        │          ▼                ▼              ▼           ▼        │
        │   ┌─────────────┐ ┌──────────────┐ ┌──────────┐┌──────────┐   │
     ┌──┤   │ SparkEngine │ │ NebulaEngine │ │ NetworkX ││ DGLEngine│   │
     │  │   └─────────────┘ └──────────────┘ └──────────┘└──────────┘   │
     │  └──────────┬────────────────────────────────────────────────────┘
     │             │        Spark                                        
     │             └────────Reader ────────────┐                         
Spark Reader              Query Mode           │                         
Scan Mode                                      ▼                         
     │  ┌───────────────────────────────────────────────────┐            
     │  │  NebulaGraph Graph Engine         Nebula-GraphD   │            
     │  ├──────────────────────────────┬────────────────────┤            
     │  │  NebulaGraph Storage Engine  │                    │            
     └─▶│  Nebula-StorageD             │    Nebula-Metad    │            
        └──────────────────────────────┴────────────────────┘            

Installation

pip install ngdi
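To verify the installation, query the installed version with the standard library (a generic check, not an ngdi-specific API):

# Confirm ngdi is installed by reading its distribution metadata;
# importlib.metadata is in the standard library on Python 3.8+.
from importlib.metadata import version

print(version("ngdi"))  # e.g. 0.1.7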

Prerequisites:

  • Spark Engine Prerequisites
  • NebulaGraph Engine Prerequisites

Run on PySpark Jupyter Notebook (Spark Engine)

Assuming we have put the nebula-spark-connector.jar and nebula-algo.jar in /opt/nebulagraph/ngdi/package/.

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=8888 --no-browser"

Note that repeating --driver-class-path or --jars makes the later value override the earlier one, so the jars are passed as one colon-separated classpath and one comma-separated jar list:

pyspark --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar

We can then access the Jupyter Notebook with PySpark; see examples/spark_engine.ipynb for reference.
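A reasonable first cell mirrors the pagerank.py job shown below; this is a minimal smoke test of the connector setup, where the hosts, credentials, and space are placeholders for your own deployment:

from ngdi import NebulaGraphConfig, NebulaReader

# Placeholder connection details; adjust to your deployment.
config = NebulaGraphConfig(
    graphd_hosts="graphd:9669",
    metad_hosts="metad0:9669,metad1:9669,metad2:9669",
    user="root",
    password="nebula",
    space="basketballplayer",
)

# A scan-mode read (see Usage below) exercises the connector jars end to end.
reader = NebulaReader(engine="spark")
reader.scan(edge="follow", props="degree")
df = reader.read()
df.show(5)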

Submit Algorithm job to Spark Cluster (Spark Engine)

Assuming we have put nebula-spark-connector.jar, nebula-algo.jar, and ngdi-py3-env.zip in /opt/nebulagraph/ngdi/package/, and we have the following algorithm job in pagerank.py:

from ngdi import NebulaGraphConfig
from ngdi import NebulaReader

# set NebulaGraph config
config_dict = {
    "graphd_hosts": "graphd:9669",
    "metad_hosts": "metad0:9669,metad1:9669,metad2:9669",
    "user": "root",
    "password": "nebula",
    "space": "basketballplayer",
}
config = NebulaGraphConfig(**config_dict)

# read data with spark engine, query mode
reader = NebulaReader(engine="spark")
query = """
    MATCH ()-[e:follow]->()
    RETURN e LIMIT 100000
"""
reader.query(query=query, edge="follow", props="degree")
df = reader.read()

# run pagerank algorithm
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10)
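The architecture diagram above also shows a Writer for persisting results. As a sketch of what that step might look like in this job, the following assumes a NebulaWriter class; the import, sink value, and set_options parameters are illustrative assumptions, so consult the API reference for the actual signatures:

from ngdi import NebulaWriter  # assumed import, per the Writer box in the diagram

# Hypothetical: persist the PageRank scores as a vertex property.
writer = NebulaWriter(
    data=pr_result, sink="nebulagraph_vertex", config=config, engine="spark"
)
writer.set_options(
    tag="pagerank",                       # assumed target tag
    vid_field="_id",                      # assumed vertex-id column in pr_result
    properties={"pagerank": "pagerank"},  # result column -> property name
    batch_size=256,
    write_mode="insert",
)
writer.write()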

Note: in production, this could be orchestrated by Airflow or another job scheduler.

Then we can submit the job to Spark cluster:

spark-submit --master spark://master:7077 \
    --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar \
    --py-files /opt/nebulagraph/ngdi/package/ngdi-py3-env.zip \
    pagerank.py

Run ngdi algorithm job from a Python script (Spark Engine)

With everything prepared as above, including pagerank.py, we can submit the job from a Python script:

import subprocess

# As with the CLI invocation above, the driver classpath is colon-separated
# and the extra jars are comma-separated, so neither flag is repeated.
subprocess.run(["spark-submit", "--master", "spark://master:7077",
                "--driver-class-path",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar:/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--jars",
                "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar,/opt/nebulagraph/ngdi/package/nebula-algo.jar",
                "--py-files", "/opt/nebulagraph/ngdi/package/ngdi-py3-env.zip",
                "pagerank.py"])

Run on a single machine (NebulaGraph Engine)

Assuming we have a NebulaGraph cluster up and running, and we have the following algorithm job in pagerank_nebula_engine.py:

This file is the same as pagerank.py except for the following line:

- reader = NebulaReader(engine="spark")
+ reader = NebulaReader(engine="nebula")

Then we can run the job on a single machine:

python3 pagerank_nebula_engine.py

Documentation

API Reference

Usage

Spark Engine Examples

See also: examples/spark_engine.ipynb

from ngdi import NebulaReader

# read data with spark engine, query mode
reader = NebulaReader(engine="spark")
query = """
    MATCH ()-[e:follow]->()
    RETURN e LIMIT 100000
"""
reader.query(query=query, edge="follow", props="degree")
df = reader.read() # this will take some time
df.show(10)

# read data with spark engine, scan mode
reader = NebulaReader(engine="spark")
reader.scan(edge="follow", props="degree")
df = reader.read() # this will take some time
df.show(10)

# read data with spark engine, load mode (not yet implemented)
reader = NebulaReader(engine="spark")
reader.load(source="hdfs://path/to/edge.csv", format="csv", header=True, schema="src: string, dst: string, rank: int")
df = reader.read() # this will take some time
df.show(10)

# run pagerank algorithm
pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10) # this will take some time

# convert dataframe to NebulaGraphObject
graph = reader.to_graphx() # not yet implemented
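Under the Spark engine, the reader output is a Spark DataFrame; assuming pr_result is one as well (an assumption to verify against the API reference), it can be inspected the same way:

# Inspect the algorithm result like any Spark DataFrame; column names
# depend on the algorithm implementation.
pr_result.printSchema()
pr_result.show(10)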

NebulaGraph Engine Examples (not yet implemented)

from ngdi import NebulaReader

# read data with nebula engine, query mode
reader = NebulaReader(engine="nebula")
reader.query("""
    MATCH ()-[e:follow]->()
    RETURN e.src, e.dst, e.degree LIMIT 100000
""")
df = reader.read() # this will take some time
df.show(10)

# read data with nebula engine, scan mode
reader = NebulaReader(engine="nebula")
reader.scan(edge_types=["follow"])
df = reader.read() # this will take some time
df.show(10)

# convert dataframe to NebulaGraphObject
graph = reader.to_graph() # this will take some time
graph.nodes.show(10)
graph.edges.show(10)

# run pagerank algorithm
pr_result = graph.algo.pagerank(reset_prob=0.15, max_iter=10) # this will take some time

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

  • ngdi-0.1.7.tar.gz (14.4 kB, Source)

Built Distribution

  • ngdi-0.1.7-py3-none-any.whl (15.2 kB, Python 3)

File details: ngdi-0.1.7.tar.gz

File metadata

  • Size: 14.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.4.6 CPython/3.8.10

File hashes

  Algorithm    Hash digest
  SHA256       11f1127a84e797f9a73f5be8bfe0e0d95f450d237816dce2bce9516f07d91073
  MD5          b65921fef5a2602796fd9cce159d2500
  BLAKE2b-256  45c4453944cf6fad99fda4df603080e8515996fddfd3a52c4be1b8bac29a28c0

File details: ngdi-0.1.7-py3-none-any.whl

File metadata

  • Size: 15.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.4.6 CPython/3.8.10

File hashes

  Algorithm    Hash digest
  SHA256       f4fcc6e7d22765d7af14f0b9c479c93006bee3a1cd896cb4ac1659a274d634c0
  MD5          f5b62cd7fd9b231dd22fc1fec1e43717
  BLAKE2b-256  92a6ef17112c8c96c61099a3bb2a6165ac7d5328e8f5526867eb7f5310056cfe
