
# migbq

An RDBMS-to-BigQuery data loader: reads Microsoft SQL Server table data and uploads it to BigQuery.

## Requirement

* Python
  - CPython 2.7.x

* RDBMS (referred to below as "DB")
  - Microsoft SQL Server
  - MySQL (in development)

* Table spec
  - every table must have a numeric primary key field

* DB user grants
  - SELECT, INSERT, UPDATE, CREATE
  - access to the DB's metadata (the [INFORMATION_SCHEMA] database)
  - some metadata tables are created in the source RDBMS
  - (if you don't want tables created in the source DB, you can use SQLite instead: fork this project and edit the source)

* Google Cloud SDK
  - the Google Cloud SDK must be installed and authenticated
  - https://cloud.google.com/sdk/downloads
  - https://cloud.google.com/sdk/gcloud/reference/auth/login

* pymssql (FreeTDS)
  - http://www.pymssql.org/en/stable/

## Install

```
export PYMSSQL_BUILD_WITH_BUNDLED_FREETDS=1
pip install migbq
```

## Usage

### Write a configuration file

* the format is similar to Embulk ( http://www.embulk.org )

### Example

#### General config file
* config.yml

```yml
in:
  type: mssql
  host: localhost
  user: USER
  password: PASSWORD
  port: 1433
  database: DATABASE
  tables:
    - tbl
    - tbl2
    - tbl3
  batch_size: 50000
  temp_csv_path: /temp/pymig_csv
  temp_csv_path_complete: /temp/pymig_csv_complete
out:
  type: bigquery
  project: GCP_PROJECT
  dataset: BQ_DATASET
```

#### Jinja2 template

* config.j2.yml
  - variables come from environment variables only
  - the file extension must be **.j2.yml**

```yml
in:
  type: mssql
  {% include "mssql-connect.yml" %}
  tables:
    - tbl
    - tbl2
    - tbl3
  batch_size: 50000
  temp_csv_path: /temp/pymig_csv
  temp_csv_path_complete: /temp/pymig_csv_complete
out:
  type: bigquery
  project: {{ env.GCP_PROJECT }}
  dataset: BQ_DATASET
```
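
The included fragment is a file you write yourself; migbq does not ship it. A minimal sketch, assuming hypothetical environment variables DB_HOST, DB_USER, DB_PASSWORD, and DB_NAME:

```yml
# mssql-connect.yml -- illustrative sketch only, not part of migbq.
# Rendered lines must end up aligned with the "in:" block that includes them.
host: {{ env.DB_HOST }}
user: {{ env.DB_USER }}
password: {{ env.DB_PASSWORD }}
port: 1433
database: {{ env.DB_NAME }}
```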


### Run

#### (1) Execute

```bash
migbq run config.yml
```

#### (2) Check job completion

```bash
migbq check config.yml
```


#### (3) Check that table counts match

```bash
migbq sync config.yml
```

* The count check is based on the primary key range; a sketch of the idea follows.
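
This is the kind of comparison involved (illustrative only, not migbq's exact queries; table and column names follow the examples in this README):

```sql
-- Source RDBMS: rows inside the migrated PK range
SELECT COUNT(*) FROM tbl WHERE idx <= 321;

-- BigQuery (standard SQL): rows landed in the target table
SELECT COUNT(*) FROM BQ_DATASET.tbl WHERE idx <= 321;
```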

### Run Forever

* You can schedule migbq with crontab.
* migbq holds an exclusive process lock, so it is safe to schedule it every minute.
* You must schedule both the **run** and **check** commands, as in the sketch below.
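
For example, a crontab sketch (the binary path and config path are assumptions; adjust them to your installation):

```
* * * * * /usr/local/bin/migbq run /path/to/config.yml
* * * * * /usr/local/bin/migbq check /path/to/config.yml
```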


## Description

### run command

**[1]** select RDBMS table metadata
- get the table's primary key name from the RDBMS metadata tables.
- get the column names and types from the RDBMS metadata tables.
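
The lookups are of the following kind (an illustrative sketch against SQL Server's INFORMATION_SCHEMA, not migbq's exact internal queries):

```sql
-- Primary key column of a table
SELECT kcu.COLUMN_NAME
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
  ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
WHERE tc.TABLE_NAME = 'tbl' AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY';

-- Column names and types
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'tbl';
```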

**[2]** select the RDBMS primary key value range
- get the min / max PK of the table, as sketched below
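
For example (assuming the PK column is named idx, as in the other examples here):

```sql
SELECT MIN(idx), MAX(idx) FROM tbl;
```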

**[3]** select data in the primary key range
- select rows between the current PK and current PK + batch_size

```sql
select * from tbl where 0 < idx and idx <= 100;
```

- the result is written to a file named **pymig-tbl-idx-1-100**
- the CSV is gzip-compressed

**[4]** upload the CSV file to BigQuery
- the file is uploaded directly to the BigQuery table, not staged in GCS (where quota limits can be exceeded); the sketch below shows the idea
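
A minimal sketch of such a direct load using the google-cloud-bigquery client (illustrative only, not migbq's internal code; the project, dataset, table, and file names follow the examples in this README):

```python
from google.cloud import bigquery

client = bigquery.Client(project="GCP_PROJECT")

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV

# The gzipped CSV is streamed straight into a load job -- no GCS staging.
with open("pymig-tbl-idx-1-100", "rb") as f:
    job = client.load_table_from_file(
        f, "GCP_PROJECT.BQ_DATASET.tbl", job_config=job_config)

job.result()  # block until the load job finishes
```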

**[5]** repeat steps [1]~[4] until the current PK passes the table's max primary key.

For example, with batch_size = 100 and a max PK of 321, the RDBMS queries execute as below.

```sql

select * from tbl where 0 < idx and idx <= 100;
select * from tbl where 100 < idx and idx <= 200;
select * from tbl where 200 < idx and idx <= 300;
select * from tbl where 300 < idx and idx <= 400;

-- end

```

### check command

* checks whether each BigQuery job ID has finished; see the sketch below.
* retries failed jobs.
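
A minimal sketch of the kind of status poll involved (illustrative only, using the google-cloud-bigquery client; the job ID is the one recorded in migrationmetadatalog):

```python
from google.cloud import bigquery

client = bigquery.Client(project="GCP_PROJECT")
job = client.get_job("job-adf132f31rf3f")  # jobId column of migrationmetadatalog

if job.state == "DONE":
    if job.error_result:
        print("load failed: %s" % job.error_result)  # candidate for retry
    else:
        print("rows loaded: %d" % job.output_rows)   # cf. statistics.load.outputRows
```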


### Log file

* the log file is created in the [log] subdirectory of the config file's directory

### Pid file

* a pid file with an exclusive file lock guarantees at most one process per command; it is created in the directory below


```
/tmp
```
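
A minimal sketch of how such an exclusive pid-file lock typically works (illustrative, not migbq's exact code; the lock file name is hypothetical):

```python
import fcntl
import os

# Acquire an exclusive, non-blocking lock on a pid file in /tmp.
# If another instance of the same command already holds the lock,
# flock raises IOError and this process should exit immediately.
lockfile = open("/tmp/migbq-run.lock", "w")  # hypothetical file name
fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
lockfile.write(str(os.getpid()))
lockfile.flush()
```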

### Load metadata tables

#### META: migrationmetadata

* one row per target [tableName], updated as each 'select' runs

| field name | type | description | sample value | etc |
| ----: |--------|----------------------------------------|-----------------|-------------|
| tableName | STRING | target [tableName] | tbl | Primary Key |
| firstPk | INTEGER | [tableName]'s min primary key value | 1 | |
| lastPk | INTEGER | [tableName]'s max primary key value | 123 | |
| currentPk | STRING | last primary key value read from [tableName] | 20 | |
| regDate | DATETIME| insert date of this row | 2017-11-29 01:02:03 | |
| modDate | DATETIME| date firstPk / lastPk were last modified | 2017-11-29 01:02:03 | |
| endDate | DATETIME| date currentPk reached lastPk | 2017-11-29 11:22:33 | |
| pkName | STRING | [tableName]'s primary key name | idx | |
| rowCnt | INTEGER | [tableName]'s count(*) | 123 | |
| pageTokenCurrent | STRING | not used yet | tbl | |
| pageTokenNext | STRING | not used yet | tbl | |
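
A hypothetical progress query against this table (illustrative; currentPk is cast because it is stored as a STRING):

```sql
SELECT tableName, currentPk, lastPk,
       CAST(currentPk AS BIGINT) * 100 / lastPk AS pct_done
FROM migrationmetadata;
```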

#### LOG: migrationmetadatalog

* sequence
  - run: a row is inserted into this table when 'select [tableName]' is executed
  - run: the row is updated when the BigQuery jobId is created
  - check: the row's jobComplete and checkComplete are updated when the BigQuery jobId status call ends

| field name | type | description | sample value | etc |
| ----: |--------|----------------------------------------|-----------------|-------------|
| idx | BigInt | PK | 1 | Primary Key, Auto Increment |
| tableName | STRING | [tableName] | tbl | Primary Key |
| regDate | DATETIME | row insert date | 2017-11-29 01:02:03 | |
| endDate | DATETIME | when the jobId reached 'DONE' | 2017-11-29 11:22:33 | |
| pkName | STRING | [tableName]'s primary key name | idx | |
| cnt | INTEGER | BigQuery API: statistics.load.outputRows | 123 | |
| pkUpper | INTEGER | upper bound of each 'select': [pkName] <= [pkUpper] | 100 | |
| pkLower | INTEGER | lower bound of each 'select': [pkName] > [pkLower] | 0 | |
| pkCurrent | INTEGER | same as pkUpper | 99 | |
| jobId | STRING | BigQuery upload job jobId | job-adf132f31rf3f | |
| errorMessage | STRING | written when the jobId check result is 'ERROR' | ERROR: bigquery quota exceeded | |
| checkComplete | INTEGER | set by the check command | 1 | |
| jobComplete | INTEGER | jobId check result from the check command: success = 1, fail = -1 | 1 | |
| pageToken | STRING | reserved for miscellaneous use | | |


## Roadmap

* Parallel loading is not yet supported.

