From 4a18c6454c512ea37dcc40d4603360e22b93572c Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 10 May 2021 15:39:10 +0800 Subject: [PATCH 1/7] avoid using deprecated and --- .../org/apache/spark/sql/v2/avro/AvroTable.scala | 6 ++++-- .../{AvroWriteBuilder.scala => AvroWrite.scala} | 7 +++---- .../{FileWriteBuilder.scala => FileWrite.scala} | 15 ++++++++------- .../execution/datasources/v2/csv/CSVTable.scala | 6 ++++-- .../csv/{CSVWriteBuilder.scala => CSVWrite.scala} | 7 +++---- .../execution/datasources/v2/json/JsonTable.scala | 6 ++++-- .../{JsonWriteBuilder.scala => JsonWrite.scala} | 7 +++---- .../execution/datasources/v2/orc/OrcTable.scala | 6 ++++-- .../orc/{OrcWriteBuilder.scala => OrcWrite.scala} | 7 +++---- .../datasources/v2/parquet/ParquetTable.scala | 6 ++++-- ...rquetWriteBuilder.scala => ParquetWrite.scala} | 7 +++---- .../execution/datasources/v2/text/TextTable.scala | 6 ++++-- .../{TextWriteBuilder.scala => TextWrite.scala} | 7 +++---- 13 files changed, 50 insertions(+), 43 deletions(-) rename external/avro/src/main/scala/org/apache/spark/sql/v2/avro/{AvroWriteBuilder.scala => AvroWrite.scala} (88%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{FileWriteBuilder.scala => FileWrite.scala} (95%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/{CSVWriteBuilder.scala => CSVWrite.scala} (92%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/{JsonWriteBuilder.scala => JsonWrite.scala} (92%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/{OrcWriteBuilder.scala => OrcWrite.scala} (93%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/{ParquetWriteBuilder.scala => ParquetWrite.scala} (95%) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/{TextWriteBuilder.scala => TextWrite.scala} (92%) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala index 2096f1a08a0d1..f19d856252b50 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.avro.AvroUtils -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StructType} @@ -43,7 +43,9 @@ case class AvroTable( AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new AvroWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = AvroWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala similarity index 88% rename from external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala rename to external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala index c4defb9f065e3..3a91fd0c73d1a 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWrite.scala @@ -21,16 +21,15 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.avro.AvroUtils import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.OutputWriterFactory -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class AvroWriteBuilder( +case class AvroWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) { + info: LogicalWriteInfo) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index cd62ee7814bf2..427d7f644e4b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write} import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf @@ -37,16 +37,17 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.SchemaUtils import org.apache.spark.util.SerializableConfiguration -abstract class FileWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) extends WriteBuilder { +trait FileWrite extends Write { + def paths: Seq[String] + def formatName: String + def supportsDataType: DataType => Boolean + def info: LogicalWriteInfo + private val schema = info.schema() private val queryId = info.queryId() private val options = info.options() - override def buildForBatch(): BatchWrite = { + override def toBatch: BatchWrite = { val sparkSession = SparkSession.active validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis) val path = new Path(paths.head) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala index 3cafe37b743f3..839cd01be75a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.csv.CSVOptions -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -50,7 +50,9 @@ case class CSVTable( } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new CSVWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = CSVWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala index bfbb1831aa63d..3a1848f544c45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWrite.scala @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} -class CSVWriteBuilder( +case class CSVWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) { + info: LogicalWriteInfo) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala index 4b66aec6acbed..52168007aaa18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.json.JSONOptionsInRead -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.json.JsonDataSource import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -50,7 +50,9 @@ case class JsonTable( } override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new JsonWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = JsonWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala index 19f472057ea7d..ea1f6793cb9ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonWrite.scala @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class JsonWriteBuilder( +case class JsonWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) { + info: LogicalWriteInfo) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index 3ef41210de181..9cc4525badd81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -44,7 +44,9 @@ case class OrcTable( OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new OrcWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = OrcWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala similarity index 93% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala index 48044748708d9..286e871080535 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWrite.scala @@ -24,16 +24,15 @@ import org.apache.orc.mapred.OrcStruct import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils} -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class OrcWriteBuilder( +case class OrcWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) { + info: LogicalWriteInfo) extends FileWrite { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala index e9f9bf8df35e6..c8bb4b2eb221e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.FileTable @@ -44,7 +44,9 @@ case class ParquetTable( ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new ParquetWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = ParquetWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala similarity index 95% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala index a4e22c21a11f3..0316d91f40732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala @@ -27,16 +27,15 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.parquet._ -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class ParquetWriteBuilder( +case class ParquetWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) with Logging { + info: LogicalWriteInfo) extends FileWrite with Logging { override def prepareWrite( sqlConf: SQLConf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala index 36304a9b17a1e..c09eca208b037 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.text import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.v2.FileTable import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} @@ -40,7 +40,9 @@ case class TextTable( Some(StructType(Seq(StructField("value", StringType)))) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - new TextWriteBuilder(paths, formatName, supportsDataType, info) + new WriteBuilder { + override def build(): Write = TextWrite(paths, formatName, supportsDataType, info) + } override def supportsDataType(dataType: DataType): Boolean = dataType == StringType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala index a3bf4dcae3f33..cd66f62eca956 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWrite.scala @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.connector.write.LogicalWriteInfo import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter} -import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder +import org.apache.spark.sql.execution.datasources.v2.FileWrite import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class TextWriteBuilder( +case class TextWrite( paths: Seq[String], formatName: String, supportsDataType: DataType => Boolean, - info: LogicalWriteInfo) - extends FileWriteBuilder(paths, formatName, supportsDataType, info) { + info: LogicalWriteInfo) extends FileWrite { private def verifySchema(schema: StructType): Unit = { if (schema.size != 1) { throw new AnalysisException( From 8a4d5765de848eeceb3febac1686854b37e7829e Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Mon, 10 May 2021 16:17:08 +0800 Subject: [PATCH 2/7] temp --- .../datasources/noop/NoopDataSource.scala | 11 ++++++---- .../sql/execution/streaming/console.scala | 12 ++++++---- ...rite.scala => ConsoleStreamingWrite.scala} | 0 .../connector/SimpleWritableDataSource.scala | 22 +++++++++++-------- 4 files changed, 28 insertions(+), 17 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/{ConsoleWrite.scala => ConsoleStreamingWrite.scala} (100%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index 472df286eb04f..fcf1652ea06a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.execution.datasources.noop import java.util import scala.collection.JavaConverters._ - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend} import org.apache.spark.sql.sources.DataSourceRegister @@ -55,8 +54,12 @@ private[noop] object NoopTable extends Table with SupportsWrite { private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate with SupportsStreamingUpdateAsAppend { override def truncate(): WriteBuilder = this - override def buildForBatch(): BatchWrite = NoopBatchWrite - override def buildForStreaming(): StreamingWrite = NoopStreamingWrite + override def build(): Write = NoopWrite +} + +private[noop] object NoopWrite extends Write { + override def toBatch: BatchWrite = NoopBatchWrite + override def toStreaming: StreamingWrite = NoopStreamingWrite } private[noop] object NoopBatchWrite extends BatchWrite { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index fb316598a128c..c1f5bd318382d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql._ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, Write, WriteBuilder} import org.apache.spark.sql.connector.write.streaming.StreamingWrite import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend} @@ -79,9 +79,13 @@ object ConsoleTable extends Table with SupportsWrite { // Do nothing for truncate. Console sink is special and it just prints all the records. override def truncate(): WriteBuilder = this - override def buildForStreaming(): StreamingWrite = { - assert(inputSchema != null) - new ConsoleWrite(inputSchema, info.options) + override def build(): Write = { + new Write { + override def toStreaming: StreamingWrite = { + assert(inputSchema != null) + new ConsoleWrite(inputSchema, info.options) + } + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala similarity index 100% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleStreamingWrite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala index 065ba4caebf32..49a6742a85269 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala @@ -73,17 +73,21 @@ class SimpleWritableDataSource extends TestingV2Source { this } - override def buildForBatch(): BatchWrite = { - val hadoopPath = new Path(path) - val hadoopConf = SparkContext.getActive.get.hadoopConfiguration - val fs = hadoopPath.getFileSystem(hadoopConf) + override def build(): Write = { + new Write { + override def toBatch: BatchWrite = { + val hadoopPath = new Path(path) + val hadoopConf = SparkContext.getActive.get.hadoopConfiguration + val fs = hadoopPath.getFileSystem(hadoopConf) + + if (needTruncate) { + fs.delete(hadoopPath, true) + } - if (needTruncate) { - fs.delete(hadoopPath, true) + val pathStr = hadoopPath.toUri.toString + new MyBatchWrite(queryId, pathStr, hadoopConf) + } } - - val pathStr = hadoopPath.toUri.toString - new MyBatchWrite(queryId, pathStr, hadoopConf) } } From 30b627d0771219af09232e2383e1b22d490f37a5 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 11 May 2021 11:32:23 +0800 Subject: [PATCH 3/7] dsv2-writer --- .../sql/kafka010/KafkaSourceProvider.scala | 13 +---- .../spark/sql/kafka010/KafkaWrite.scala | 39 +++++++++++++++ .../datasources/noop/NoopDataSource.scala | 1 + .../sources/ForeachWriterTable.scala | 49 +++++++++++-------- .../execution/streaming/sources/memory.scala | 12 +++-- .../sources/StreamingDataSourceV2Suite.scala | 18 ++++--- 6 files changed, 92 insertions(+), 40 deletions(-) create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 5c772abfe808d..0c891cd725ae3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -34,8 +34,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Tabl import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric} import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder} -import org.apache.spark.sql.connector.write.streaming.StreamingWrite +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, Write, WriteBuilder} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend} import org.apache.spark.sql.sources._ @@ -402,15 +401,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister private val producerParams = kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap)) - override def buildForBatch(): BatchWrite = { - assert(inputSchema != null) - new KafkaBatchWrite(topic, producerParams, inputSchema) - } - - override def buildForStreaming(): StreamingWrite = { - assert(inputSchema != null) - new KafkaStreamingWrite(topic, producerParams, inputSchema) - } + override def build(): Write = KafkaWrite(topic, producerParams, inputSchema) override def truncate(): WriteBuilder = this } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala new file mode 100644 index 0000000000000..d3cd081a8af95 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import org.apache.spark.sql.connector.write.{BatchWrite, Write} +import org.apache.spark.sql.connector.write.streaming.StreamingWrite +import org.apache.spark.sql.types.StructType + +case class KafkaWrite( + topic: Option[String], + producerParams: ju.Map[String, Object], + schema: StructType) extends Write { + + override def toBatch: BatchWrite = { + assert(schema != null) + new KafkaBatchWrite(topic, producerParams, schema) + } + + override def toStreaming: StreamingWrite = { + assert(schema != null) + new KafkaStreamingWrite(topic, producerParams, schema) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index fcf1652ea06a6..79e4150fe6b17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.noop import java.util import scala.collection.JavaConverters._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index 55aecc0611cc4..5944b4b98e180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{DataWriter, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.execution.python.PythonForeachWriter import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend @@ -56,31 +56,40 @@ case class ForeachWriterTable[T]( override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder with SupportsTruncate with SupportsStreamingUpdateAsAppend { - private val inputSchema: StructType = info.schema() // Do nothing for truncate. Foreach sink is special and it just forwards all the // records to ForeachWriter. override def truncate(): WriteBuilder = this - override def buildForStreaming(): StreamingWrite = { - new StreamingWrite { - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - - override def createStreamingWriterFactory( - info: PhysicalWriteInfo): StreamingDataWriterFactory = { - val rowConverter: InternalRow => T = converter match { - case Left(enc) => - val boundEnc = enc.resolveAndBind( - inputSchema.toAttributes, - SparkSession.getActiveSession.get.sessionState.analyzer) - boundEnc.createDeserializer() - case Right(func) => - func - } - ForeachWriterFactory(writer, rowConverter) - } + override def build(): Write = { + new ForeachWrite(info, writer, converter) + } + } + } +} + +class ForeachWrite[T]( + info: LogicalWriteInfo, + writer: ForeachWriter[T], + converter: Either[ExpressionEncoder[T], InternalRow => T]) extends Write { + private val inputSchema: StructType = info.schema() + override def toStreaming: StreamingWrite = { + new StreamingWrite { + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + + override def createStreamingWriterFactory( + info: PhysicalWriteInfo): StreamingDataWriterFactory = { + val rowConverter: InternalRow => T = converter match { + case Left(enc) => + val boundEnc = enc.resolveAndBind( + inputSchema.toAttributes, + SparkSession.getActiveSession.get.sessionState.analyzer) + boundEnc.createDeserializer() + case Right(func) => + func } + ForeachWriterFactory(writer, rowConverter) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala index 778cfeda68af0..4bd2b332fdb54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memory.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability} -import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend import org.apache.spark.sql.types.StructType @@ -62,8 +62,8 @@ class MemorySink extends Table with SupportsWrite with Logging { this } - override def buildForStreaming(): StreamingWrite = { - new MemoryStreamingWrite(MemorySink.this, inputSchema, needTruncate) + override def build(): Write = { + new MemoryWrite(MemorySink.this, inputSchema, needTruncate) } } } @@ -130,6 +130,12 @@ class MemorySink extends Table with SupportsWrite with Logging { case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {} +class MemoryWrite(sink: MemorySink, schema: StructType, needTruncate: Boolean) extends Write { + override def toStreaming: StreamingWrite = { + new MemoryStreamingWrite(sink, schema, needTruncate) + } +} + class MemoryStreamingWrite( val sink: MemorySink, schema: StructType, needTruncate: Boolean) extends StreamingWrite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index ae0dba746d8a8..6b3fb90956ed5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -21,21 +21,20 @@ import java.util import java.util.Collections import scala.collection.JavaConverters._ - import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRead, SupportsWrite, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, MicroBatchStream, Offset, PartitionOffset} -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, PhysicalWriteInfo, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RateStreamOffset, Sink, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} -import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, StreamTest, StreamingQuery, Trigger} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -68,10 +67,17 @@ class FakeScanBuilder extends ScanBuilder with Scan { override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream } -class FakeWriteBuilder extends WriteBuilder with StreamingWrite { - override def buildForStreaming(): StreamingWrite = this +class FakeWriteBuilder extends WriteBuilder { + override def build(): Write = { + new Write { + override def toStreaming: StreamingWrite = new FakeStreamingWrite + } + } +} + +class FakeStreamingWrite extends StreamingWrite { override def createStreamingWriterFactory( - info: PhysicalWriteInfo): StreamingDataWriterFactory = { + info: PhysicalWriteInfo): StreamingDataWriterFactory = { throw new IllegalStateException("fake sink - cannot actually write") } override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { From d078953d3b3b6e14b7f51ee3fd321cd892da02d5 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 11 May 2021 13:17:15 +0800 Subject: [PATCH 4/7] fix style --- .../sql/streaming/sources/StreamingDataSourceV2Suite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 6b3fb90956ed5..49dbf08a84942 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -21,6 +21,7 @@ import java.util import java.util.Collections import scala.collection.JavaConverters._ + import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRead, SupportsWrite, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -34,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RateStreamOf import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.SimpleTableProvider import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} -import org.apache.spark.sql.streaming.{OutputMode, StreamTest, StreamingQuery, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils From aa51f84e0bef9e078683b29ff489a81d5e6baf35 Mon Sep 17 00:00:00 2001 From: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com> Date: Tue, 11 May 2021 15:29:53 +0800 Subject: [PATCH 5/7] Update StreamingDataSourceV2Suite.scala --- .../sql/streaming/sources/StreamingDataSourceV2Suite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 49dbf08a84942..c360ec8e670bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -78,7 +78,7 @@ class FakeWriteBuilder extends WriteBuilder { class FakeStreamingWrite extends StreamingWrite { override def createStreamingWriterFactory( - info: PhysicalWriteInfo): StreamingDataWriterFactory = { + info: PhysicalWriteInfo): StreamingDataWriterFactory = { throw new IllegalStateException("fake sink - cannot actually write") } override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = { From 41a46f5a7b4f43e69f632575c9d1981b07384721 Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Tue, 11 May 2021 16:53:53 +0800 Subject: [PATCH 6/7] trigger test From e48aee512db13fa23da27916fd7768d59811bd6f Mon Sep 17 00:00:00 2001 From: Linhong Liu Date: Thu, 13 May 2021 19:31:35 +0800 Subject: [PATCH 7/7] add description --- .../main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala | 2 ++ .../apache/spark/sql/execution/datasources/v2/FileWrite.scala | 2 ++ 2 files changed, 4 insertions(+) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala index d3cd081a8af95..8e0e0516d871a 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWrite.scala @@ -27,6 +27,8 @@ case class KafkaWrite( producerParams: ju.Map[String, Object], schema: StructType) extends Write { + override def description(): String = "Kafka" + override def toBatch: BatchWrite = { assert(schema != null) new KafkaBatchWrite(topic, producerParams, schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala index 427d7f644e4b2..4f736cbd89706 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala @@ -47,6 +47,8 @@ trait FileWrite extends Write { private val queryId = info.queryId() private val options = info.options() + override def description(): String = formatName + override def toBatch: BatchWrite = { val sparkSession = SparkSession.active validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)