

Processing hierarchical data with PySpark SQL (CTE-style)

Spark SQL does not provide a recursive CTE, so a chain of ID replacements (1 was replaced by 2, 2 by 3, and so on) cannot be resolved in a single query. The workaround below first finds the final ID of each chain, then repeatedly joins back to the mapping table until every original ID is paired with its final ID.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Each row means "OldID was replaced by NewID", e.g. 1 -> 2, 2 -> 3, ...
df = spark.createDataFrame([(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8), (9, 10)], "OldID integer, NewID integer")

dfcheck = df.drop('NewID')                    # every ID that was replaced at some point (appears as OldID)
dfdistinctID = df.select('NewID').distinct()  # every distinct replacement ID

dfidfinal = dfdistinctID.join(dfcheck, [dfcheck.OldID == dfdistinctID.NewID], how="left_anti")  # NewIDs that were never replaced again, i.e. the final ID of each chain

# Find the rows that end each chain (their NewID is one of the final IDs)
dfcurrent = df.join(dfidfinal, [dfidfinal.NewID == df.NewID], how="left_semi")  # these rows seed the iterative walk below

dfresult = dfcurrent
dfdifferentalias = df.select(df.OldID.alias('id1'), df.NewID.alias('id2'))  # renamed copy of the mapping so the self-join below has unambiguous column names

# Walk each chain backwards until no new rows are produced
while dfcurrent.count() > 0:
  # For each row in the current frontier, find its predecessor (the row whose NewID
  # equals this row's OldID) and carry the final NewID forward.
  dfcurrent = dfcurrent.join(broadcast(dfdifferentalias), [dfcurrent.OldID == dfdifferentalias.id2], how="inner") \
                       .select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID'))
  dfresult = dfresult.union(dfcurrent)

dfresult.orderBy('OldID').show()  # every original ID paired with the final ID of its chain
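With the sample data, the 1 -> 2 -> 3 -> 4 -> 5 chain should collapse to 5, the 6 -> 7 -> 8 chain to 8, and 9 to 10, so the final show() should print something close to:

+-----+-----+
|OldID|NewID|
+-----+-----+
|    1|    5|
|    2|    5|
|    3|    5|
|    4|    5|
|    6|    8|
|    7|    8|
|    9|   10|
+-----+-----+

One caveat: each pass over the loop runs a count() action and keeps extending the lineage of dfcurrent, so very deep hierarchies can slow the query plan down. A minimal sketch of one way to contain that, assuming Spark 2.3+ for DataFrame.localCheckpoint():

while dfcurrent.count() > 0:
  dfcurrent = dfcurrent.join(broadcast(dfdifferentalias), [dfcurrent.OldID == dfdifferentalias.id2], how="inner") \
                       .select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID')) \
                       .localCheckpoint()  # materialize the frontier and truncate its lineage each iteration
  dfresult = dfresult.union(dfcurrent)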

 
