Spark

Apache Spark 성능 튜닝

appendonly 2024. 2. 5. 04:14

Overview

Apache Spark job의 성능을 개선하는 방법들은 많다.

올바른 데이터 format: Spark는 다양한 format의 데이터를 읽고 쓸 수 있으나, Parquet, ORC처럼 일부 포맷들은 더 효율적이다.
파티셔닝 전략: Spark 클러스터 내 여러 노드에 데이터를 파티션 할 수 있다. 노드 간 데이터 이동을 최소화 할 파티셔닝 기법을 사용한다.
최적의 알고리즘: Spark는 데이터 처리 task를 위한 다양한 알고리즘을 지원한다. 이 중 제일 효율적인 알고리즘을 활용한다.
Configuration 변경: Configuration option에 따라 성능이 달라진다. Data processing task를 위한 옵션도 존재하며, 그 외 일반적인 옵션이 존재한다.
Tool 활용: 오픈소스와 상업용 tool들로 Spark job을 분석/최적화 할 수 있다.

Parquet format이 text format보다 빠른 이유는 https://amazelimi.tistory.com/entry/Parquet%EC%97%90-%EB%8C%80%ED%95%B4-%EC%95%8C%EC%95%84%EB%B3%B4%EC%9E%90

 

 

// Read the data in text format
val textData = sc.textFile("data.txt")

// Convert the data to Parquet format
val parquetData = textData.toDF().write.parquet("data.parquet")

// Read the data back in Parquet format
val parquetData2 = spark.read.parquet("data.parquet")

// Compare the performance of the two reads
parquetData2.count()


Word를 기준으로 데이터를 파티셔닝하여 처리할 데이터량을 줄일 수 있다.

// Read the data in text format
val textData = sc.textFile("data.txt")

// Partition the data by word
val partitionedData = textData.map(_.split(" ")).partitionBy(100)

// Count the number of words in each partition
partitionedData.map(_.size).reduce(_ + _)


더 나은 알고리즘을 활용한 성능 개선 예시.

 

// Read the data in text format
val textData = sc.textFile("data.txt")

// Count the number of words in the data using the traditional algorithm
val traditionalCount = textData.flatMap(_.split(" ")).count()

// Count the number of words in the data using the optimized algorithm
val optimizedCount = textData.countByValue()

// Compare the performance of the two algorithms
traditionalCount
optimizedCount

 

Executor 수와 메모리 설정(Configuration)을 통한 성능 개선.

// Set the number of executors to 10
spark.conf.set("spark.executor.instances", 10)

// Set the amount of memory per executor to 1GB
spark.conf.set("spark.executor.memory", "1g")

// Run a job
val job = spark.textFile("data.txt").count()

// Print the time it took to run the job
job.time()

 


예제 상 Spark Performance Analyzer는 job의 성능 상 병목 지점을 식별하는데 도움되는 Tool이다.

// Install the Spark Performance Analyzer tool
spark-shell --packages org.apache.spark:spark-sql-performance-analyzer_2.11:2.4.0

// Analyze a Spark job
spark-analyzer <path-to-spark-job>

// Print the results of the analysis
spark-analyzer --print-results <path-to-spark-job>



Data Format

Spark가 read/write 가능한 data format들은 다음과 같다.

* Text, CSV, JSON
* ORC, Parquet
* Hive/HBase/Cassandra Tables

각 format마다 장단점이 있다.

 

Partitioning Strategy

 

Partitioning Scheme은 다음과 같다.

Round-robin partitioning: 가장 기본적인 파티셔닝 스키마로, 단순히 모든 파티션에 data를 고르게 나눈다.
Hash Partitioning: 해시 함수로 data를 파티셔닝한다. 균일한 데이터에 적합하다.
Range Partitioning: 범위로 data를 파티셔닝한다. 비균일한 데이터에 적합하다.

 

# Create a DataFrame
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ("id", "name"))

# Partition the DataFrame by round-robin
df.rdd.partitionBy(3)

# Partition the DataFrame by hash
df.rdd.partitionBy(hash("id"))

# Partition the DataFrame by range
df.rdd.partitionBy(range(3))



Data processing task을 위해 Spark가 제공하는 알고리즘은 다음과 같다.

Aggregation: sum, average, count 등
Join: inner/outer/left join 등
Filter: where, having 등
Sort: sortBy, orderBy 등
GroupBy: groupByKey, groupBy 등

# Create a DataFrame
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ("id", "name"))

# Sum the values in the "id" column
df.rdd.map(lambda x: x[0]).sum()
# Create two DataFrames
df1 = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ("id", "name"))
df2 = spark.createDataFrame([(1, "X"), (2, "Y"), (3, "Z")], ("id", "name"))

# Join the two DataFrames on the "id" column
df1.join(df2, on="id").show()
# Create a DataFrame
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ("id", "name"))

# Filter the DataFrame to only include rows where the "id" is greater than 1
df.rdd.filter(lambda x: x[0] > 1).show()
# Create a DataFrame
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ("id", "name"))

# Sort the DataFrame by the "id" column
df.rdd.sortBy(lambda x: x[0]).show()



Configuration Tuning

Memory Configuration
Spark driver와 executor에 할당된 메모리 크기를 조절하여 충분한 메모리 활용도와 out-of-memory를 방지한다.

Configuration 수정으로 유휴 리소스를 줄여 더 빠르게 작업한다. 코드 상에서 지정해도 되고, spark-defaults.conf 파일로도 가능하다.
# set the spark.driver.memory configuration
spark.conf.set("spark.driver.memory", "4g")

# set the spark.executor.memory configuration 
spark.conf.set("spark.executor.memory", "2g")
spark.driver.memory 4g
spark.executor.memory 8g
또 다른 방법은, spark-submit 시 runtime memory를 설정할 수 있다.
spark-submit --conf spark.driver.memory=4g --conf spark.executor.memory=8g app.py

Core Configuration
Executor에 할당되는 core 수를 조정할 수 있다. Core 수는 병렬성과 리소스 활용도에 직결된다. spark-submit 시 매개변수로 지정하거나, 코드 상 SparkConf로 가능하다.

spark-submit으로 런타임에서 core 수를 지정하는 방법은 다음과 같다. Executor마다 core 4개를 할당하는 명령어이다.
spark-submit --conf spark.executor.cores=4 app.py

코드 상에서 SparkConf로 지정하는 법은 다음과 같다.
from pyspark import SparkConf, SparkContext

# Create a SparkConf instance
conf = SparkConf().setAppName("MyApp").setMaster("local[4]")  # local mode with 4 cores

# Create a Spark context with the configuration
sc = SparkContext(conf=conf)

Shuffling Process Optimization
Spark에서 shuffling process는 서로 다른 파티션 간 data를 재분배하는 단계를 뜻한다. groupBy, join 등의 연산을 하기 전에 이루어진다. Shuffling 설정은 코드나 configuration 파일로 변경 가능하다.

Shuffling 관련 핵심 설정들을 정리한다.

spark.shuffle.manager: Shuffling manager를 지정한다. 예로, sort나 tungsten-sort 같은 manager들은 각기 다른 shuffling 및 sorting 전략을 제공한다.

spark.shuffle.manager   tungsten-sort


spark.shuffle.compress: Shuffling process에서 데이터를 압축하여 네트워크로 이동되는 데이터 크기를 줄일 수 있다.

spark.shuffle.compress   true


spark.reducer.maxSizeInFlight: Shuffle 연산 시 Mapper의 실행 결과를 Reducer가 읽어갈 때 사용할 버퍼의 크기로 기본값은 48MB.

spark.reducer.maxSizeInFlight   128m


spark.shuffle.sort.bypassMergeThreshold: Shuffling 단계 중, merge 할 임계치를 지정한다. 데이터 크기가 임계치보다 작다면, merge를 하지 않는다.

(추가 중)

Spark Tools

Spark job 최적화해줄 Tool들은 대표적으로 다음과 같다.

Spark Performance Analyzer: Spark job 상 bottleneck 식별에 도움
Spark History Server: Spark job들의 성능을 추적하는데 도움
Spark Profiler: Spark job을 프로파일링하여 bottleneck 식별에 도움
SparkSQLExplainer: Spark SQL 쿼리의 실행 계획 분석에 도움