Notice
Recent Posts
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
Tags more
Archives
관리 메뉴

🐥

[Spark]Parquet type not supported인 parquet file을 읽는 방법 - StructType을 사용해서 Custom Schema로 로드) 본문

데이터/Spark

[Spark]Parquet type not supported인 parquet file을 읽는 방법 - StructType을 사용해서 Custom Schema로 로드)

•8• 2020. 10. 21. 22:44

github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L139

위 코드에서 보면 `typenotsupported`인 type들이 몇몇 있다.

파케이 스키마가 unsupported type을 포함하고 있으면 Dataframe으로 읽어올 수가 없다.

 

org.apache.spark.sql.AnalysisException: Parquet type not supported

Caused by: org.apache.spark.sql.AnalysisException: Parquet type not supported: INT32 (UINT_8);
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.typeNotSupported$1(ParquetSchemaConverter.scala:101)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:137)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:89)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$1.apply(ParquetSchemaConverter.scala:68)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$1.apply(ParquetSchemaConverter.scala:65)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetToSparkSchemaConverter$$convert(ParquetSchemaConverter.scala:65)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:62)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:664)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:664)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:664)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:621)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:603)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

 

 

그럴 경우 parquet schema를 확인해서 직접 커스텀 스키마를 만들고 해당 스키마로 파일을 열어주면 된다.

parquet schema대로 해야하는 이유는 타입이 달라지면 Dataframe에서 NULL로 읽히기 때문이다.

 

newSchema = StructType([ StructField("ID", LongType(), True),
                         StructField("point", IntegerType(), True),
                         StructField("check", IntegerType(), True) ])
df = spark\
      .schema(newSchema)\
      .parquet(path)

 

 

 

https://stackoverflow.com/questions/64383029/cannot-load-parquet-file-parquet-type-not-supported-int32-uint-8-with-pysp