ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • spark write to parquet 를 빠르게 해보자.
    카테고리 없음 2020. 4. 22. 16:48

    지금 진행중인 프로젝트에서는 parquet 를 사용해서 데이터를 저장한다. 

     

    근데... 초기 parquet로 저장하는 시간이 너무 오래걸린다.... 

     

    더더욱 문제는 300G짜리 데이터를 쓰는데...메모리에 한꺼번에 올려서 처리하는지... 

     

    outofmemory 스러운 문제가 발생된다. 

     

    그래서 writing 하는 여러가지 방법을 테스트 해볼까 한다. 

     

    테스트에 사용되는 파일은 1억건짜리(1G) 데이터를 활용한다. 

     

    기존 write 방식 

     

    로직 수행시간 20~25분가량...

                orgDf.write.parquet(                
                    path=주소,  
                    mode="overwrite"
                )

     

     

    첫번째 

     

    partition을 사용하는 방법 데이터는 partition을 위핸 컬럼을 생성하여 준비 한다.

    1억건을 1000개씩 자르는 간단한 코드를 준비한다. 

     

    tmp_part_seq 에 가상의 idx 를 partition용 index 를 생성하였다.

    ex ) 

                    tmp_udf = udf(lambda x: x // 10000 + 1,IntegerType())
                    orgDf = orgDf.withColumn("tmp_part_seq", tmp_udf(orgDf[orgOrcIndex]))

     

    write 를 시작해보자 

                orgDf.write.partitionBy("tmp_part_seq").parquet(                
                    path=디렉토리,
                    mode="overwrite"
                )

     

     

    GG... 1시간넘도록 안끝났다. 

     

    GC는 일어나는거 같긴한데.. 뭐하느라 heap을 200g까지 잡아먹는건지 모르겠다;

     

     

    결국 포기 

     

    다음 버전으로 도전한다. 

     


    두번째 코드 

     

    저장되는 파일의 갯수를 coalesce 를 통해서 정할 수 있다고 하는데.. 

     

    coalesce 는 partition과 다르게 shuffle = true 옵션을 주지 않는한 셔플수행이 없다고 한다. 

     

    (부모 RDD의 기본 파티션을 유지 하는 방식이기 때문...)

     

    partition은 셔플수행이 기본이라고한다..

     

    이를 적용해서 써보도록 한다.

    orgDf.coalesce(1).write.parquet(             
                      path=주소,
                      mode="overwrite"
                 )

     

    걸린시간 18분

    DEBUG 2020-04-22 14:42:02,415 : ### writing parquet start --시작
    DEBUG 2020-04-22 15:00:45,788 : result : -- 끝

     

    메모리 사용은... 일전에 파티셔닝 write 보다 안정적으로 적게 사용. 

     

     

    아마 파티셔닝에 필요한 연산이 없기때문일거 같다.

     

    하지만 write 중 GC를 하는것 같지 않다.

     

    파티셔닝 write와 비교 그림 


     

    세번째 

     

    coalesce 에서 드라마틱한 결과물을 못봤으니.. repartition을 시도해보았다. 

     

    repatition 1개 의 코드

     

                orgDf.repartition(1).write.parquet(             
                     path=주소,
                     mode="overwrite"
                )

     

    동일한 18분...?

     

    DEBUG 2020-04-22 15:39:16,182 : ### writing parquet start 
    DEBUG 2020-04-22 15:58:12,129 : [1/make_org] result 

    메모리 사용 외에는 동일해 보이는 시간....

     

     

     

    repartition(10).... 결과

     

    DEBUG 2020-04-22 16:03:59,949 : ### writing parquet start
    DEBUG 2020-04-22 16:21:53,799 : [1/make_org] result 

     

    18분... 동일

     

     

     

    이게 문제가 아닌거 같다 ..

     

    다른 접근법이 필요해보인다 ㅠㅠ

     

     

     


    문제는 윈도우 함수였음. 

     

    orgDf = orgDf.withColumn(orgOrcIndex, row_number().over(Window.orderBy('v_idx'))).drop('v_idx')

     

    row_number() 에서 sort하는 플랜이 있어서 해당 로직을

     

    orgDf = orgDf.withColumn(orgOrcIndex, monotonically_increasing_id()) 으로 변경하니까..

     

    안정적으로 잘됨...

     

    문제는 ordering.... 멍청한 삽질이었다...

     

     

    추가적으로 spark.storage.memoryFraction 옵션을 통해 메모리에 저장되는 데이터 사이즈를 정해주는것도 도움이 된거같다. 

     

     

Designed by Tistory.