AirCan
Load data into CKAN DataStore using Airflow as the runner. This is a replacement for DataPusher and Xloader.
Clean separation of components so you can reuse what you want (e.g., use your own runner instead of Airflow).
Get Started
- Install Python >= 3.5, <= 3.7.x (and create a virtual environment).

- Clone `aircan` so you have the examples available:

  git clone https://github.com/datopian/aircan

- Install and set up Airflow (https://airflow.apache.org/docs/stable/installation.html):

  export AIRFLOW_HOME=~/airflow
  pip install apache-airflow
  airflow initdb

  Note: On recent versions of Python (3.7+), you may face the following error when executing `airflow initdb`:

  ModuleNotFoundError: No module named 'typing_extensions'

  This can be solved with `pip install typing_extensions`.

- Then, start the server and visit your Airflow admin UI:

  airflow webserver -p 8080

  By default, the server will be accessible at http://localhost:8080/, as shown in the output of the terminal where you ran the previous command.
Examples
Example 1: CSV to JSON
In this example we'll run an AirCan example to convert a CSV to JSON.
Add the DAG to the default directory for Airflow to recognize it:
mkdir ~/airflow/dags/
cp examples/aircan-example-1.csv ~/airflow/
cp examples/csv_to_json.py ~/airflow/dags/
To see this DAG appear in the Airflow admin UI, you may need to restart the webserver or launch the scheduler so the list of DAGs is refreshed (this can take a minute or two; then reload the admin UI page):
airflow scheduler
Run this DAG:

- Enable the DAG in the admin UI (flip its Off/On toggle to On) so it runs with the scheduler.
- Trigger the DAG with the "Trigger DAG" button.
- After a moment, check the output. You should see a successful run for this DAG.
- Locate the output on disk at `~/airflow/aircan-example-1.json`.
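For orientation, the DAG used here is conceptually very small. The following is a minimal sketch of a CSV-to-JSON DAG written for Airflow 1.10; the DAG id, task id, and function are illustrative and this is not the exact contents of examples/csv_to_json.py:

```python
# Illustrative sketch only: a minimal CSV-to-JSON DAG, not the shipped examples/csv_to_json.py.
import csv
import json
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def convert_csv_to_json():
    # Paths mirror the example above (adjust to your setup).
    csv_path = os.path.expanduser("~/airflow/aircan-example-1.csv")
    json_path = os.path.expanduser("~/airflow/aircan-example-1.json")
    with open(csv_path) as csv_file:
        rows = list(csv.DictReader(csv_file))
    with open(json_path, "w") as json_file:
        json.dump(rows, json_file)


dag = DAG(
    "csv_to_json_sketch",  # hypothetical DAG id; the example DAG uses its own id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)

convert_task = PythonOperator(
    task_id="convert_csv_to_json",
    python_callable=convert_csv_to_json,
    dag=dag,
)
```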
Using Aircan DAGs in a local Airflow instance
Example 2: Local file to CKAN DataStore using the Datastore API
We'll assume you have:
- a local CKAN set up and running at http://localhost:5000;
- a dataset (for example, `my-dataset`) with a resource (for example, `my-resource`, with `my-res-id-123` as its `resource_id`).

We also need to set up two variables in Airflow. Access the Airflow Variables panel and set `CKAN_SITE_URL` and your `CKAN_SYSADMIN_API_KEY`.
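For reference, DAG code can read these values at runtime through Airflow's Variables API; a minimal sketch, assuming the variable names above:

```python
from airflow.models import Variable

# Read the CKAN settings configured in the Airflow Variables panel.
ckan_site_url = Variable.get("CKAN_SITE_URL")        # e.g. http://localhost:5000
ckan_api_key = Variable.get("CKAN_SYSADMIN_API_KEY")  # your sysadmin API key
```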
Single-node DAG
`api_ckan_load_single_node` is a single-node DAG which deletes, creates, and loads a resource to a local or remote CKAN instance. You can run `api_ckan_load_single_node` by following these steps:

- Open your `airflow.cfg` file (usually located at `~/airflow/airflow.cfg`) and point your DAG folder to AirCan:

  dags_folder = /your/path/to/aircan

  ...other configs

  dag_run_conf_overrides_params = True

  Note: do not point `dags_folder` to `/your/path/to/aircan/aircan/dags`. It must point to the outer `aircan` folder.

- Verify that Airflow finds the DAGs of AirCan by running `airflow list_dags`. The output should list:
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ckan_api_load_single_step
...other DAGs...
- Make sure you have these environment variables properly set up:
export LC_ALL=en_US.UTF-8
export LANG=en_US.UTF-8
export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
- Run the Airflow webserver (in case you skipped the previous example):

  airflow webserver

- Run the Airflow scheduler:

  airflow scheduler

  Make sure the environment variables listed above are set up.

- Access the Airflow UI (http://localhost:8080/). You should see the DAG `ckan_api_load_single_step` listed.

- Activate the DAG by flipping its Off/On toggle in the interface.

- Now we can test the DAG. In your terminal, run:
airflow test \
-tp "{ \"resource_id\": \"my-res-id-123\", \
\"schema_fields_array\": \"[ 'field1', 'field2']\", \
\"csv_input\": \"/path/to/my.csv\", \
\"json_output\": \"/path/to/my.json\" }" \
ckan_api_load_single_step full_load_via_api now
Make sure to replace the parameters accordingly:

- `resource_id` is the id of your resource on CKAN.
- `schema_fields_array` is the header of your CSV file. Everything is being treated as plain text at this time.
- `csv_input` is the path to the CSV file you want to upload. The DAG will convert your CSV file to a JSON file and then upload it.
- `json_output` specifies the path where you want to dump your JSON file.
- Check your CKAN instance and verify that the data has been loaded.

- Trigger the DAG with the following:

  airflow trigger_dag ckan_api_load_single_step \
  --conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/path/to.csv", "json_output": "/path/to.json" }'

  Do not forget to replace the parameters with your data and to properly escape the special characters.

Alternatively, you can just run the DAG with the `airflow run` command.

`api_ckan_load_single_node` also works for remote CKAN instances. Just set up your Airflow `CKAN_SITE_URL` variable accordingly.
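To make the parameters above concrete, here is a minimal, hypothetical sketch of a single-task DAG that reads them. With `dag_run_conf_overrides_params = True`, both `airflow test -tp` and `airflow trigger_dag --conf` surface the values in the task's params; this only shows the plumbing and is not the actual api_ckan_load_single_node implementation:

```python
# Hypothetical sketch: how a single-task DAG can read trigger parameters.
# Not the actual api_ckan_load_single_node implementation.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def full_load_via_api(**context):
    # With dag_run_conf_overrides_params = True, values passed via `airflow test -tp ...`
    # or `airflow trigger_dag --conf ...` are available in the task's params.
    params = context["params"]
    resource_id = params["resource_id"]
    schema_fields = params["schema_fields_array"]
    csv_input = params["csv_input"]
    json_output = params["json_output"]
    # ...convert csv_input to json_output and push the rows to the CKAN DataStore API...


dag = DAG(
    "ckan_api_load_sketch",  # hypothetical id; the real DAG id is ckan_api_load_single_step
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)

PythonOperator(
    task_id="full_load_via_api",
    python_callable=full_load_via_api,
    provide_context=True,  # required on Airflow 1.10.x to receive **context
    dag=dag,
)
```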
Multiple-node DAG
`ckan_api_load_multiple_steps` performs the same steps as `api_ckan_load_single_node`, but it uses multiple nodes (tasks). You can repeat the steps of the previous section and run `ckan_api_load_multiple_steps`.
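The multi-step variant splits the same work into separate tasks and chains them. A sketch of the wiring, with illustrative task ids (not necessarily those used by ckan_api_load_multiple_steps):

```python
# Hypothetical sketch of multi-task wiring; task ids are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    "ckan_api_load_multi_sketch",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)


def placeholder(**_):
    # Each real task would call the corresponding AirCan library function instead.
    pass


delete_table = PythonOperator(task_id="delete_datastore_table", python_callable=placeholder, provide_context=True, dag=dag)
create_table = PythonOperator(task_id="create_datastore_table", python_callable=placeholder, provide_context=True, dag=dag)
load_resource = PythonOperator(task_id="load_resource_via_api", python_callable=placeholder, provide_context=True, dag=dag)

# Enforce the order: delete the old table, recreate it, then load the data.
delete_table >> create_table >> load_resource
```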
[Ignore] Example 3: Local file to CKAN DataStore using Postgres
We'll load a local CSV into a CKAN DataStore instance.
Preliminaries: Setup your CKAN instance
We'll assume you have:
- a local CKAN set up and running at http://localhost:5000
- datastore enabled. If you are using Docker, you might need to expose your Postgres instance port. For example, add the following to your `docker-compose.yml` file:

  db:
    ports:
      - "5432:5432"

  (Useful to know: it is possible to access the Postgres instance on your Docker container. Run `docker ps` and you should see a container named `docker-ckan_db`, which corresponds to the CKAN database. Run `docker exec -it CONTAINER_ID bash` and then `psql -U ckan` to access the corresponding Postgres instance.)
- Now you need to set up some information in Airflow. Access your local Airflow Connections panel at http://localhost:8080/admin/connection/ and create a new connection named `ckan_postgres` with your DataStore information. For example, assuming your `CKAN_DATASTORE_WRITE_URL=postgresql://ckan:ckan@db/datastore`, fill in the connection form with the corresponding host, schema, login, password and port.

- We also need to set up two variables in Airflow. Access the Airflow Variables panel and set `CKAN_SITE_URL` and your `CKAN_SYSADMIN_API_KEY`.
[TODO PARAMETERIZE VARS] [TODO PARAMETERIZE PATHS]
Then, create a dataset called `aircan-example` using this script:
cd aircan
pip install -r requirements-example.txt
python examples/setup-ckan.py --api-key
Doing the load
We assume you now have a dataset named `my-first-dataset`.

Create the DAG for loading:

cp aircan/lib/api_ckan_load.py ~/airflow/dags/

Check that Airflow recognizes your DAG with `airflow list_dags`. You should see a DAG named `ckan_load`.
Now you can test each task individually:
- To delete a datastore, run `airflow test ckan_load delete_datastore_table now`.
- To create a datastore, run `airflow test ckan_load create_datastore_table now`. You can see the corresponding `resource_id` for the datastore in your logs. [TODO JSON is hardcoded now; parameterize on kwargs or some other operator structure]
- To load a CSV to Postgres, run `airflow test ckan_load load_csv_to_postgres_via_copy now` (see the sketch after this list). [TODO JSON is hardcoded now; insert resource_id on it. File path is also hardcoded; change it]
- Finally, set your datastore to active: `airflow test ckan_load restore_indexes_and_set_datastore_active now`.
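For illustration, a COPY-based load task could be built on Airflow's PostgresHook and the `ckan_postgres` connection defined earlier. A minimal sketch under those assumptions, with placeholder table name and file path; this is not the code in aircan/lib/api_ckan_load.py:

```python
# Illustrative only: a COPY-based load using the "ckan_postgres" connection defined above.
from airflow.hooks.postgres_hook import PostgresHook


def load_csv_via_copy(csv_path, table_name):
    hook = PostgresHook(postgres_conn_id="ckan_postgres")
    # In the DataStore, the table name is the resource_id of the resource being loaded.
    copy_sql = 'COPY "{}" FROM STDIN WITH CSV HEADER'.format(table_name)
    # copy_expert streams the local file through a server-side COPY command.
    hook.copy_expert(copy_sql, csv_path)


# Example call (placeholder values):
# load_csv_via_copy("/path/to/aircan-example-1.csv", "my-res-id-123")
```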
To run the entire DAG:
- Select the DAG [screenshot]
- Configure it with a path to ../your/aircan/examples/example1.csv
- Run it ... [screenshot]
Check the output
- Visit http://localhost:5000/dataset/aircan-example/ and see the resource named XXX. It will have data in its datastore now!
Example 2a: Remote file to DataStore
Same as example 2 but use this DAG instead:
cp aircan/examples/ckan-datastore-from-remote.py ~/airflow/dags/
Plus set a remote URL for loading.
Example 3: Auto-load files uploaded to CKAN into the CKAN DataStore
Configure CKAN to automatically load uploaded files into the DataStore:
- Set up CKAN - see the previous sections.
- Also install the `ckanext-aircan-connector` extension in your CKAN instance. TODO: add instructions.
- Configure CKAN with the location of your Airflow instance and the DAG id (`aircan-load-csv`).
Run it
Run this script, which uploads a CSV file to your CKAN instance and triggers a load to the DataStore:
cd aircan
pip install -r requirements-example.txt
python examples/ckan-upload-csv.py
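For context, the heart of such a script is a single call to CKAN's `resource_create` action; with ckanext-aircan-connector installed, CKAN then triggers the configured DAG. A minimal, hypothetical sketch using requests (URL, API key, dataset id, and file path are placeholders, and this is not the contents of examples/ckan-upload-csv.py):

```python
# Hypothetical sketch of the core upload step, not the contents of examples/ckan-upload-csv.py.
import requests

CKAN_URL = "http://localhost:5000"       # placeholder: your CKAN instance
API_KEY = "your-sysadmin-api-key"        # placeholder: your API key

with open("examples/aircan-example-1.csv", "rb") as csv_file:
    response = requests.post(
        CKAN_URL + "/api/3/action/resource_create",
        headers={"Authorization": API_KEY},
        data={
            "package_id": "aircan-example",  # the dataset created earlier in this README
            "name": "aircan-example-1.csv",
            "format": "CSV",
        },
        files={"upload": csv_file},
    )

response.raise_for_status()
print(response.json()["result"]["id"])  # the resource_id of the uploaded file
```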
Using Google Cloud Composer
- Sign up for an account at https://cloud.google.com/composer. Create or select an existing project at Google Cloud Platform. For this example, we use one called `aircan-test-project`.
- Create an environment at Google Cloud Composer, either by command line or by UI. Make sure you select Python 3 when creating the project. Here, we create an environment named `aircan-airflow`.
After creating your environment, it should appear in your environment list:
- Override the configuration for `dag_run_conf_overrides_params`, setting it to True.

- Access the designated DAGs folder (which will be a bucket) and upload the contents of `local/path/to/aircan/aircan` to the bucket, so that everything inside that `aircan` subfolder is present in the bucket.

- Enter the subdirectory `dags` and delete the `__init__.py` file in this folder. It conflicts with Google Cloud Composer configurations.

- Similarly to what we did in Example 2, access your Airflow instance (created by Google Cloud Composer) and add `CKAN_SITE_URL` and `CKAN_SYSADMIN_API_KEY` as Variables. Now the DAGs should appear in the UI.

- Let's assume you have a resource on https://demo.ckan.org/ with `my-res-id-123` as its `resource_id`. We also assume you have two files in the root of your DAG bucket on Google Cloud Platform: a CSV file with the resource you want to upload, named `r3.csv`, with two columns, `field1` and `field2`; and an empty JSON file named `r4.json`.
Since our DAGs expect parameters, you'll have to trigger them via CLI:
For example, to trigger `api_ckan_load_single_node`, run (from your terminal):
gcloud composer environments run aircan-airflow \
--location us-east1 \
trigger_dag -- ckan_api_load_single_step \
--conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/home/airflow/gcs/dags/r3.csv", "json_output": "/home/airflow/gcs/dags/r4.json" }'
Check the logs (tip: filter them by your DAG ID, for example, `ckan_api_load_single_step`). It should upload the data of your `.csv` file to https://demo.ckan.org/ successfully.