from pyspark.sql.functions import broadcast
# Resolve chains of ID replacements: every OldID is mapped to the final NewID
# reached by following OldID -> NewID links until an ID that was never
# replaced (e.g. 1 -> 2 -> 3 -> 4 -> 5 yields (1,5), (2,5), (3,5), (4,5)).
# NOTE(review): `sqlContext` is not defined in this file; it is assumed to be
# provided by the runtime (e.g. a pyspark shell or notebook) - confirm.
df = sqlContext.createDataFrame(
    [(1, 2), (2, 3), (3, 4), (4, 5), (6, 7), (7, 8), (9, 10)],
    "OldID integer,NewID integer",
)
dfcheck = df.drop('NewID')
dfdistinctID = df.select('NewID').distinct()
# NewIDs that never appear as an OldID, i.e. IDs that have not been replaced.
dfidfinal = dfdistinctID.join(
    dfcheck, [dfcheck.OldID == dfdistinctID.NewID], how="left_anti"
)
# Find the rows whose NewID is already final; they seed the result.
# Joining on the column *name* avoids the condition
# `dfidfinal.NewID == df.NewID`, whose two sides derive from the same `df`
# column and may be treated as trivially true or rejected as an ambiguous
# self-join. A USING-style semi join puts the key column first, so the
# select restores the original (OldID, NewID) column order.
dfcurrent = (
    df.join(dfidfinal, on='NewID', how="left_semi")
    .select('OldID', 'NewID')
    .cache()
)
dfresult = dfcurrent
# Same mapping with renamed columns, so the loop's join condition references
# distinct columns on each side.
dfdifferentalias = df.select(df.OldID.alias('id1'), df.NewID.alias('id2'))
# A backward walk through an acyclic mapping uses each row at most once, so
# it can extend at most df.count() times. Needing more steps means the
# mapping contains a cycle, in which case the walk would never end.
max_steps = df.count()
steps = 0
# Walk one replacement step backwards at a time until no new rows appear.
while dfcurrent.count() > 0:
    if steps >= max_steps:
        raise RuntimeError(
            "ID mapping contains a cycle; chain resolution does not terminate"
        )
    steps += 1
    # Rows whose NewID equals the current OldID are one step further back;
    # they inherit the already-resolved final NewID. Caching each step keeps
    # count() and the next join from re-running the whole join lineage.
    # (Cached frames are never unpersisted here.)
    dfcurrent = (
        dfcurrent.join(
            broadcast(dfdifferentalias),
            [dfcurrent.OldID == dfdifferentalias.id2],
            how="inner",
        )
        .select(dfdifferentalias.id1.alias('OldID'), dfcurrent.NewID.alias('NewID'))
        .cache()
    )
    # Union by column name so the result never depends on column order.
    dfresult = dfresult.unionByName(dfcurrent)
dfresult.orderBy('OldID').show()
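# --- Residue of the original blog page (not part of the script) ---
# 반응형 ("responsive" page-widget label)
# 'IT > Python' 카테고리의 다른 글 (other posts in the 'IT > Python' category):
# - [Python] List, Dictionary, Tuple Sum 계산 (sum calculation) (0) | 2022.05.30
# - [Python] 두개의 리스트 하나로 합치기 (merging two lists into one) (0) | 2022.05.30
# - How to drop NaN Using Group by On Python Pandas Dataframe? (0) | 2020.10.09
# - How to flatten a list of lists in python ? (0) | 2020.10.09
# - How to split a string at specific length in python? (0) | 2020.10.08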