A simple writing utility for writing to QuestDB from PySpark / AWS Glue using Line Protocol and Sockets
AWS Glue / PySpark QuestDB writer
A simple writing utility for writing to QuestDB from PySpark / AWS Glue
#Introduction
A very simple convenience library, created because custom builds of the AWS Glue libraries are difficult to set up for local development. The standard release targets Python 3.6 and needs some changes before extra libraries can be added.
The InfluxDB writer is a potential alternative, but I could not get it working because of its dependencies, and it does not easily support PySpark.
#Installation
Install this via pip
pip3 install awsglue-questdb-writer
#Usage
In your AWS Glue / PySpark job, import the library with
from awsglue_questdb_writer import *
To use it, pass a DataFrame to the function. Ideally this is a DataFrame from a Spark SQL output like the ones Glue creates (e.g. via from_catalog), as that is what has been tested.
Important to note:
- All timestamps must be datetime objects
- Nanosecond precision (required by QuestDB) is currently just your timestamp precision padded with zeros
- If you need real nanosecond precision you must be on Python 3.7 and update the library to use it (see comments)
- QuestDB is whitespace sensitive: all datetimes are quoted, but any other field containing whitespace will cause the write to fail (silently)
- The library does not read any socket response (it is designed to be unmonitored and high throughput), so errors in your input will fail silently (PRs welcome)
- The example below includes a convenience line that drops unwanted fields before the DataFrame is passed to the function that writes to QuestDB
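The timestamp notes above can be illustrated with a minimal sketch. The helper name `to_epoch_nanos` is hypothetical and is not part of this library, and treating naive datetimes as UTC is an assumption made for the example. It shows why the last digits are always zeros: a Python datetime only carries microseconds.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_nanos(ts: datetime) -> int:
    """Return nanoseconds since the Unix epoch for a datetime.

    datetime stores microseconds at most, so the final three digits
    of the result are always zero (padding, not real precision).
    """
    if ts.tzinfo is None:
        # Assumption for this sketch: naive datetimes are treated as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    delta = ts - EPOCH
    # Integer arithmetic avoids the rounding error of float timestamps.
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    return micros * 1_000
```

Python 3.7 added `time.time_ns()`, which returns the current time as integer nanoseconds. Whether that is the change the note above has in mind is an assumption, and it only helps for "now", not for datetimes already stored in a DataFrame.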
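import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue_questdb_writer import *
from pyspark.context import SparkContext
from pyspark.sql import functions as F

# Resolve the job arguments, including the QuestDB host and port
args = getResolvedOptions(sys.argv,
                          ['TempDir', 'JOB_NAME', 'db_name', 'temp_workflow_bucket', 'questdb_host', 'questdb_port'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Load the source table from the Glue catalog and convert it to a Spark DataFrame
allDaily = glueContext.create_dynamic_frame.from_catalog(database=args['db_name'],
                                                         table_name="daily",
                                                         transformation_ctx="allDaily",
                                                         )
df = allDaily.toDF()
# Spark expects a Java-style datetime pattern here, not strptime directives;
# adjust the pattern to match your data
tdf = df.withColumn('reading_date_time', F.to_timestamp(df['reading_date_time'], "yyyy-MM-dd'T'HH:mm:ss.SSS"))
# Convenience line: drop unwanted fields before writing
tdf = tdf.drop(*["ingestion_date", "period_start", "period_end", "quality_method",
                 "event", "import_reactive_total", "export_reactive_total"])
write_to_quest(df=tdf, measurement="meter_id", table="daily", timestamp_field="reading_date_time", args=args)
job.commit()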
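To make the whitespace and silent-failure notes more concrete, here is a rough sketch of how one row could be turned into a Line Protocol line and pushed over a socket without reading a reply. It is not this library's implementation: `escape`, `format_field`, `build_line` and `send_lines` are hypothetical names, and the escaping rules follow the general InfluxDB Line Protocol conventions, which QuestDB may handle differently in edge cases.

```python
import socket
from typing import Dict, Iterable, Union

FieldValue = Union[bool, int, float, str]


def escape(value: str, special: str) -> str:
    """Backslash-escape each character of `special` found in `value`."""
    return "".join("\\" + ch if ch in special else ch for ch in value)


def format_field(value: FieldValue) -> str:
    """Render a field value using Line Protocol type conventions."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "t" if value else "f"
    if isinstance(value, int):
        return f"{value}i"
    if isinstance(value, float):
        return repr(value)
    # Strings are double-quoted; embedded quotes and backslashes are escaped.
    return '"' + escape(str(value), '"\\') + '"'


def build_line(table: str, symbols: Dict[str, str],
               fields: Dict[str, FieldValue], ts_nanos: int) -> str:
    """Build one line: table,symbol=value field=value timestamp."""
    head = ",".join(
        [escape(table, ", ")]
        + [f"{escape(k, ',= ')}={escape(str(v), ',= ')}" for k, v in symbols.items()]
    )
    body = ",".join(f"{escape(k, ',= ')}={format_field(v)}" for k, v in fields.items())
    return f"{head} {body} {ts_nanos}\n"


def send_lines(sock: socket.socket, lines: Iterable[str]) -> None:
    """Write lines to an open socket; no response is read (fire and forget)."""
    sock.sendall("".join(lines).encode("utf-8"))
```

In a job, the socket could be opened with something like `socket.create_connection((args['questdb_host'], int(args['questdb_port'])))`; QuestDB listens for Line Protocol over TCP on port 9009 by default. Because nothing is read back, a malformed line is simply dropped on the server side, which is why escaping whitespace before sending matters.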
#License
See LICENSE for full details
File details
Details for the file awsglue-questdb-writer-0.0.2.tar.gz.
File metadata
- Download URL: awsglue-questdb-writer-0.0.2.tar.gz
- Upload date:
- Size: 4.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.6.1 pkginfo/1.7.1 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.61.2 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | e8c85575b20a9be37546416d99e6331f7d04477c5d1b921f192439d716b912be
MD5 | d7a8cc7a7ac4b26e2af77f256597e81a
BLAKE2b-256 | 0224c5b9add8d54296a21e9e524840f39c1146ce26921b9da08810847cbd75dc
File details
Details for the file awsglue_questdb_writer-0.0.2-py3-none-any.whl.
File metadata
- Download URL: awsglue_questdb_writer-0.0.2-py3-none-any.whl
- Upload date:
- Size: 5.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.6.1 pkginfo/1.7.1 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.61.2 CPython/3.8.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 934514525020ad0c4c5475b811f71b0c1666f5bffeb24ccf897703166c32266e
MD5 | fc8849f4a571b535ae7a2b095ea3b234
BLAKE2b-256 | 373420b56338e059ed8e2abba54dd9bea94e1d607d26793a1cabe51c3e0708a4