# AirCan
Load data into the CKAN DataStore using Airflow as the runner. This is a replacement for DataPusher and Xloader.

Clean separation of components means you can reuse what you want (e.g., you can use your own runner instead of Airflow).
## Get Started
- Install Python (>= 3.5 and <= 3.7.x) and make a virtual environment.

- Clone `aircan` so you have the examples available:

  ```bash
  git clone https://github.com/datopian/aircan
  ```

- Install and set up Airflow (https://airflow.apache.org/docs/stable/installation.html):

  ```bash
  export AIRFLOW_HOME=~/airflow
  pip install apache-airflow
  airflow initdb
  ```

  Note: on recent versions of Python (3.7+), you may face the following error when executing `airflow initdb`:

  ```
  ModuleNotFoundError: No module named 'typing_extensions'
  ```

  This can be solved with `pip install typing_extensions`.

- Then, start the server and visit your Airflow admin UI:

  ```bash
  airflow webserver -p 8080
  ```

  By default, the server will be accessible at http://localhost:8080/, as shown in the output of the terminal where you ran the previous command.
## Examples
### Example 1: CSV to JSON

In this example, we'll run an AirCan DAG that converts a CSV to JSON.
Add the DAG to the default directory for Airflow to recognize it:

```bash
mkdir ~/airflow/dags/
cp examples/aircan-example-1.csv ~/airflow/
cp examples/csv_to_json.py ~/airflow/dags/
```
To see this DAG appear in the Airflow admin UI, you may need to restart the server or launch the scheduler to update the list of DAGs (this may take a minute or two; then refresh the page in the Airflow admin UI):

```bash
airflow scheduler
```
Run this DAG:

- Enable the DAG in the admin UI with the on/off toggle so it runs with the scheduler.
- "Trigger" the DAG with the trigger button.
- After a moment, check the output. You should see a successful run for this DAG.
- Locate the output on disk at `~/airflow/aircan-example-1.json`.
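For orientation, here is a minimal sketch of what a CSV-to-JSON DAG like this could look like. It is a hypothetical reconstruction, not the actual contents of `examples/csv_to_json.py`; the DAG ID, task ID, and paths are illustrative:

```python
# Hypothetical sketch of a CSV-to-JSON DAG; the real examples/csv_to_json.py
# in the aircan repo may differ.
import csv
import json
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def convert_csv_to_json(csv_input, json_output):
    """Read the CSV and dump its rows as a JSON array of objects."""
    with open(os.path.expanduser(csv_input)) as f:
        rows = list(csv.DictReader(f))
    with open(os.path.expanduser(json_output), "w") as f:
        json.dump(rows, f, indent=2)


dag = DAG(
    dag_id="csv_to_json",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,  # run only when triggered
)

convert = PythonOperator(
    task_id="convert_csv_to_json",
    python_callable=convert_csv_to_json,
    op_kwargs={
        "csv_input": "~/airflow/aircan-example-1.csv",    # copied in the step above
        "json_output": "~/airflow/aircan-example-1.json",
    },
    dag=dag,
)
```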
## Using AirCan DAGs in a local Airflow instance
### Example 2: Local file to CKAN DataStore using the DataStore API
We'll assume you have:

- a local CKAN set up and running at http://localhost:5000;
- a dataset (for example, `my-dataset`) with a resource (for example, `my-resource` with `my-res-id-123` as its `resource_id`).

We also need to set up two environment variables for Airflow. Access the Airflow Variables panel and set up `CKAN_SITE_URL` and your `CKAN_SYSADMIN_API_KEY`.
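If you prefer to script this step instead of using the admin UI, Airflow's `Variable` model can set both values programmatically. A minimal sketch (both values are placeholders for your own URL and key):

```python
# Sketch: set the two Airflow Variables in code rather than via the UI.
# Both values below are placeholders; substitute your own.
from airflow.models import Variable

Variable.set("CKAN_SITE_URL", "http://localhost:5000")
Variable.set("CKAN_SYSADMIN_API_KEY", "<your-sysadmin-api-key>")
```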
#### Single-node DAG
`api_ckan_load_single_node` is a single-node DAG which deletes, creates, and loads a resource to a local or remote CKAN instance. You can run `api_ckan_load_single_node` by following these steps:

1. Open your `airflow.cfg` file (usually located at `~/airflow/airflow.cfg`) and point your DAG folder to AirCan:

   ```
   dags_folder = /your/path/to/aircan

   ...other configs

   dag_run_conf_overrides_params = True
   ```

   Note: do not point `dags_folder` to `/your/path/to/aircan/aircan/dags`. It must point to the outer `aircan` folder.
2. Verify that Airflow finds the DAGs of AirCan by running `airflow list_dags`. The output should list:

   ```
   -------------------------------------------------------------------
   DAGS
   -------------------------------------------------------------------
   ckan_api_load_single_step
   ...other DAGs...
   ```
3. Make sure you have these environment variables properly set up:

   ```bash
   export LC_ALL=en_US.UTF-8
   export LANG=en_US.UTF-8
   export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
   ```
4. Run the Airflow webserver (in case you skipped the previous example):

   ```bash
   airflow webserver
   ```
5. Run the Airflow scheduler, making sure the environment variables from step 3 are set up:

   ```bash
   airflow scheduler
   ```
6. Access the Airflow UI (http://localhost:8080/). You should see the DAG `ckan_api_load_single_step` listed.
7. Activate the DAG by flipping its Off toggle on the interface.
8. Now we can test the DAG. On your terminal, run:

   ```bash
   airflow test \
       -tp "{ \"resource_id\": \"my-res-id-123\", \
       \"schema_fields_array\": \"[ 'field1', 'field2']\", \
       \"csv_input\": \"/path/to/my.csv\", \
       \"json_output\": \"/path/to/my.json\" }" \
       ckan_api_load_single_step full_load_via_api now
   ```

   Make sure to replace the parameters accordingly:

   - `resource_id` is the ID of your resource on CKAN.
   - `schema_fields_array` is the header of your CSV file. Everything is treated as plain text at this time.
   - `csv_input` is the path to the CSV file you want to upload.
   - The DAG will convert your CSV file to a JSON file and then upload it.
   - `json_output` specifies the path where you want to dump your JSON file.
9. Check your CKAN instance and verify that the data has been loaded.
10. Trigger the DAG with the following:

    ```bash
    airflow trigger_dag ckan_api_load_single_step \
      --conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/path/to.csv", "json_output": "/path/to.json" }'
    ```

    Do not forget to replace the parameters with your data and to properly escape any special characters. Alternatively, you can run the DAG with the `airflow run` command.
`api_ckan_load_single_node` also works for remote CKAN instances. Just set your Airflow `CKAN_SITE_URL` Variable accordingly.
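For orientation, here is a rough sketch of the raw CKAN Action API calls that such a delete/create/load cycle corresponds to, written with `requests`. This is not AirCan's actual code; the file path, schema, and API key are placeholders, and the exact payloads AirCan sends may differ:

```python
# Rough sketch of the CKAN DataStore API calls behind a delete/create/load
# cycle. AirCan's actual requests may differ in detail.
import csv

import requests

CKAN_SITE_URL = "http://localhost:5000"   # same value as the Airflow Variable
API_KEY = "<your-sysadmin-api-key>"       # placeholder
RESOURCE_ID = "my-res-id-123"


def action(name, payload):
    """Call a CKAN Action API endpoint and return the parsed JSON response."""
    resp = requests.post(
        CKAN_SITE_URL + "/api/3/action/" + name,
        json=payload,
        headers={"Authorization": API_KEY},
    )
    resp.raise_for_status()
    return resp.json()


# 1. Drop any existing DataStore table for the resource.
action("datastore_delete", {"resource_id": RESOURCE_ID, "force": True})

# 2. Recreate it with the schema (all fields as text, as noted above).
action("datastore_create", {
    "resource_id": RESOURCE_ID,
    "fields": [{"id": "field1", "type": "text"},
               {"id": "field2", "type": "text"}],
    "force": True,
})

# 3. Insert the records parsed from the CSV.
with open("/path/to/my.csv") as f:
    records = list(csv.DictReader(f))
action("datastore_upsert", {
    "resource_id": RESOURCE_ID,
    "records": records,
    "method": "insert",
    "force": True,
})
```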
#### Multiple-node DAG
`ckan_api_load_multiple_steps` performs the same steps as `api_ckan_load_single_node`, but splits them across multiple nodes (tasks). You can repeat the steps of the previous section and run `ckan_api_load_multiple_steps`; the sketch below shows schematically how such a split looks.
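Splitting work across tasks in Airflow is a matter of declaring dependencies between operators. A schematic sketch (the DAG ID, task IDs, and callables here are illustrative stand-ins, not the actual ones in `ckan_api_load_multiple_steps`):

```python
# Schematic: the same delete/create/load flow split into chained tasks.
# Task IDs and callables are illustrative, not AirCan's actual ones.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="ckan_api_load_multiple_steps_sketch",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)


def noop(**kwargs):
    pass  # stand-in for the real delete/create/load callables


delete = PythonOperator(task_id="delete_datastore", python_callable=noop, dag=dag)
create = PythonOperator(task_id="create_datastore", python_callable=noop, dag=dag)
load = PythonOperator(task_id="load_records", python_callable=noop, dag=dag)

# The >> operator declares ordering: delete, then create, then load.
delete >> create >> load
```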
### [Ignore] Example 3: Local file to CKAN DataStore using Postgres
We'll load a local CSV into a CKAN DataStore instance.
#### Preliminaries: Set up your CKAN instance
We'll assume you have:

- a local CKAN set up and running at http://localhost:5000;
- the DataStore enabled. If you are using Docker, you might need to expose your Postgres instance port. For example, add the following to your `docker-compose.yml` file:

  ```yaml
  db:
    ports:
      - "5432:5432"
  ```

  (Useful to know: it is possible to access the Postgres instance on your Docker container. Run `docker ps` and you should see a container named `docker-ckan_db`, which corresponds to the CKAN database. Run `docker exec -it CONTAINER_ID bash` and then `psql -U ckan` to access the corresponding Postgres instance.)
- Now you need to set up some information on Airflow. Access your local Airflow Connections panel at http://localhost:8080/admin/connection/. Create a new connection named `ckan_postgres` with your DataStore information. For example, assuming your `CKAN_DATASTORE_WRITE_URL=postgresql://ckan:ckan@db/datastore`, use the following schema:
- We also need to set up two environment variables for Airflow. Access the Airflow Variables panel and set up `CKAN_SITE_URL` and your `CKAN_SYSADMIN_API_KEY`.
[TODO PARAMETERIZE VARS] [TODO PARAMETERIZE PATHS]
Then, create a dataset called `aircan-example` using this script:

```bash
cd aircan
pip install -r requirements-example.txt
python examples/setup-ckan.py --api-key
```
#### Doing the load
We assume you now have a dataset named `my-first-dataset`.
Create the DAG for loading:

```bash
cp aircan/lib/api_ckan_load.py ~/airflow/dags/
```

Check that Airflow recognizes your DAG with `airflow list_dags`. You should see a DAG named `ckan_load`.
Now you can test each task individually:

- To delete a datastore table, run `airflow test ckan_load delete_datastore_table now`.
- To create a datastore table, run `airflow test ckan_load create_datastore_table now`. You can see the corresponding `resource_id` for the datastore in your logs. [TODO JSON is hardcoded now; parameterize on kwargs or some other operator structure]
- To load a CSV to Postgres, run `airflow test ckan_load load_csv_to_postgres_via_copy now` (see the sketch after this list). [TODO JSON is hardcoded now; insert resource_id on it. File path is also hardcoded; change it]
- Finally, set your datastore to active: `airflow test ckan_load restore_indexes_and_set_datastore_active now`.
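For reference, a COPY-based load such as `load_csv_to_postgres_via_copy` generally boils down to Postgres's `COPY ... FROM STDIN`. Here is a minimal sketch using the `ckan_postgres` connection set up earlier; the column names and paths are placeholders, and the real task also has to handle indexes and datastore metadata:

```python
# Sketch of a COPY-based CSV load using the ckan_postgres connection.
# Column names and paths are placeholders; the real AirCan task also
# handles indexes and setting the datastore active.
from airflow.hooks.postgres_hook import PostgresHook


def load_csv_via_copy(csv_path, resource_id):
    """Stream a CSV file into the datastore table named after the resource_id."""
    hook = PostgresHook(postgres_conn_id="ckan_postgres")
    conn = hook.get_conn()
    try:
        with conn.cursor() as cur, open(csv_path) as f:
            # COPY streams the file straight into the table, which is much
            # faster than row-by-row INSERTs.
            sql = (
                'COPY "{table}" ("field1", "field2") '
                "FROM STDIN WITH (FORMAT csv, HEADER true)"
            ).format(table=resource_id)
            cur.copy_expert(sql, f)
        conn.commit()
    finally:
        conn.close()


load_csv_via_copy("/path/to/my.csv", "my-res-id-123")
```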
To run the entire DAG:

- Select the DAG [screenshot]
- Configure it with a path to `../your/aircan/examples/example1.csv`
- Run it ... [screenshot]
#### Check the output

- Visit http://localhost:5000/dataset/aircan-example/ and see the resource named XXX. It will have data in its DataStore now!
### Example 2a: Remote file to DataStore
Same as Example 2 but use this DAG instead:

```bash
cp aircan/examples/ckan-datastore-from-remote.py ~/airflow/dags/
```

Plus set a remote URL for loading.
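Conceptually, the remote variant only adds a fetch step before the conversion and upload. A minimal sketch of that step (the URL and local path are placeholders, not AirCan's actual parameter names):

```python
# Sketch: fetch a remote CSV so the rest of the pipeline can treat it
# as a local file. URL and path below are placeholders.
import requests


def fetch_remote_csv(url, local_path):
    """Download the remote CSV to a local file."""
    resp = requests.get(url)
    resp.raise_for_status()
    with open(local_path, "wb") as f:
        f.write(resp.content)


fetch_remote_csv("https://example.com/some.csv", "/tmp/aircan-remote.csv")
```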
### Example 3: Auto-load a file uploaded to CKAN into the CKAN DataStore
Configure CKAN to automatically load:

- Set up CKAN (see the previous sections).
- Also install the `ckanext-aircan-connector` extension in your CKAN instance. TODO: add instructions.
- Configure CKAN with the location of your Airflow instance and the DAG ID (`aircan-load-csv`).
#### Run it

Run this script, which uploads a CSV file to your CKAN instance and triggers a load to the DataStore:

```bash
cd aircan
pip install -r requirements-example.txt
python examples/ckan-upload-csv.py
```
## Using Google Cloud Composer
- Sign up for an account at https://cloud.google.com/composer. Create or select an existing project at Google Cloud Platform. For this example, we use one called `aircan-test-project`.
- Create an environment at Google Cloud Composer, either by command line or by UI. Make sure you select Python 3 when creating the project. Here, we create an environment named `aircan-airflow`. After creating your environment, it should appear in your environment list.
- Override the configuration for `dag_run_conf_overrides_params`, setting it to `True` (as in the `airflow.cfg` snippet from Example 2).
- Access the designated DAGs folder (which will be a bucket) and upload the contents of `local/path/to/aircan/aircan` (the inner `aircan` subfolder) to the bucket.
- Enter the subdirectory `dags` and delete the `__init__.py` file in this folder. It conflicts with Google Cloud Composer configurations.
- Similarly to what we did in Example 2, access your Airflow instance (created by Google Cloud Composer) and add `CKAN_SITE_URL` and `CKAN_SYSADMIN_API_KEY` as Variables. Now the DAGs must appear on the UI.
- Let's assume you have a resource on https://demo.ckan.org/ with `my-res-id-123` as its `resource_id`. We also assume you have two files in the root of your DAG bucket on Google Cloud Platform: a CSV file named `r3.csv` with the data you want to upload, containing two columns, `field1` and `field2`; and `r4.json`, an empty JSON file.
Since our DAGs expect parameters, you'll have to trigger them via the CLI. For example, to trigger `api_ckan_load_single_node`, run (from your terminal):

```bash
gcloud composer environments run aircan-airflow \
    --location us-east1 \
    trigger_dag -- ckan_api_load_single_step \
    --conf='{ "resource_id": "my-res-id-123", "schema_fields_array": [ "field1", "field2" ], "csv_input": "/home/airflow/gcs/dags/r3.csv", "json_output": "/home/airflow/gcs/dags/r4.json" }'
```

Check the logs (tip: filter them by your DAG ID, for example, `ckan_api_load_single_step`). It should upload the data of your `.csv` file to demo.ckan.org successfully.