Skip to main content

python for apache-doris

Project description

DorisClient

Python client for Apache Doris

Install

pip install DorisClient

Usage

Create Test Table

-- Test table used by the examples below. It is keyed on `id` (UNIQUE KEY model);
-- the delete / merge / sequence streamload examples identify rows by that key.
-- NOTE(review): "tag.location.default: 3" presumably requests 3 replicas; a
-- smaller test cluster may need a lower count — confirm for your deployment.
-- NOTE(review): the DorisAdmin example later treats "in_memory" as an
-- unsupported property (ignore_properties='in_memory') — check your Doris version.
CREATE TABLE `streamload_test` (
  `id` int(11) NULL COMMENT "",
  `shop_code` varchar(64) NULL COMMENT "",
  `sale_amount` decimal(18, 2) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "test"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

-- To run the sequence streamload examples, first enable sequence load on the
-- table (the sequence value type here is bigint):
-- ALTER TABLE streamload_test ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type" = "bigint");

streamload

from DorisClient import DorisSession, DorisLogger, Logger

# Logging defaults to INFO; uncomment to log errors only.
# DorisLogger.setLevel('ERROR')

# Connection settings: FE servers as 'host:port', target database, credentials.
doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
doris = DorisSession(**doris_cfg)

# Append: each dict is one row, keyed by column name.
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99'},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5'},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3'},
]
doris.streamload('streamload_test', data)

# Delete: with merge_type='DELETE' only the key column (`id`) is supplied,
# identifying the rows to remove.
data = [
    {'id': '1'},
]
doris.streamload('streamload_test', data, merge_type='DELETE')

# Merge: `delete_flag` is a load-only field (not a table column). Rows matching
# the `delete` condition (delete_flag=1) are deleted; the others are loaded.
data = [
    {'id': '10', 'shop_code': 'sdd1', 'sale_amount': '99', 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5', 'delete_flag': 1},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3', 'delete_flag': 1},
]
doris.streamload('streamload_test', data, merge_type='MERGE', delete='delete_flag=1')

# Sequence append: `sequence_col` names the load-only field that supplies the
# sequence value. Sequence load must be enabled on the table first
# (ALTER TABLE ... ENABLE FEATURE "SEQUENCE_LOAD").
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 11},
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 2},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence')

# Sequence merge: sequence column and delete condition combined.
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99', 'source_sequence': 100, 'delete_flag': 0},
    {'id': '1', 'shop_code': 'sdd2', 'sale_amount': '5', 'source_sequence': 120, 'delete_flag': 0},
    {'id': '2', 'shop_code': 'sdd3', 'sale_amount': '3', 'source_sequence': 100, 'delete_flag': 1},
]
doris.streamload('streamload_test', data, sequence_col='source_sequence', merge_type='MERGE',
                 delete='delete_flag=1')


# streamload() retries failed loads by default (max_retry=3, retry_diff_seconds=3);
# _streamload() makes a single attempt with no retry.
# Use plain rows here: the sequence-merge rows above carry `source_sequence` and
# `delete_flag`, which a call without sequence_col/merge_type does not apply
# (the row flagged for deletion would be loaded, not deleted).
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99'},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5'},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3'},
]
doris._streamload('streamload_test', data)

# To change the retry settings, subclass DorisSession and decorate streamload
# with your own Retry parameters:
from DorisClient import DorisSession, Retry

max_retry = 5  # retry limit passed to Retry (default is 3)
retry_diff_seconds = 10  # presumably the wait between attempts (default is 3)


class MyDoris(DorisSession):
    """DorisSession whose streamload() retries with custom Retry settings."""

    @Retry(max_retry=max_retry, retry_diff_seconds=retry_diff_seconds)
    def streamload(self, table, dict_array, **kwargs):
        # Delegate to the single-attempt loader; the decorator supplies the retries.
        return self._streamload(table, dict_array, **kwargs)


# Plain rows whose fields all exist in `streamload_test`. The previous
# example's rows carry load-only fields (`source_sequence`, `delete_flag`)
# that a plain call would not apply.
data = [
    {'id': '1', 'shop_code': 'sdd1', 'sale_amount': '99'},
    {'id': '2', 'shop_code': 'sdd2', 'sale_amount': '5'},
    {'id': '3', 'shop_code': 'sdd3', 'sale_amount': '3'},
]
doris = MyDoris(**doris_cfg)
doris.streamload('streamload_test', data)

Execute Doris SQL

from DorisClient import DorisSession

# Connection settings: FE servers as 'host:port', target database, credentials.
doris_cfg = dict(
    fe_servers=['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    database='testdb',
    user='test',
    passwd='123456',
)
doris = DorisSession(**doris_cfg)

query = 'select * from streamload_test limit 1'

# read() fetches every row of the result; by default each row is a dict.
dict_rows = doris.read(query)
print(dict_rows)

# Passing cursors=None returns each row as a tuple instead.
tuple_rows = doris.read(query, cursors=None)
print(tuple_rows)

# execute() runs a statement and commits it.
doris.execute('truncate table streamload_test')

Collect metadata

# Collect Doris metadata into meta_* tables with DorisMeta.
from DorisClient import DorisMeta

doris_cfg = {
    'fe_servers': ['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    'database': 'testdb',
    'user': 'test',
    'passwd': '123456',
}
dm = DorisMeta(**doris_cfg)

# Create the tables that store the collected Doris metadata:
# 1. meta_table for saving all table meta
# 2. meta_tablet for saving all tablet meta
# 3. meta_partition for saving all partition meta
# 4. meta_size for saving all table size meta
# 5. meta_table_count for saving all table row counts
# 6. meta_materialized_view for saving all materialized view meta
# 7. meta_backup for saving all backup meta
dm.create_tables()

# collect table meta >> meta_table
dm.collect_table()

# collect partition meta >> meta_partition
dm.collect_partition()

# collect tablet meta >> meta_tablet
# Depends on collect_partition; presumably it must run after
# dm.collect_partition(), as it does here — confirm.
dm.collect_tablet()

# collect table size meta >> meta_size
dm.collect_size()

# collect table row count >> meta_table_count
dm.collect_table_count()

# collect materialized view meta >> meta_materialized_view
# NOTE(review): the exact effect of only_insert=True is not shown here — check DorisMeta.
dm.collect_materialized_view(only_insert=True)

# collect backup meta >> meta_backup
dm.collect_backup()

modify buckets

from DorisClient import DorisAdmin

# To debug, raise the root logger to DEBUG before running:
#     import logging
#     logging.getLogger().setLevel(logging.DEBUG)

# Connection settings: FE servers as 'host:port', target database, credentials.
doris_cfg = dict(
    fe_servers=['10.211.7.131:8030', '10.211.7.132:8030', '10.211.7.133:8030'],
    database='testdb',
    user='test',
    passwd='123456',
)
da = DorisAdmin(**doris_cfg)

# Change a table's bucket count and its distribution (hash) key columns.
da.modify(
    database_name='testdb',
    table_name='streamload_test',
    distribution_key='id,shop_code',
    buckets=1,
)

# Change the bucket count of a single partition.
da.modify(
    database_name='testdb',
    table_name='partition_tb',
    partition_name='p20231214',
    buckets=2,
)

# Only rebuild the table, dropping unsupported properties (here: in_memory).
da.modify(
    database_name='testdb',
    table_name='streamload_test',
    only_rebuild=True,
    ignore_properties='in_memory',
)

# Only rebuild the table, adding extra table properties.
da.modify(
    database_name='testdb',
    table_name='streamload_test',
    only_rebuild=True,
    add_properties='"enable_unique_key_merge_on_write" = "true"',
)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

DorisClient-1.2.22.tar.gz (15.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

DorisClient-1.2.22-py3-none-any.whl (19.0 kB view details)

Uploaded Python 3

File details

Details for the file DorisClient-1.2.22.tar.gz.

File metadata

  • Download URL: DorisClient-1.2.22.tar.gz
  • Upload date:
  • Size: 15.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for DorisClient-1.2.22.tar.gz
Algorithm Hash digest
SHA256 c156fc460ef5cb6cfbd1503c1bb81d7d1df2ac498f2382defab12d161b6f9f72
MD5 075b98e16e38550ed562741d6aafe390
BLAKE2b-256 a38a78526732686ca0cc375a888c5ce584416f341e62e7308799bb2b3ea24499

See more details on using hashes here.

File details

Details for the file DorisClient-1.2.22-py3-none-any.whl.

File metadata

  • Download URL: DorisClient-1.2.22-py3-none-any.whl
  • Upload date:
  • Size: 19.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.11

File hashes

Hashes for DorisClient-1.2.22-py3-none-any.whl
Algorithm Hash digest
SHA256 9cbc03a7220d42899e8ad7bc373239fdd564c494170b4d07bcdb913ab107791b
MD5 187c1d5a2e6bde96fc7f664f1cc4b2a9
BLAKE2b-256 8b57876a490d8b5538d52ad752fbcbd9545e7feec426455381127f10b6ac1cb1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page