scopt

Calculate optimized properties of Spark configuration.

Spark Config Optimizer calculates optimal CPU core and memory values for the Spark executor and driver.

Installing

Install scopt from PyPI via pip.

pip install scopt
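
To pin the release documented on this page (0.0.5 at the time of writing):

pip install scopt==0.0.5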

Usage

Basic

from scopt import SparkConfOptimizer
from scopt.instances import Instance

# Each executor node has 32 CPU cores and 250 GB of memory.
executor_instance = Instance(32, 250)
num_nodes = 10
deploy_mode = 'client'

sco = SparkConfOptimizer(executor_instance, num_nodes, deploy_mode)
print(sco)

# spark.driver.cores: 5
# spark.driver.memory: 36g
# spark.driver.memoryOverhead: 5g
# spark.executor.cores: 5
# spark.executor.memory: 36g
# spark.executor.memoryOverhead: 5g
# spark.executor.instances: 60
# spark.default.parallelism: 600
# spark.sql.shuffle.partitions: 600

Cluster mode is also supported.

deploy_mode = 'cluster'

sco = SparkConfOptimizer(executor_instance, num_nodes, deploy_mode)
print(sco)

# spark.driver.cores: 5
# spark.driver.memory: 36g
# spark.driver.memoryOverhead: 5g
# spark.executor.cores: 5
# spark.executor.memory: 36g
# spark.executor.memoryOverhead: 5g
# spark.executor.instances: 59
# spark.default.parallelism: 590
# spark.sql.shuffle.partitions: 590
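
The values above follow a widely used Spark sizing heuristic: reserve one core and about 1 GB of memory per node for the OS and Hadoop daemons, run 5 cores per executor, carve roughly 10% of each executor's memory out as overhead, and target 2 tasks per core. scopt's actual implementation is not shown here, but a minimal back-of-envelope sketch under those assumptions (the function name and rounding details are mine) reproduces the outputs above:

import math

def sketch_optimize(cores, memory_gb, num_nodes, deploy_mode):
    # Reserve 1 core and 1 GB per node for OS/Hadoop daemons.
    executors_per_node = (cores - 1) // 5                 # 31 // 5 = 6
    total_gb = (memory_gb - 1) // executors_per_node      # 249 // 6 = 41 GB per executor
    overhead_gb = math.ceil(total_gb * 0.1)               # ~10% overhead -> 5 GB
    executor_memory_gb = total_gb - overhead_gb           # 36 GB
    instances = executors_per_node * num_nodes            # 60
    if deploy_mode == 'cluster':
        instances -= 1                                    # one executor slot hosts the driver
    parallelism = instances * 5 * 2                       # 2 tasks per core
    return instances, executor_memory_gb, overhead_gb, parallelism

print(sketch_optimize(32, 250, 10, 'client'))   # (60, 36, 5, 600)
print(sketch_optimize(32, 250, 10, 'cluster'))  # (59, 36, 5, 590)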

A different instance type for the driver node is also supported. Specifying a driver instance is enabled only in client mode.

executor_instance = Instance(32, 250)
# The driver runs on a smaller node: 4 CPU cores and 30 GB of memory.
driver_instance = Instance(4, 30)
deploy_mode = 'client'

sco = SparkConfOptimizer(
    executor_instance,
    num_nodes,
    deploy_mode,
    driver_instance,
)
print(sco)

# spark.driver.cores: 3
# spark.driver.memory: 26g
# spark.driver.memoryOverhead: 3g
# spark.executor.cores: 5
# spark.executor.memory: 36g
# spark.executor.memoryOverhead: 5g
# spark.executor.instances: 60
# spark.default.parallelism: 600
# spark.sql.shuffle.partitions: 600
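
The driver values follow the same per-node arithmetic applied to the smaller instance (again a sketch of the heuristic, not scopt's actual code):

import math

driver_cores = 4 - 1                       # 3  -> spark.driver.cores
total_gb = 30 - 1                          # 29 GB usable on the driver node
overhead_gb = math.ceil(total_gb * 0.1)    # 3  -> spark.driver.memoryOverhead
driver_memory_gb = total_gb - overhead_gb  # 26 -> spark.driver.memory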

Dynamic Allocation

For Spark dynamic allocation mode, you can calculate the properties with dynamic_allocation set to True (default: False).

Not specifying num_nodes

When dynamic_allocation is True and num_nodes is None, the optimizer returns only the resource-related Spark properties (it does not include parallelism properties such as spark.default.parallelism).

sco = SparkConfOptimizer(
    executor_instance,
    deploy_mode=deploy_mode,
    dynamic_allocation=True,
)
print(sco)

# spark.driver.cores: 3
# spark.driver.memory: 26g
# spark.driver.memoryOverhead: 3g
# spark.executor.cores: 5
# spark.executor.memory: 36g
# spark.executor.memoryOverhead: 5g

Specifying num_nodes

If dynamic_allocation is set to True and num_nodes is specified, the optimizer returns spark.default.parallelism and spark.sql.shuffle.partitions sized for when the executor count reaches num_nodes, but it does not return spark.executor.instances.

sco = SparkConfOptimizer(
    executor_instance,
    num_nodes,
    deploy_mode=deploy_mode,
    dynamic_allocation=True,
)
print(sco)

# spark.driver.cores: 3
# spark.driver.memory: 26g
# spark.driver.memoryOverhead: 3g
# spark.executor.cores: 5
# spark.executor.memory: 36g
# spark.executor.memoryOverhead: 5g
# spark.default.parallelism: 600
# spark.sql.shuffle.partitions: 600

Predefined Instance

You can use the predefined Instance classes. Currently, AWS EC2 instance types are supported.

from scopt.instances.aws import AwsInstanceMap

mapping = AwsInstanceMap()

mapping['r5.4xlarge']
# Instance(num_cores=16, memory_size=120)
mapping['p3.8xlarge']
# Instance(num_cores=32, memory_size=236)
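
A predefined instance can be passed to the optimizer in place of a hand-built Instance; for example (the cluster size and deploy mode here are arbitrary):

from scopt import SparkConfOptimizer
from scopt.instances.aws import AwsInstanceMap

mapping = AwsInstanceMap()
sco = SparkConfOptimizer(mapping['r5.4xlarge'], 10, 'client')
print(sco)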

Set properties to SparkConf

You can set properties to SparkConf directly via the as_list method.

from pyspark import SparkConf
from scopt import SparkConfOptimizer
from scopt.instances import Instance

executor_instance = Instance(32, 250)
num_nodes = 10
deploy_mode = 'client'

sco = SparkConfOptimizer(executor_instance, num_nodes, deploy_mode)

conf = SparkConf()
print(conf.getAll())
# No properties have been set yet.
# dict_items([])

conf.setAll(sco.as_list())
print(conf.getAll())
# dict_items([
#     ('spark.driver.cores', '5'),
#     ('spark.driver.memory', '36g'),
#     ('spark.driver.memoryOverhead', '5g'),
#     ('spark.executor.cores', '5'),
#     ('spark.executor.memory', '36g'),
#     ('spark.executor.memoryOverhead', '5g'),
#     ('spark.executor.instances', '60'),
#     ('spark.default.parallelism', '600'),
#     ('spark.sql.shuffle.partitions', '600')
# ])
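
The populated SparkConf can then be passed to a SparkSession builder in the usual way:

from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()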

