Python

[Jupyter] Pyspark로 SQL데이터 연동하기

2022. 7. 29. 11:05

1. Spark 초기화

import findspark
findspark.init()

[package] findspark

더보기

 findspark 패키지의 findspark.init() 함으로써

pyspark 라이브러리를 보통 라이브러리처럼 import 할 수 있게 해주는 패키지

  • Pyspark는 sys.path에 default로 등록되어 있지 않아서, 보통 라이브러리처럼 import하기 위해 findspark 패키지를 통해 runtime 동안에만 sys.path에 등록할 수 있게 해준다.
    (pyspark shell로 개발하거나, hadoop cluster의 yarn을 이용해 pyspark job을 제출할 때에는 필요하지 않다)

2. Spark 세션 정의 

 스파크 세션 열고, 생성된 세션을 가지고 데이터 베이스를 읽어온다.

jdbc 라는 드라이버로 mysql 을 열어준다.

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("spark-sql")\
    .config("spark.driver.extraClassPath", "./Connector J 8.0/mysql-connector-java-8.0.27.jar")\
    .getOrCreate()
    
df_users = spark.read\ 
    .format("jdbc")\                                  
    .option("url", "jdbc:mysql://localhost/데이터베이스이름")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("dbtable", "테이블명").option("user", "root")\
    .option("password", "root").load()

print(df_users.columns)

df_user는 '데이터프레임' 형식으로 가져온다.

 

 

df_users.show()

 

템플릿 지정

df_users.registerTempTable("테이블명")
df_users

registerTemTable(테이블명)을 넣어주면 테이블 구조가 부착된다.

 

테이블 확인

spark.sql("""
SELECT *
FROM 테이블명
""").show()

spark.sql("""     """).show() 안에 쿼리를 넣어주면 된다.