현상
PySpark가 동작하는지 확인하고자 간단히 DataFrame을 생성하는 코드를 실행했다.
from pyspark.sql import SparkSession
if __name__ == '__main__':
# SparkSession 초기화
spark = SparkSession.builder.appName("Simple Application").getOrCreate()
# 데이터프레임 생성
data = [("James", "", "Smith", "1991-04-01", "M", 3000),
("Michael", "Rose", "", "2000-05-19", "M", 4000),
("Robert", "", "Williams", "1978-09-05", "M", 4000),
("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
("Jen", "Mary", "Brown", "1980-02-17", "F", -1)]
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# 종료
spark.stop()
하지만 아래와 같은 에러와 함께 동작하지 않았다.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/13 10:30:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Python23/11/13 10:30:35 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
해결 방법을 적어둔다.
원인
구글링을 해보니, PySpark와 Python 간의 연결이 설정되어 있지 않아서 발생하는 문제인 것 같다.
환경 변수를 설정하는 등의 방법으로 해결할 수 있었다.
아무래도 설치하는 과정을 일부 건너뛴 모양이다.
해결 1 - 환경변수 추가
환경변수 PYSPARK_PYTHON=python를 설정한다.
값을 추가하고 실행하니 정상적으로 동작하는 모습을 확인할 수 있었다.
해결 2 - findspark.init 추가
findspark는 site-package에 pyspark를 심볼릭 링크로 연결하거나 런타임에 sys.path에 추가해 주는 라이브러리로 별도 설치가 필요하다.
import findspark
findspark.init('/path/to/spark_home')
SparkContext나 SparkSesion을 정의하기 전에 호출하며, init 함수 호출 시 전달받는 매개변수가 없으면 SPARK_HOME 환경 변수를 사용한다.
findspark.init을 포함한 코드로 실행하면
from pyspark.sql import SparkSession
import findspark
if __name__ == '__main__':
# SparkSession 초기화
findspark.init()
spark = SparkSession.builder.appName("Simple Application").getOrCreate()
# # 데이터프레임 생성
data = [("James", "", "Smith", "1991-04-01", "M", 3000),
("Michael", "Rose", "", "2000-05-19", "M", 4000),
("Robert", "", "Williams", "1978-09-05", "M", 4000),
("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
("Jen", "Mary", "Brown", "1980-02-17", "F", -1)]
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# 종료
spark.stop()
잘 동작하는 모습을 확인할 수 있다.
참고 문서
Python worker failed to connect back in Pyspark or spark Version 2.3.1