Skip to main content

Tools for acquiring and analyzing Pulse API data.

Project description

Pulse Throw for Python

Tools for acquiring and analyzing Pulse API data.

Pulse is a wearable sensor for baseball players to monitor throwing workload.

Installation

The pulse module can be installed via pip:

pip install pulse-throw

Getting Started

In order to use the Pulse client, you must have a client_id, client_secret, and refresh_token provided by the Pulse team at Driveline.

It is best practice to use these values stored in a .env file:

# Pulse credentials
CLIENT_ID="<CLIENT_ID>"
CLIENT_SECRET="<CLIENT_SECRET>"
REFRESH_TOKEN="<REFRESH_TOKEN>"

You can use python-dotenv to load the enviroment variables for use in code:

import os
from dotenv import load_dotenv

load_dotenv()

client_id = os.getenv("CLIENT_ID") or ""
client_secret = os.getenv("CLIENT_SECRET") or ""
refresh_token = os.getenv("REFRESH_TOKEN") or ""

Once the environment variables are loaded, a PulseClient object can created:

import pulse_throw as pt

# Using a traditional constructor and destructor
client = pt.PulseClient(client_id, client_secret, refresh_token)
...
del client

# Using a context manager that destructs automatically
with pt.PulseClient(client_id, client_secret, refresh_token) as client:
    ...

The Pulse client will authenticate the client upon construction by default. This involves fetching an access token from the API. If you don't want this request to happen automatically, pass authenticate=False into the object constructor. In order to make other requests, you will need to manually call the authenticate() method so that the other requests have the proper authorization headers:

client = pt.PulseClient(
    client_id, client_secret, refresh_token, authenticate=False
)

client.authenticate()

...

del client

API Requests

There are four different API requests that PulseClient can make.

Get Profile

Returns info about the owner of the session.

Method: get_profile()

Payload: None

Example Response:

{
  "id": "<id>",
  "firstName": "<first-name>",
  "lastName": "<last-name>",
  "email": "<email>"
}

Get Team

Returns info about the owner of the session's team.

Method: get_team()

Payload: None

Example Response:

{
    "team": {
        "name": "TEAMNAME",
        "id": "JQtyNOYdDH"
    },
    "members": [
        {
            "userId": "r5FiwuBlYZ",
            "teamMemberId": "yvUsIxsjrg",
            "athleteProfileId": "dgTHp0nlN0",
            "firstName": "<player-first-name>",
            "lastName": "<player-last-name>",
            "email": "<player-email>"
        },
        ...

Get Snapshots

Gets daily snapshots generated for one or more users over a range of dates. The owner of the session must have permission to access their data (e.g. the requested users must be either the current user or a member of a team for which the current user is a coach).

Method: get_snapshots(start_date: str = <end_date - 8 days>, end_date: str = <today's date>, user_ids: str | list[str] = <user_id>)

Payload:

  • startDate: The earliest date for which to get data, pulled from the start_date parameter. Expected in ISO 8601 format (YYYY-MM-DD). Defaults to eight days before the end_date parameter.
  • endDate: The latest date for which to get data, pulled from the end_date parameter. Expected in ISO 8601 format (YYYY-MM-DD). Defaults to today's date.
  • pulseUserIds: User IDs for whom to get data, pulled from the user_ids parameter. IDs must belong to the owner of the session or a member of their team. Defaults to the ID of the owner of the session.

Example Response:

{
    "<id>": [
        {
            "date": "2021-08-01",
            "throwCount": 0,
            "highEffortThrowCount": 0,
            "acuteWorkload": 2041.948878326791,
            "chronicWorkload": 3229.3557437324716,
            "normAcuteWorkload": 3.1660202416772414,
            "normChronicWorkload": 5.007082087486716,
            "workloadRatio": 0.6323084356035417,
            "dailyWorkload": 0,
            "normDailyWorkload": 0,
            "baseballProjectedOneDayWorkloads": [
                0,
                30,
                26.82887058971028,
                ...
            ]
        },
        ...

Get Events

Gets all of the individual throw events for one or more users over a range of dates. The owner of the session must have permission to access their data (e.g. the requested users must be either the current user or a member of a team for which the current user is a coach).

Method: get_events(start_date: str = <end_date - 8 days>, end_date: str = <today's date>, user_ids: str | list[str] = <user_id>)

Payload:

  • startDate: The earliest date for which to get data, pulled from the start_date parameter. Expected in ISO 8601 format (YYYY-MM-DD). Defaults to eight days before the end_date parameter.
  • endDate: The latest date for which to get data, pulled from the end_date parameter. Expected in ISO 8601 format (YYYY-MM-DD). Defaults to today's date.
  • pulseUserIds: User IDs for whom to get data, pulled from the user_ids parameter. IDs must belong to the owner of the session or a member of their team. Defaults to the ID of the owner of the session.

Example Response:

{
    "<id>": [
        {
            "eventId": "POC6TE5b8V",
            "scaler": null,
            "datetime": "2021-03-01T15:49:55.000Z",
            "tag": null,
            "armSlot": 59.782794823856534,
            "armSpeed": 452.4706718068326,
            "shoulderRotation": 155.58127276383868,
            "torque": 34.744537353515625,
            "ballVelocity": null,
            "highEffort": false,
            "ballWeight (oz)": 5.11472,
            "preferredBallWeightUnit": "OZ",
            "simulated": null,
            "workload": 100.728515625,
            "normalizedWorkload": 0.10925094783306122
        },
        ...

Data Filtering Functions

The pulse module contains functions that can filter event data based on certain characteristics.

Filter By Tag

Filter throw events by tag or tags.

Function: filter_by_tag(events: list[dict[str, Any]], tags: str | list[str], blacklist: bool = False)

# Get all throw events from a single date
events = client.get_events(
    start_date="2022-05-01", end_date="2022-05-01"
)

# Get all throw events marked with the desired tag
pre_game = pt.filter_by_tag(
    events[client.user_id], tags="Pre-Game"
)

# Get all throw events marked with one of multiple tags
non_game = pt.filter_by_tag(
    events[client.user_id], tags=["Pre-Game", "Plyo", "Warmup"]
)

# Get all throw events without the desired tag
non_plyos = pt.filter_by_tag(
    events[client.user_id], tags="Plyo", blacklist=True
)

Filter Simulated

Filter throw events based on simulated status.

Function: filter_simulated(events: list[dict[str, Any]], get_simulated: bool = False)

# Get all throw events from a single date
events = client.get_events(
    start_date="2022-05-01", end_date="2022-05-01"
)

# Get all throw events that are not simulated
non_simulated = pt.filter_simulated(events[client.user_id])

# Get all throw events that are simulated
simulated = pt.filter_simulated(
    events[client.user_id], get_simulated=True
)

Filter High Effort

Filter throw events based on high effort status.

Function: filter_high_effort(events: list[dict[str, Any]], get_high_effort: bool = True)

# Get all throw events from a single date
events = client.get_events(
    start_date="2022-05-01", end_date="2022-05-01"
)

# Get all throw events that are high effort
high_effort = pt.filter_high_effort(events[client.user_id])

# Get all throw events that are not high effort
low_effort = pt.filter_high_effort(
    events[client.user_id], get_high_effort=False
)

Workload Functions

The pulse module contains functions that are useful when manually performing workload calculations.

Since throw events and daily snapshots are stored in a dict indexed by user ID, the desired user ID must be accessed by key when passing data into the following functions. For indiviudal users (i.e. not coaches), the desired user ID will be the same as the owner of the session and can be accessed using client.user_id. For coaches, the get_team() endpoint can match players with their respective user IDs.

Sum Workload

Compute sum of workload or normalizedWorkload values from individual throw events returned by the get_events() endpoint.

Function: sum_workload(events: list[dict[str, Any]], normalized: bool = True)

# Get all throw events from a single date
events = client.get_events(
    start_date="2022-05-01", end_date="2022-05-01"
)

# Make sure to access the desired user from events
norm_workload = pt.sum_workload(events[client.user_id])

# Compute sum using unnormalized values
workload = pt.sum_workload(
    events[client.user_id], normalized=False
)

This function can be useful for summing the workload of throws with a certain tag:

pre_game = pt.filter_by_tag(events, tag="Pre-Game")

pre_game_workload = pt.sum_workload(pre_game)

Compute Acute Workload

Compute acute workload using dailyWorkload or normDailyWorkload values from daily snapshots returned by the get_snapshots() endpoint.

Function: compute_acute_workload(snapshots: list[dict[str, Any]], end_date: str = <most recent date in snapshots>, normalized: bool = True)

# Get daily snapshots for a two week time period.
snapshots = client.get_snapshots(
    start_date="2022-05-01", end_date="2022-05-14"
)

# Make sure to access the desired user from snapshots
norm_acute_workload = pt.compute_acute_workload(
    snapshots[client.user_id]
)

# Compute acute workload using unnormalized values
acute_workload = pt.compute_acute_workload(
    snapshots[client.user_id], normalized=False
)

Acute workload is the weighted average of one-day workloads over a 9-day period. The weights for the nine days are defined by pt.ACUTE_WEIGHTS:

ACUTE_WEIGHTS = [1.3, 1.225, 1.15, 1.075, 1.0, 0.925, 0.85, 0.775, 0.7]

where the current day is multiplied by 1.3 and the last day is multipled by 0.7.

The divisor for acute workload is usually 9, but it can be less if there have been less than 7 days of throwing (3 after 1 day of throwing, 4 after 2 days, ..., 9 after 7+ days). This function assumes that the dates in snapshots are the only days of throwing and will adjust the acute divisor accordingly.

Compute Chronic Workload

Compute chronic workload using dailyWorkload or normDailyWorkload values from daily snapshots returned by the get_snapshots() endpoint.

Function: compute_chronic_workload(snapshots: list[dict[str, Any]], end_date: str = <most recent date in snapshots>, normalized: bool = True)

# Get daily snapshots for a one month time period.
snapshots = client.get_snapshots(
    start_date="2022-05-01", end_date="2022-06-01"
)

# Make sure to access the desired user from snapshots
norm_chronic_workload = pt.compute_chronic_workload(
    snapshots[client.user_id]
)

# Compute acute workload using unnormalized values
chronic_workload = pt.compute_chronic_workload(
    snapshots[client.user_id], normalized=False
)

Chronic workload is the average of one-day workloads over a 28-day period.

The divisor for chronic workload is usually 28, but it can be less if there have been less than 24 days of throwing (5 after 1 day of throwing, 6 after 2 days, ..., 28 after 24+ days). This function assumes that the dates in snapshots are the only days of throwing and will adjust the chronic divisor accordingly.

Compute Acute/Chronic Workload Ratio

Compute acute/chronic workload ratio using dailyWorkload or normDailyWorkload values from daily snapshots returned by the get_snapshots() endpoint.

Function: compute_acr(snapshots: list[dict[str, Any]], end_date: str = <most recent date in snapshots>, normalized: bool = True)

# Get daily snapshots for a one month time period.
snapshots = client.get_snapshots(
    start_date="2022-05-01", end_date="2022-06-01"
)

# Make sure to access the desired user from snapshots
norm_acr = pt.compute_acr(snapshots[client.user_id])

# Compute ACR using unnormalized values
acr = pt.compute_acr(snapshots[client.user_id], normalized=False)

Acute/chronic workload ratio is the quotient of acute workload and chronic workload over a 28-day period.

Additional Resources

You can learn more about Pulse and throwing workload at the following links:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pulse-throw-0.1.0.tar.gz (13.7 kB view details)

Uploaded Source

Built Distribution

pulse_throw-0.1.0-py3-none-any.whl (11.1 kB view details)

Uploaded Python 3

File details

Details for the file pulse-throw-0.1.0.tar.gz.

File metadata

  • Download URL: pulse-throw-0.1.0.tar.gz
  • Upload date:
  • Size: 13.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.5.0

File hashes

Hashes for pulse-throw-0.1.0.tar.gz
Algorithm Hash digest
SHA256 a1f24bc09e3004de9d95a2238caefc3f29ec7ec9ccde4c3b03dddb3a9b5a914c
MD5 c3c7ef28bb7520433672e91af6baa3d1
BLAKE2b-256 0a549dd3a5085b052bab7f7e8b4de20038794f469dd7a9f5f21bf207adb2dff5

See more details on using hashes here.

File details

Details for the file pulse_throw-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: pulse_throw-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 11.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.5.0

File hashes

Hashes for pulse_throw-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d88f294a22f1ba854a238db4c16df65565cee5fa97d1d1db91b376b45d54cfd1
MD5 01d69c52cf05b2a561992c0f348bad66
BLAKE2b-256 7c6dfb4df0cf70e3566701954220c0edc92b4fe71f3a8420a13e2038b8fefb6f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page