Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
Expand All @@ -43,7 +43,9 @@ case class AvroTable(
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new AvroWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = AvroWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,15 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class AvroWriteBuilder(
case class AvroWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
info: LogicalWriteInfo) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Tabl
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -402,15 +401,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
private val producerParams =
kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap))

override def buildForBatch(): BatchWrite = {
assert(inputSchema != null)
new KafkaBatchWrite(topic, producerParams, inputSchema)
}

override def buildForStreaming(): StreamingWrite = {
assert(inputSchema != null)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
override def build(): Write = KafkaWrite(topic, producerParams, inputSchema)

override def truncate(): WriteBuilder = this
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.kafka010

import java.{util => ju}

import org.apache.spark.sql.connector.write.{BatchWrite, Write}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType

/**
 * [[Write]] implementation for the Kafka sink.
 *
 * Holds the values needed to construct either a batch or a streaming write and
 * creates the concrete write object on demand.
 *
 * @param topic optional topic, forwarded unchanged to the created write
 * @param producerParams producer parameters, forwarded unchanged to the created write
 * @param schema schema handed to the created write; asserted to be non-null
 *               before a batch or streaming write is created
 */
case class KafkaWrite(
Comment thread
cloud-fan marked this conversation as resolved.
topic: Option[String],
producerParams: ju.Map[String, Object],
schema: StructType) extends Write {

// Constant label for this write; it does not vary with topic or parameters.
override def description(): String = "Kafka"

// Batch path: checks that a schema was supplied, then creates a new KafkaBatchWrite.
override def toBatch: BatchWrite = {
assert(schema != null)
new KafkaBatchWrite(topic, producerParams, schema)
}

// Streaming path: same schema check as toBatch, then creates a new KafkaStreamingWrite.
override def toStreaming: StreamingWrite = {
assert(schema != null)
new KafkaStreamingWrite(topic, producerParams, schema)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, Write, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
import org.apache.spark.sql.sources.DataSourceRegister
Expand Down Expand Up @@ -55,8 +55,12 @@ private[noop] object NoopTable extends Table with SupportsWrite {
private[noop] object NoopWriteBuilder extends WriteBuilder
with SupportsTruncate with SupportsStreamingUpdateAsAppend {
override def truncate(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
override def build(): Write = NoopWrite
}

/**
 * [[Write]] object returned by `NoopWriteBuilder.build()`.
 *
 * Both conversions hand back existing objects rather than creating new
 * instances: `NoopBatchWrite` for batch and `NoopStreamingWrite` for streaming.
 */
private[noop] object NoopWrite extends Write {
// Batch path: returns NoopBatchWrite.
override def toBatch: BatchWrite = NoopBatchWrite
// Streaming path: returns NoopStreamingWrite.
override def toStreaming: StreamingWrite = NoopStreamingWrite
}

private[noop] object NoopBatchWrite extends BatchWrite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,27 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, Write}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.SerializableConfiguration

abstract class FileWriteBuilder(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo) extends WriteBuilder {
trait FileWrite extends Write {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

def paths: Seq[String]
def formatName: String
def supportsDataType: DataType => Boolean
def info: LogicalWriteInfo

private val schema = info.schema()
private val queryId = info.queryId()
private val options = info.options()

override def buildForBatch(): BatchWrite = {
override def description(): String = formatName

override def toBatch: BatchWrite = {
val sparkSession = SparkSession.active
validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
val path = new Path(paths.head)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.FileTable
Expand Down Expand Up @@ -50,7 +50,9 @@ case class CSVTable(
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new CSVWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = CSVWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}

class CSVWriteBuilder(
case class CSVWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
info: LogicalWriteInfo) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.json.JsonDataSource
import org.apache.spark.sql.execution.datasources.v2.FileTable
Expand Down Expand Up @@ -50,7 +50,9 @@ case class JsonTable(
}

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new JsonWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = JsonWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.json.JsonOutputWriter
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class JsonWriteBuilder(
case class JsonWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
info: LogicalWriteInfo) extends FileWrite {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.v2.FileTable
Expand All @@ -44,7 +44,9 @@ case class OrcTable(
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new OrcWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = OrcWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ import org.apache.orc.mapred.OrcStruct
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils}
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class OrcWriteBuilder(
case class OrcWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
info: LogicalWriteInfo) extends FileWrite {

override def prepareWrite(
sqlConf: SQLConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.datasources.v2.FileTable
Expand All @@ -44,7 +44,9 @@ case class ParquetTable(
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new ParquetWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = ParquetWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class ParquetWriteBuilder(
case class ParquetWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) with Logging {
info: LogicalWriteInfo) extends FileWrite with Logging {

override def prepareWrite(
sqlConf: SQLConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.text
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, Write, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
Expand All @@ -40,7 +40,9 @@ case class TextTable(
Some(StructType(Seq(StructField("value", StringType))))

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new TextWriteBuilder(paths, formatName, supportsDataType, info)
new WriteBuilder {
override def build(): Write = TextWrite(paths, formatName, supportsDataType, info)
}

override def supportsDataType(dataType: DataType): Boolean = dataType == StringType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter}
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.execution.datasources.v2.FileWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class TextWriteBuilder(
case class TextWrite(
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
info: LogicalWriteInfo) extends FileWrite {
private def verifySchema(schema: StructType): Unit = {
if (schema.size != 1) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
Expand Down Expand Up @@ -79,9 +79,13 @@ object ConsoleTable extends Table with SupportsWrite {
// Do nothing for truncate. Console sink is special and it just prints all the records.
override def truncate(): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
assert(inputSchema != null)
new ConsoleWrite(inputSchema, info.options)
override def build(): Write = {
new Write {
override def toStreaming: StreamingWrite = {
assert(inputSchema != null)
new ConsoleWrite(inputSchema, info.options)
}
}
}
}
}
Expand Down
Loading