Hadoop EcoSystem

[PySpark] org.apache.spark.SparkException: Python worker failed to connect back.

비번변경 2023. 12. 1. 11:59

현상

PySpark가 동작하는지 확인하고자 간단히 DataFrame을 생성하는 코드를 실행했다.

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # SparkSession 초기화
    spark = SparkSession.builder.appName("Simple Application").getOrCreate()

    # 데이터프레임 생성
    data = [("James", "", "Smith", "1991-04-01", "M", 3000),
            ("Michael", "Rose", "", "2000-05-19", "M", 4000),
            ("Robert", "", "Williams", "1978-09-05", "M", 4000),
            ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
            ("Jen", "Mary", "Brown", "1980-02-17", "F", -1)]
    columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
    df = spark.createDataFrame(data=data, schema=columns)
    df.show()
    
    # 종료
    spark.stop()

하지만 아래와 같은 에러와 함께 동작하지 않았다.

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/13 10:30:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Python23/11/13 10:30:35 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

해결 방법을 적어둔다.

 

 

원인

구글링을 해보니, PySpark와 Python 간의 연결이 설정되어 있지 않아서 발생하는 문제인 것 같다.

환경 변수를 설정하는 등의 방법으로 해결할 수 있었다.

 

아무래도 설치하는 과정을 일부 건너뛴 모양이다.

 

 

해결 1 - 환경변수 추가

환경변수 PYSPARK_PYTHON=python를 설정한다.

값을 추가하고 실행하니 정상적으로 동작하는 모습을 확인할 수 있었다.

 

 

해결 2 - findspark.init 추가

findspark는 site-package에 pyspark를 심볼릭 링크로 연결하거나 런타임에 sys.path에 추가해 주는 라이브러리로 별도 설치가 필요하다.

import findspark
findspark.init('/path/to/spark_home')

SparkContext나 SparkSesion을 정의하기 전에 호출하며, init 함수 호출 시 전달받는 매개변수가 없으면 SPARK_HOME 환경 변수를 사용한다. 

 

findspark.init을 포함한 코드로 실행하면

from pyspark.sql import SparkSession
import findspark

if __name__ == '__main__':
    # SparkSession 초기화
    findspark.init()
    spark = SparkSession.builder.appName("Simple Application").getOrCreate()

    # # 데이터프레임 생성
    data = [("James", "", "Smith", "1991-04-01", "M", 3000),
            ("Michael", "Rose", "", "2000-05-19", "M", 4000),
            ("Robert", "", "Williams", "1978-09-05", "M", 4000),
            ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
            ("Jen", "Mary", "Brown", "1980-02-17", "F", -1)]
    columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
    df = spark.createDataFrame(data=data, schema=columns)
    df.show()
    
    # 종료
    spark.stop()

잘 동작하는 모습을 확인할 수 있다.

 

 

참고 문서

Python worker failed to connect back in Pyspark or spark Version 2.3.1

Python worker failed to connect back

https://github.com/minrk/findspark