Skip to main content

pycondusco lets you run a function iteratively, passing it the rows of a dataframe or the results of a query.

Project description

PyCondusco

Overview

pycondusco lets you run a function iteratively, passing it the rows of a dataframe or the results of a query.

We call the functions pycondusco runs pipelines, and define a pipeline as a function that accepts a list of parameters and does something based on the values of the parameters.

The most common use case for pycondusco are data pipelines. For data pipelines that primarily run SQL queries, we can template queries with a library (ie. pystache), so that parametrized values are separated from the query logic. We can then render the query with the appropriate values:

import pycondusco
from pycondusco.run_pipeline import run_pipeline
import pystache

json_string = '{"first_name": "First", "last_name":"Last"}'

params = [
    {
        'k1':'v1',
        'k2':'v2',
    },
    {
        'k1':'v1',
        'k2': json_string,
    },
]

def pipeline(params):
    print pystache.render('k1 value is {{k1}}, k2 is {{k2}}',params)

run_pipeline(pipeline,params)

pycondusco provides the following extensions in functionality to the above design pattern:

  • the user can provide a query and each row of results is iteratively passed to the pipeline
  • any JSON-string parameter will be converted to an object before being passed to the pipeline

Functions

function description
run_pipeline(pipeline, parameters) iteratively pass each row of parameters to a pipeline, converting any JSON parameters to objects
run_pipeline_gbq(pipeline, query, project) calls run_pipeline with the results of query executed via bigquery

Installation

pip install pycondusco
export GOOGLE_APPLICATION_CREDENTIALS="<FILE_PATH_TO_CREDENTIALS>"

Features

  • Name-based substitution of query-results including JSON into pipelines, iterating through rows of parameters dataframe:
import pystache
from google.cloud import bigquery
import pycondusco
from pycondusco.run_pipeline_gbq import run_pipeline_gbq

client = bigquery.Client()

def pipeline(params):
    query = """
      SELECT
        {{#list}}
          SUM(CASE WHEN author.name ='{{name}}' THEN 1 ELSE 0 END) as n_{{name_clean}},
        {{/list}}
        repo_name
      FROM `bigquery-public-data.github_repos.sample_commits`
      GROUP BY repo_name
    """

    query_job = client.query(pystache.render(query, params))
    results = query_job.result()  # Waits for job to complete.
    for row in results:
        print(dict(row.items()))


query = """
   SELECT CONCAT('[',
   STRING_AGG(
     CONCAT('{\"name\":\"',name,'\",'
       ,'\"name_clean\":\"', REGEXP_REPLACE(name, r'[^[:alpha:]]', ''),'\"}'
     )
   ),
   ']') as list
   FROM (
     SELECT author.name,
       COUNT(commit) n_commits
     FROM `bigquery-public-data.github_repos.sample_commits`
     GROUP BY 1
     ORDER BY 2 DESC
     LIMIT 10
   )
"""

run_pipeline_gbq(pipeline, client, query)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pycondusco-0.1.0.tar.gz (3.9 kB view hashes)

Uploaded Source

Built Distribution

pycondusco-0.1.0-py2-none-any.whl (19.7 kB view hashes)

Uploaded Python 2

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page