Skip to main content
Join the official 2019 Python Developers SurveyStart the survey!

pycondusco lets you run a function iteratively, passing it the rows of a dataframe or the results of a query.

Project description

PyCondusco

Overview

pycondusco lets you run a function iteratively, passing it the rows of a dataframe or the results of a query.

We call the functions pycondusco runs pipelines, and define a pipeline as a function that accepts a list of parameters and does something based on the values of the parameters.

The most common use case for pycondusco are data pipelines. For data pipelines that primarily run SQL queries, we can template queries with a library (ie. pystache), so that parametrized values are separated from the query logic. We can then render the query with the appropriate values:

import pycondusco
from pycondusco.run_pipeline import run_pipeline
import pystache

json_string = '{"first_name": "First", "last_name":"Last"}'

params = [
    {
        'k1':'v1',
        'k2':'v2',
    },
    {
        'k1':'v1',
        'k2': json_string,
    },
]

def pipeline(params):
    print pystache.render('k1 value is {{k1}}, k2 is {{k2}}',params)

run_pipeline(pipeline,params)

pycondusco provides the following extensions in functionality to the above design pattern:

  • the user can provide a query and each row of results is iteratively passed to the pipeline
  • any JSON-string parameter will be converted to an object before being passed to the pipeline

Functions

function description
run_pipeline(pipeline, parameters) iteratively pass each row of parameters to a pipeline, converting any JSON parameters to objects
run_pipeline_gbq(pipeline, query, project) calls run_pipeline with the results of query executed via bigquery

Installation

pip install pycondusco
export GOOGLE_APPLICATION_CREDENTIALS="<FILE_PATH_TO_CREDENTIALS>"

Features

  • Name-based substitution of query-results including JSON into pipelines, iterating through rows of parameters dataframe:
import pystache
from google.cloud import bigquery
import pycondusco
from pycondusco.run_pipeline_gbq import run_pipeline_gbq

client = bigquery.Client()

def pipeline(params):
    query = """
      SELECT
        {{#list}}
          SUM(CASE WHEN author.name ='{{name}}' THEN 1 ELSE 0 END) as n_{{name_clean}},
        {{/list}}
        repo_name
      FROM `bigquery-public-data.github_repos.sample_commits`
      GROUP BY repo_name
    """

    query_job = client.query(pystache.render(query, params))
    results = query_job.result()  # Waits for job to complete.
    for row in results:
        print(dict(row.items()))


query = """
   SELECT CONCAT('[',
   STRING_AGG(
     CONCAT('{\"name\":\"',name,'\",'
       ,'\"name_clean\":\"', REGEXP_REPLACE(name, r'[^[:alpha:]]', ''),'\"}'
     )
   ),
   ']') as list
   FROM (
     SELECT author.name,
       COUNT(commit) n_commits
     FROM `bigquery-public-data.github_repos.sample_commits`
     GROUP BY 1
     ORDER BY 2 DESC
     LIMIT 10
   )
"""

run_pipeline_gbq(pipeline, client, query)

Project details


Release history Release notifications

This version

0.1.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for pycondusco, version 0.1.0
Filename, size File type Python version Upload date Hashes
Filename, size pycondusco-0.1.0-py2-none-any.whl (19.7 kB) File type Wheel Python version py2 Upload date Hashes View hashes
Filename, size pycondusco-0.1.0.tar.gz (3.9 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page