Define your BigQuery tables as dataclasses. With this forked library you can also add Policy Tags to your dataclasses to protect your PII data.
bq-schema-policy-tags
This project is a fork of bq-schema by Limehome GmbH.
Motivation (Original GH Repo)
At Limehome we are heavy users of Python and BigQuery. This library was created mainly to solve the following issues:
- Define table schemas in code and have a migration script to apply changes.
- On deploy make sure that all schemas were applied, otherwise abort.
- Guarantee that when we try to write data to a table, the data matches the schema of the table (required/optional, data types)
- Version our tables and enable migrations to a new schema
Additionally, this library aims to help users through the use of Python typing:
- Specify your schema as a Python dataclass
- The migration script converts the dataclass into a BigQuery schema definition
- Deserialize rows into dataclass instances while reading from a table
- Serialize a dataclass instance into a dictionary and write it to the table.
The main benefit of combining all these features is that we can guarantee our code will run before we deploy to production.
Motivation for Fork
The primary reason for forking this project was to extend its existing capabilities to support Policy Tags and Taxonomy tags in Google Cloud Platform (GCP). The original codebase provided a strong foundation for working with Python dataclasses, but it lacked features for adding metadata tags that can be crucial for managing sensitive or personally identifiable information (PII).
Why Policy Tags?
Policy Tags allow for more granular control over data access. When dealing with sensitive or regulated data, such as PII, it's crucial to have mechanisms that enable fine-tuned permissions. Policy Tags in GCP make it possible to associate specific security and access policies with individual data fields, rather than entire data sets, thereby providing a more secure and manageable environment.
Taxonomy Tags in GCP
Taxonomy Tags in GCP serve as a method for categorizing data based on its type or nature. By attaching taxonomy tags to data fields, organizations can more easily track, manage, and govern their data. This is particularly useful for compliance with data protection laws and regulations, as well as for internal data governance strategies.
By incorporating these features into Python dataclasses through this fork, we aim to provide a more robust, secure, and compliant data handling solution.
Quickstart
Since this library makes use of newer Python features, you need at least Python 3.7.
- Install the package
pip install bq-schema-policy-tags
- Create a schema and a table definition in my_table.py
# Import paths assume the forked package mirrors bq_schema's layout.
from dataclasses import dataclass, field
from typing import List, Optional

from bq_schema_policy_tags import BigqueryTable

@dataclass
class Schema:
    string_field: str = field(metadata={"description": "This is a STRING field."})
    int_field: Optional[int]
    some_floats: List[float]
    bool_field: bool

class MyTable(BigqueryTable):
    name = "my_table_name"
    schema = Schema
If you already have tables created in your account, you can use the convert-table script to create a schema.
Note: The script produces a file which is meant to be a starting point. You will most likely have to add some imports yourself!
- Create your table
Hint: Make sure to have your credentials set:
export GOOGLE_APPLICATION_CREDENTIALS=your_auth.json
Alternatively, you can set the service file as an environment variable:
export GOOGLE_SERVICE_FILE={"type": "service_account", ...}
Now create your table:
migrate-tables --project my_project --dataset my_dataset --module-path my_table --apply
- Write a row
from google.cloud import bigquery

# RowTransformer import path assumes the fork mirrors bq_schema's layout.
from bq_schema_policy_tags.row_transformer import RowTransformer

from my_table import Schema

row = Schema(string_field="foo", int_field=1, some_floats=[1.0, 2.0], bool_field=True)
row_transformer = RowTransformer[Schema](Schema)
serialized_row = RowTransformer.dataclass_instance_to_bq_row(row)

bigquery_client = bigquery.Client()
table = bigquery_client.get_table("project.dataset.my_table_name")
bigquery_client.insert_rows(table, [serialized_row])
- Validate your code with a type checker like mypy
mypy my_table.py
- Read a row
query = "SELECT * FROM project.dataset.my_table_name"
for row in bigquery_client.query(query=query):
deserialized_row = row_transformer.bq_row_to_dataclass_instance(row)
assert isinstance(deserialized_row, Schema)
Documentation
Schema definitions
For a full list of supported types check the following schema:
from dataclasses import dataclass, field
from datetime import date, datetime, time
from decimal import Decimal
from typing import List, Optional

from bq_schema_policy_tags.types.type_mapping import Geography, Timestamp

@dataclass
class RequiredNestedField:
    int_field: int = field(metadata={"description": "This field is an INT field."})

@dataclass
class RequiredSchema:
    string_field: str = field(metadata={"description": "This field is a STRING field."})
    string_field_optional: Optional[str]
    bytes_field: bytes
    int_field: int
    float_field: float
    numeric_field: Decimal
    bool_field: bool
    timestamp_field: Timestamp
    date_field: date
    time_field: time
    datetime_field: datetime
    geography_field: Geography
    required_nested_field: RequiredNestedField = field(metadata={"description": "This field is a STRUCT field."})
    optional_nested_field: Optional[RequiredNestedField]
    repeated_nested_field: List[RequiredNestedField]
Adding Policy Tags (New)
To add policy tags to a field, include policy_tags in the field metadata.
from dataclasses import dataclass, field

# Converter import path assumes the fork mirrors bq_schema's layout.
from bq_schema_policy_tags.dataclass_converter import dataclass_to_schema

@dataclass
class ExampleWithPolicyTags:
    sensitive_data: str = field(metadata={"policy_tags": ["sensitive"]})

schema = dataclass_to_schema(ExampleWithPolicyTags)
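For reference, the generated schema ultimately maps onto BigQuery's own schema model, where a tagged column carries a PolicyTagList. The sketch below uses only google-cloud-bigquery client types; the taxonomy resource name is a hypothetical placeholder:

from google.cloud.bigquery import SchemaField
from google.cloud.bigquery.schema import PolicyTagList

# Hypothetical resource name; real names come from a Data Catalog taxonomy
# created in your GCP project.
tagged_column = SchemaField(
    "sensitive_data",
    "STRING",
    policy_tags=PolicyTagList(
        names=["projects/my_project/locations/eu/taxonomies/123/policyTags/456"]
    ),
)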
Abstract tables
If you want a class that inherits from BigqueryTable but does not actually map to a table in BigQuery (an abstract base, a common interface, etc.), you can have it inherit from ABC, and it will not be discovered if you pass the "--ignore-abstract" flag:
migrate-tables ... --ignore-abstract
from abc import ABC

class SomeInterface(BigqueryTable, ABC):
    pass

class ConcreteImplementation(SomeInterface):
    name = "Some value here"
    schema = SomeSchema
Without the flag, this would fail because name and schema are required in "SomeInterface".
Timestamps
Timestamps are deserialized into datetime objects, due to the nature of the underlying bq library. To distinguish between datetime and timestamp, use the Timestamp type from bq_schema_policy_tags.types.type_mapping. Usage:
from bq_schema_policy_tags.types.type_mapping import Timestamp
from datetime import datetime
the_timestamp = Timestamp(datetime.utcnow())
Geography
This library treats the geography data type as a string. BigQuery accepts geography values either in the WKT or GeoJSON format. To actually parse and work with geodata in Python, one could use the shapely library. Here is an example of how to load a point from the WKT format:
from shapely.wkt import loads
loads('POINT (0 0)')
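To store geodata with this library, declare the field with the Geography type. A minimal sketch, assuming Geography wraps a plain WKT string as described above (the Location class is purely illustrative):

from dataclasses import dataclass

from bq_schema_policy_tags.types.type_mapping import Geography

@dataclass
class Location:
    # Stored as a WKT string, exactly as BigQuery accepts it.
    position: Geography

row = Location(position=Geography("POINT (0 0)"))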
Table definitions
The BigqueryTable class is used for:
- Recursive table discovery by the migrate-tables script
- Defining table properties like name and schema
Required properties
- name: The name of the table
- schema: table schema either as dataclass or a list of schema fields
Optional properties
- project: name of the project, can be overwritten by the migrate-tables script
- dataset: name of the dataset, can be overwritten by the migrate-tables script
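For example, a table that pins its own project and dataset could look like this (all values are placeholders):

class MyTable(BigqueryTable):
    name = "my_table_name"
    schema = Schema
    project = "my_project"
    dataset = "my_dataset"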
Versioning tables
Since BigQuery does not allow backwards-incompatible schema changes, you might want to version your schemas:
class MyTable(BigqueryTable):
    name = "my_table_name"
    schema = Schema
    version = "1"
By default the version will be appended to the table name, like so: my_table_name_v1. If you want to override this behaviour, you can implement the full_table_name method, as sketched below.
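A minimal sketch of such an override, assuming full_table_name is a plain method that returns the table name to use (the naming scheme here is just an illustration):

class MyTable(BigqueryTable):
    name = "my_table_name"
    schema = Schema
    version = "1"

    def full_table_name(self) -> str:
        # Hypothetical scheme: put the version in front instead of
        # appending it, producing "v1_my_table_name".
        return f"v{self.version}_{self.name}"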
Time partitioning
Define time partitioning for your table:
from google.cloud.bigquery import TimePartitioning, TimePartitioningType

class MyTable(BigqueryTable):
    time_partitioning = TimePartitioning(
        type_=TimePartitioningType.DAY, field="some_column"
    )
Scripts
migrate-tables
This script has two uses:
- Check if locally defined schemas are in sync with the schemas in bigquery
- If a difference is detected, we try to apply the changes
The script will find all defined tables recursively for a given Python module.
Note: If you have not defined your project and/or dataset in code, you will have to pass them as parameters to the script. Show the help:
migrate-tables --help
Check if tables are in sync and list all changes:
migrate-tables --module-path module/
If you want the script to fail on a change, add the validate flag. Useful for running inside your CI:
migrate-tables --module-path module/ --validate
Apply changes
migrate-tables --module-path src/jobs/ --apply
convert-table
If you already have tables created in BigQuery, this script prints the corresponding dataclass for you.
Note: The script produces a file which is meant to be a starting point. You will most likely have to add some imports yourself!
Show the help:
convert-table --help
Print a table:
convert-table --project project --dataset scraper --table-name table_name >> schema.py
Development
Setting up your dev environment
- Clone the project.
- Navigate into the cloned project.
- Create a virtual environment with Python >= 3.7:
  pipenv --python PYTHON_VERSION
  $ pipenv --python 3.7
  or
  virtualenv -p /PATH_TO_PYTHON/ /DESIRED_PATH/VENV_NAME
  $ virtualenv -p /usr/bin/python3.7 placeholder
- Install flit via pip:
  $ pip install flit
- Install packages:
  $ flit install --symlink
Code quality
Run all code quality checks:
inv check-all
Test
inv test
Lint
inv lint
Types
inv type-check
Code format
inv format-code
Validate code is correctly formatted:
inv check-code-format
Known Issues
MissingValueError in test_row_transformer.py
When running the pytest suite, a MissingValueError may occur, specifically indicating a missing value for the field "string_field". This issue appears to be inherent to the original codebase and may not have a straightforward solution, since the upstream project is no longer maintained.
Potential Causes
- The row object in the test case may not contain "string_field" and its value.
- The Schema dataclass might be expecting a value for "string_field" which is not provided.
- The function dict_to_row(row_as_dict: dict) -> Row may not be converting the dictionary to a Row object correctly.
Suggested Workarounds
- Double-check the test data for completeness.
- If possible, modify the Schema dataclass to provide a default value for "string_field" (see the sketch below).
- Check the RowTransformer class and the from_dict function for issues related to field handling.
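A minimal sketch of the second workaround, assuming the failing test uses the Schema dataclass from the quickstart (the default values are arbitrary, and every field after the first defaulted one also needs a default):

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Schema:
    # A default prevents MissingValueError when "string_field" is missing
    # from the row; the empty string is an arbitrary choice.
    string_field: str = field(
        default="", metadata={"description": "This is a STRING field."}
    )
    int_field: Optional[int] = None
    some_floats: List[float] = field(default_factory=list)
    bool_field: bool = False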
Please note that these are suggested workarounds and may not resolve the issue entirely.