Skip to main content

Python client for Apache Doris

Project description

DorisClient

Python client for Apache Doris

Install

pip install DorisClient

Use

Create Test Table

-- Demo table used by the stream load examples that follow.
-- Rows are keyed on `id` (UNIQUE KEY) and hash-distributed on `id` into 3 buckets.
-- All three columns are nullable and carry empty column comments.
-- NOTE(review): "replication_allocation" = "tag.location.default: 3" presumably requests
-- 3 replicas on the default resource tag, so a cluster with fewer backends would likely
-- reject this statement — confirm and lower the value for a single-node test setup.
CREATE TABLE `streamload_test` (
  `id` int(11) NULL COMMENT "",
  `shop_code` varchar(64) NULL COMMENT "",
  `sale_amount` decimal(18, 2) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "test"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

-- To use sequence stream load, first enable sequence load on the Doris table:
-- ALTER TABLE streamload_test ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "bigint");

streamload

from DorisClient import DorisSession, DorisLogger, Logger

# Uncomment to change the log level (the default level is INFO):
# DorisLogger.setLevel('ERROR')

# Connection settings, passed to DorisSession as keyword arguments.
# The addresses and credentials below are example values; replace them with your own.
# NOTE(review): 'fe_servers' entries are "host:port" strings, presumably the
# Doris FE HTTP endpoints — confirm.
doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
# Session object used by all the streamload examples below.
doris = DorisSession(**doris_cfg)

# Append: load a list of dicts (one dict per row, keyed by column name)
# into the table with the default merge type.
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99'},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5'},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3'},
]
doris.streamload('streamload_test', data)

# Delete: only the key column is supplied and merge_type='DELETE' is passed.
# NOTE(review): presumably removes the rows whose key matches — confirm
# against the Doris stream load documentation.
data = [
    {'id': '1'},
]
doris.streamload('streamload_test', data, merge_type='DELETE')

# Merge: each row carries an extra 'delete_flag' field and the load passes
# delete='delete_flag=1' together with merge_type='MERGE'.
# NOTE(review): presumably rows matching the delete condition are deleted and
# the remaining rows are appended — confirm.
data = [
    {'id': '10', 'shop_code': 'sdd1', 'sale_amount': '99', 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5', 'delete_flag': 1},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3', 'delete_flag': 1},
]
doris.streamload('streamload_test', data, merge_type='MERGE', delete='delete_flag=1')

# Sequence append: each row carries a 'source_sequence' field, named to the
# load through sequence_col. Note that two rows share id '1' with different
# sequence values.
# Requires sequence load to be enabled on the table first (see the ALTER TABLE
# statement in the table setup section).
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 11, },
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 2},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence')

# Sequence merge: combines the sequence column with the MERGE/delete-condition
# options shown above in a single load.
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 100, 'delete_flag': 0},
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 120, 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 100, 'delete_flag': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence', merge_type='MERGE',
                 delete='delete_flag=1')


# By default, streamload retries with max_retry=3 and retry_diff_seconds=3.
# To load without any retry, call "_streamload" directly instead.
doris._streamload('streamload_test', data)

# To change the retry configuration, subclass DorisSession and re-decorate
# streamload with Retry, as shown below.
from DorisClient import DorisSession, Retry

# Custom retry settings handed to the Retry decorator.
# NOTE(review): retry_diff_seconds is presumably the wait between attempts,
# in seconds — confirm.
max_retry = 5
retry_diff_seconds = 10


class MyDoris(DorisSession):
    """DorisSession subclass whose streamload uses a custom retry configuration.

    The override wraps the non-retrying ``_streamload`` with ``Retry``, using the
    module-level ``max_retry`` and ``retry_diff_seconds`` values instead of the
    package defaults.
    """

    @Retry(max_retry=max_retry, retry_diff_seconds=retry_diff_seconds)
    def streamload(self, table, dict_array, **kwargs):
        """Load ``dict_array`` into ``table`` by delegating to ``_streamload``.

        Args:
            table: Name of the target table.
            dict_array: Rows to load, as a list of dicts (one dict per row).
            **kwargs: Extra load options forwarded unchanged to ``_streamload``.

        Returns:
            Whatever ``_streamload`` returns.
        """
        return self._streamload(table, dict_array, **kwargs)


# Use the subclass exactly like DorisSession; streamload now applies the
# custom retry settings.
doris = MyDoris(**doris_cfg)
doris.streamload('streamload_test', data)

execute doris-sql

from DorisClient import DorisSession

# Connection settings, passed to DorisSession as keyword arguments
# (example values; replace with your own).
doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
doris = DorisSession(**doris_cfg)

sql = 'select * from streamload_test limit 1'

# Fetch all rows returned by the query as a list of dicts.
rows = doris.read(sql)
print(rows)

# Fetch all rows returned by the query as a list of tuples (cursors=None).
rows = doris.read(sql, cursors=None)
print(rows)

# Execute a SQL statement and commit it; here it empties the test table.
doris.execute('truncate table streamload_test')

collect meta

from DorisClient import DorisMeta

# Connection settings, passed to DorisMeta as keyword arguments
# (example values; replace with your own).
doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
dm = DorisMeta(**doris_cfg)

# Automatically create the tables that store the collected Doris metadata:
# 1. meta_table      - metadata for every table
# 2. meta_tablet     - metadata for every tablet
# 3. meta_partition  - metadata for every partition
# 4. meta_size       - size metadata for every table
dm.create_tables()

# Collect table metadata into meta_table.
dm.collect_table()

# Collect partition metadata into meta_partition.
dm.collect_partition()

# Collect tablet metadata into meta_tablet.
# NOTE(review): the original note reads "deploy collect_partition" — presumably
# this step depends on collect_partition having been run first; confirm.
dm.collect_tablet()

# Collect table size metadata into meta_size.
dm.collect_size()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

DorisClient-1.2.1.tar.gz (11.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

DorisClient-1.2.1-py3-none-any.whl (13.8 kB view details)

Uploaded Python 3

File details

Details for the file DorisClient-1.2.1.tar.gz.

File metadata

  • Download URL: DorisClient-1.2.1.tar.gz
  • Upload date:
  • Size: 11.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for DorisClient-1.2.1.tar.gz
Algorithm Hash digest
SHA256 3bc39ba8421b722ebc9dd93b9018003daefd49528d56ac16c31f7542f0083cc7
MD5 31b71918650e3148953313eefa0d51b4
BLAKE2b-256 d4358f334d5aa674db40805a09f7aefba79c946afd6fc212356a2511aaeefa7c

See more details on using hashes here.

File details

Details for the file DorisClient-1.2.1-py3-none-any.whl.

File metadata

  • Download URL: DorisClient-1.2.1-py3-none-any.whl
  • Upload date:
  • Size: 13.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for DorisClient-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 2e60c20f40eade403373d3a51ddb78bc0b41491e224c0e64334173c5dc638a43
MD5 517d31c10a4cc22b12965bc895fabc74
BLAKE2b-256 c4a3ab69bf9ccd6b054cebdf82a35e116d01f2cc961c414af99d1a21c0472aa4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page