From dcc4655225b27a4bc544ce38580949fb3fe60121 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jun 2016 20:37:44 -0700 Subject: [PATCH 1/9] Fixed and added tests --- .../apache/spark/sql/DataFrameReader.scala | 184 ++++++++++++++++-- .../sql/JavaDataFrameReaderWriterSuite.java | 144 ++++++++++++++ .../sql/test/DataFrameReaderWriterSuite.scala | 80 +++++++- 3 files changed, 381 insertions(+), 27 deletions(-) create mode 100644 sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 2ae854d04f564..0899e91f1f17d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -119,13 +119,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(): DataFrame = { - val dataSource = - DataSource( - sparkSession, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap) - Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())) + load(Seq.empty: _*) // force invocation of `load(...varargs...)` } /** @@ -146,18 +140,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def load(paths: String*): DataFrame = { - if (paths.isEmpty) { - sparkSession.emptyDataFrame - } else { - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) - } + sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap).resolveRelation()) } + /** * Construct a [[DataFrame]] representing the database table accessible via JDBC URL * url named table and connection properties. @@ -276,7 +267,42 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`. * - * @since 1.6.0 + * @since 1.4.0 + */ + def json(path: String): DataFrame = json(Seq(path): _*) + + /** + * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]]. + * + * This function goes through the input once to determine the input schema. If you know the + * schema in advance, use the version that specifies the schema to avoid the extra scan. + * + * You can set the following JSON-specific options to deal with non-standard JSON files: + *
  • `primitivesAsString` (default `false`): infers all primitive values as a string type
  • + *
  • `prefersDecimal` (default `false`): infers all floating-point values as a decimal + * type. If the values do not fit in decimal, then it infers them as doubles.
  • + *
  • `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
  • + *
  • `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
  • + *
  • `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes + *
  • + *
  • `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers + * (e.g. 00012)
  • + *
  • `allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all + * character using backslash quoting mechanism
  • + *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records + * during parsing.
  • + * + *
  • `columnNameOfCorruptRecord` (default is the value specified in + * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string + * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.
  • + * + * @since 2.0.0 */ @scala.annotation.varargs def json(paths: String*): DataFrame = format("json").load(paths : _*) @@ -326,6 +352,60 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { parsedOptions))(sparkSession)) } + /** + * Loads a CSV file and returns the result as a [[DataFrame]]. + * + * This function will go through the input once to determine the input schema if `inferSchema` + * is enabled. To avoid going through the entire data once, disable `inferSchema` option or + * specify the schema explicitly using [[schema]]. + * + * You can set the following CSV-specific options to deal with CSV files: + *
  • `sep` (default `,`): sets the single character as a separator for each + * field and value.
  • + *
  • `encoding` (default `UTF-8`): decodes the CSV files by the given encoding + * type.
  • + *
  • `quote` (default `"`): sets the single character used for escaping quoted values where + * the separator can be part of the value. If you would like to turn off quotations, you need to + * set not `null` but an empty string. This behaviour is different form + * `com.databricks.spark.csv`.
  • + *
  • `escape` (default `\`): sets the single character used for escaping quotes inside + * an already quoted value.
  • + *
  • `comment` (default empty string): sets the single character used for skipping lines + * beginning with this character. By default, it is disabled.
  • + *
  • `header` (default `false`): uses the first line as names of columns.
  • + *
  • `inferSchema` (default `false`): infers the input schema automatically from data. It + * requires one extra pass over the data.
  • + *
  • `ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces + * from values being read should be skipped.
  • + *
  • `ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing + * whitespaces from values being read should be skipped.
  • + *
  • `nullValue` (default empty string): sets the string representation of a null value.
  • + *
  • `nanValue` (default `NaN`): sets the string representation of a non-number" value.
  • + *
  • `positiveInf` (default `Inf`): sets the string representation of a positive infinity + * value.
  • + *
  • `negativeInf` (default `-Inf`): sets the string representation of a negative infinity + * value.
  • + *
  • `dateFormat` (default `null`): sets the string that indicates a date format. Custom date + * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type + * and timestamp type. By default, it is `null` which means trying to parse times and date by + * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.
  • + *
  • `maxColumns` (default `20480`): defines a hard limit of how many columns + * a record can have.
  • + *
  • `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed + * for any given value being read.
  • + *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records + * during parsing.
  • + * + * + * @since 2.0.0 + */ + def csv(path: String): DataFrame = csv(Seq(path): _*) + /** * Loads a CSV file and returns the result as a [[DataFrame]]. * @@ -381,6 +461,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { @scala.annotation.varargs def csv(paths: String*): DataFrame = format("csv").load(paths : _*) + /** + * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty + * [[DataFrame]] if no paths are passed in. + * + * You can set the following Parquet-specific option(s) for reading Parquet files: + *
  • `mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets + * whether we should merge schemas collected from all Parquet part-files. This will override + * `spark.sql.parquet.mergeSchema`.
  • + * + * @since 2.0.0 + */ + def parquet(path: String): DataFrame = parquet(Seq(path): _*) + /** * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty * [[DataFrame]] if no paths are passed in. @@ -404,7 +497,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.5.0 * @note Currently, this method can only be used after enabling Hive support. */ - def orc(path: String): DataFrame = format("orc").load(path) + def orc(path: String): DataFrame = orc(Seq(path): _*) + + /** + * Loads an ORC file and returns the result as a [[DataFrame]]. + * + * @param paths input paths + * @since 2.0.0 + * @note Currently, this method can only be used after enabling Hive support. + */ + @scala.annotation.varargs + def orc(paths: String*): DataFrame = format("orc").load(paths: _*) /** * Returns the specified table as a [[DataFrame]]. @@ -430,12 +533,51 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * spark.read().text("/path/to/spark/README.md") * }}} * - * @param paths input path + * @param path input path + * @since 2.0.0 + */ + def text(path: String): DataFrame = text(Seq(path): _*) + + /** + * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named + * "value", and followed by partitioned columns if there are any. + * + * Each line in the text files is a new row in the resulting DataFrame. For example: + * {{{ + * // Scala: + * spark.read.text("/path/to/spark/README.md") + * + * // Java: + * spark.read().text("/path/to/spark/README.md") + * }}} + * + * @param paths input paths * @since 1.6.0 */ @scala.annotation.varargs def text(paths: String*): DataFrame = format("text").load(paths : _*) + /** + * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset + * contains a single string column named "value". + * + * If the directory structure of the text files contains partitioning information, those are + * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. + * + * Each line in the text files is a new element in the resulting Dataset. For example: + * {{{ + * // Scala: + * spark.read.textFile("/path/to/spark/README.md") + * + * // Java: + * spark.read().textFile("/path/to/spark/README.md") + * }}} + * + * @param path input path + * @since 2.0.0 + */ + def textFile(path: String): Dataset[String] = textFile(Seq(path): _*) + /** * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset * contains a single string column named "value". diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java new file mode 100644 index 0000000000000..cf2efd0c387dc --- /dev/null +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java @@ -0,0 +1,144 @@ +package test.org.apache.spark.sql; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.spark.api.java.function.Function0; +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.test.TestSparkSession; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.Utils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class JavaDataFrameReaderWriterSuite { + private SparkSession spark = new TestSparkSession(); + private StructType schema = new StructType().add("s", "string"); + private transient String input; + private transient String output; + + @Before + public void setUp() { + input = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "input").toString(); + File f = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "output"); + f.delete(); + output = f.toString(); + } + + @After + public void tearDown() { + spark.stop(); + spark = null; + } + + @Test + public void testFormatAPI() { + spark + .read() + .format("org.apache.spark.sql.test") + .load() + .write() + .format("org.apache.spark.sql.test") + .save(); + } + + @Test + public void testOptionsAPI() { + HashMap map = new HashMap(); + map.put("e", "1"); + spark + .read() + .option("a", "1") + .option("b", 1) + .option("c", 1.0) + .option("d", true) + .options(map) + .text() + .write() + .option("a", "1") + .option("b", 1) + .option("c", 1.0) + .option("d", true) + .options(map) + .format("org.apache.spark.sql.test") + .save(); + } + + @Test + public void testSaveModeAPI() { + spark + .range(10) + .write() + .format("org.apache.spark.sql.test") + .mode(SaveMode.ErrorIfExists) + .save(); + } + + @Test + public void testLoadAPI() { + spark.read().format("org.apache.spark.sql.test").load(); + spark.read().format("org.apache.spark.sql.test").load(input); + spark.read().format("org.apache.spark.sql.test").load(input, input, input); + spark.read().format("org.apache.spark.sql.test").load(new String[]{input, input}); + } + + @Test + public void testTextAPI() { + spark.read().text(); + spark.read().text(input); + spark.read().text(input, input, input); + spark.read().text(new String[]{input, input}) + .write().text(output); + } + + @Test + public void testTextFileAPI() { + spark.read().textFile(); // Disabled because of SPARK-XXXXX + spark.read().textFile(input); + spark.read().textFile(input, input, input); + spark.read().textFile(new String[]{input, input}); + } + + @Test + public void testCsvAPI() { + spark.read().schema(schema).csv(); + spark.read().schema(schema).csv(input); + spark.read().schema(schema).csv(input, input, input); + spark.read().schema(schema).csv(new String[]{input, input}) + .write().csv(output); + } + + @Test + public void testJsonAPI() { + spark.read().schema(schema).json(); + spark.read().schema(schema).json(input); + spark.read().schema(schema).json(input, input, input); + spark.read().schema(schema).json(new String[]{input, input}) + .write().json(output); + } + + @Test + public void testParquetAPI() { + spark.read().schema(schema).parquet(); + spark.read().schema(schema).parquet(input); + spark.read().schema(schema).parquet(input, input, input); + spark.read().schema(schema).parquet(new String[] { input, input }) + .write().parquet(output); + } + + /** + * This only tests whether API compiles, but does not run it as orc() + * cannot be run with Hive classes. + */ + public void testOrcAPI() { + spark.read().schema(schema).orc(); + spark.read().schema(schema).orc(input); + spark.read().schema(schema).orc(input, input, input); + spark.read().schema(schema).orc(new String[]{input, input}) + .write().orc(output); + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 98e57b38044f2..2c14459e27e4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.test +import java.io.File + +import org.scalatest.BeforeAndAfter + import org.apache.spark.sql._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -79,10 +83,17 @@ class DefaultSource } -class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { +class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { - private def newMetadataDir = - Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + private val input = Utils.createTempDir(namePrefix = "input").getCanonicalPath + private var output: String = _ + private var schema: StructType = new StructType().add("s", "string") + + before { + val f = Utils.createTempDir(namePrefix = "output") + f.delete() + output = f.getCanonicalPath + } test("writeStream cannot be called on non-streaming datasets") { val e = intercept[AnalysisException] { @@ -160,7 +171,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { test("paths") { val df = spark.read .format("org.apache.spark.sql.test") - .option("checkpointLocation", newMetadataDir) .load("/test") assert(LastOptions.parameters("path") == "/test") @@ -169,7 +179,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { df.write .format("org.apache.spark.sql.test") - .option("checkpointLocation", newMetadataDir) .save("/test") assert(LastOptions.parameters("path") == "/test") @@ -193,7 +202,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { .option("intOpt", 56) .option("boolOpt", false) .option("doubleOpt", 6.7) - .option("checkpointLocation", newMetadataDir) .save("/test") assert(LastOptions.parameters("intOpt") == "56") @@ -228,4 +236,64 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { } } } + + test("load API") { + spark.read.format("org.apache.spark.sql.test").load() + spark.read.format("org.apache.spark.sql.test").load(input) + spark.read.format("org.apache.spark.sql.test").load(input, input, input) + spark.read.format("org.apache.spark.sql.test").load(Seq(input, input): _*) + Option(input).map(spark.read.format("org.apache.spark.sql.test").load) + } + + test("text API") { + spark.read.text() + spark.read.text(input) + spark.read.text(input, input, input) + spark.read.text(Seq(input, input): _*).write.text(output) + Option(input).map(spark.read.text) + } + + test("textFile API") { + spark.read.textFile() + spark.read.textFile(input) + spark.read.textFile(input, input, input) + spark.read.textFile(Seq(input, input): _*).write.text(output) + Option(input).map(spark.read.textFile) + } + + test("csv API") { + spark.read.schema(schema).csv() + spark.read.schema(schema).csv(input) + spark.read.schema(schema).csv(input, input, input) + spark.read.schema(schema).csv(Seq(input, input): _*).write.csv(output) + Option(input).map(spark.read.schema(schema).csv) + } + + test("json API") { + spark.read.schema(schema).json() + spark.read.schema(schema).json(input) + spark.read.schema(schema).json(input, input, input) + spark.read.schema(schema).json(Seq(input, input): _*).write.json(output) + Option(input).map(spark.read.schema(schema).json) + } + + test("parquet API") { + spark.read.schema(schema).parquet() + spark.read.schema(schema).parquet(input) + spark.read.schema(schema).parquet(input, input, input) + spark.read.schema(schema).parquet(Seq(input, input): _*).write.parquet(output) + Option(input).map(spark.read.schema(schema).parquet) + } + + /** + * This only tests whether API compiles, but does not run it as orc() + * cannot be run with Hive classes. + */ + ignore("orc API") { + spark.read.schema(schema).orc() + spark.read.schema(schema).orc(input) + spark.read.schema(schema).orc(input, input, input) + spark.read.schema(schema).orc(Seq(input, input): _*).write.orc(output) + Option(input).map(spark.read.schema(schema).orc) + } } From 960048d846ba258aab3a35027e11c377dba0830d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jun 2016 20:42:19 -0700 Subject: [PATCH 2/9] Fixed stuff --- .../sql/JavaDataFrameReaderWriterSuite.java | 20 ++++++++++++++++--- .../sql/test/DataFrameReaderWriterSuite.scala | 2 -- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java index cf2efd0c387dc..b6d1db2510e11 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java @@ -1,11 +1,25 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package test.org.apache.spark.sql; import java.io.File; -import java.util.Arrays; import java.util.HashMap; -import org.apache.spark.api.java.function.Function0; -import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.test.TestSparkSession; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 2c14459e27e4f..fda0f510fb468 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.test -import java.io.File - import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ From 3384473ec91d61405e2d6a51c94f46726bfcdbd9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 16 Jun 2016 21:44:57 -0700 Subject: [PATCH 3/9] More harmonizing --- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 16 ---------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0899e91f1f17d..76ffb70c83a2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -129,7 +129,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.4.0 */ def load(path: String): DataFrame = { - option("path", path).load() + load(Seq(path): _*) // force invocation of `load(...varargs...)` } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index fda0f510fb468..0f3d5141d2b30 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -166,22 +166,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be assert(LastOptions.saveMode === SaveMode.ErrorIfExists) } - test("paths") { - val df = spark.read - .format("org.apache.spark.sql.test") - .load("/test") - - assert(LastOptions.parameters("path") == "/test") - - LastOptions.clear() - - df.write - .format("org.apache.spark.sql.test") - .save("/test") - - assert(LastOptions.parameters("path") == "/test") - } - test("test different data types for options") { val df = spark.read .format("org.apache.spark.sql.test") From 3150b013dd67e98dfa80ad50ee0fa7cbcc2a7486 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jun 2016 01:30:25 -0700 Subject: [PATCH 4/9] Updated tests to check schema as well --- .../sql/test/DataFrameReaderWriterSuite.scala | 113 ++++++++++++------ 1 file changed, 76 insertions(+), 37 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 0f3d5141d2b30..9519efdf8a6f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -83,9 +83,11 @@ class DefaultSource class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { + private val input = Utils.createTempDir(namePrefix = "input").getCanonicalPath private var output: String = _ - private var schema: StructType = new StructType().add("s", "string") + private val userSchema = new StructType().add("s", StringType) + private val textSchema = new StructType().add("value", StringType) before { val f = Utils.createTempDir(namePrefix = "output") @@ -227,55 +229,92 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be Option(input).map(spark.read.format("org.apache.spark.sql.test").load) } - test("text API") { - spark.read.text() - spark.read.text(input) - spark.read.text(input, input, input) - spark.read.text(Seq(input, input): _*).write.text(output) - Option(input).map(spark.read.text) + test("text - API and common behavior") { + // Reader, without user specified schema + assert(spark.read.text().schema === textSchema) + assert(spark.read.text(input).schema === textSchema) + assert(spark.read.text(input, input, input).schema === textSchema) + assert(spark.read.text(Seq(input, input): _*).schema === textSchema) + assert(Option(input).map(spark.read.text).get.schema === textSchema) // SPARK-16009 + + // Reader, with user specified schema + assert(spark.read.schema(userSchema).text().schema === userSchema) + assert(spark.read.schema(userSchema).text(input).schema === userSchema) + assert(spark.read.schema(userSchema).text(input, input, input).schema === userSchema) + assert(spark.read.schema(userSchema).text(Seq(input, input): _*).schema === userSchema) + + // Writer + spark.read.text().write.text(output) } - test("textFile API") { - spark.read.textFile() - spark.read.textFile(input) - spark.read.textFile(input, input, input) - spark.read.textFile(Seq(input, input): _*).write.text(output) - Option(input).map(spark.read.textFile) + test("textFile - API and common behavior") { + // Reader, without user specified schema + assert(spark.read.textFile().schema === textSchema) + assert(spark.read.textFile(input).schema === textSchema) + assert(spark.read.textFile(input, input, input).schema === textSchema) + assert(spark.read.textFile(Seq(input, input): _*).schema === textSchema) + assert(Option(input).map(spark.read.textFile).get.schema === textSchema) // SPARK-16009 } - test("csv API") { - spark.read.schema(schema).csv() - spark.read.schema(schema).csv(input) - spark.read.schema(schema).csv(input, input, input) - spark.read.schema(schema).csv(Seq(input, input): _*).write.csv(output) - Option(input).map(spark.read.schema(schema).csv) + test("csv - API and common behavior") { + // Reader, with user specified schema + // Refer to csv-specific test suites for behavior without user specified schema + assert(spark.read.schema(userSchema).csv().schema === userSchema) + assert(spark.read.schema(userSchema).csv(input).schema === userSchema) + assert(spark.read.schema(userSchema).csv(input, input, input).schema === userSchema) + assert(spark.read.schema(userSchema).csv(Seq(input, input): _*).schema === userSchema) + + // Test explicit calls to single arg method - SPARK-16009 + assert(Option(input).map(spark.read.schema(userSchema).csv).get.schema === userSchema) + + // Writer + spark.range(10).write.csv(output) } - test("json API") { - spark.read.schema(schema).json() - spark.read.schema(schema).json(input) - spark.read.schema(schema).json(input, input, input) - spark.read.schema(schema).json(Seq(input, input): _*).write.json(output) - Option(input).map(spark.read.schema(schema).json) + test("json - API and common behavior") { + // Reader, with user specified schema + // Refer to csv-specific test suites for behavior without user specified schema + assert(spark.read.schema(userSchema).json().schema === userSchema) + assert(spark.read.schema(userSchema).json(input).schema === userSchema) + assert(spark.read.schema(userSchema).json(input, input, input).schema === userSchema) + assert(spark.read.schema(userSchema).json(Seq(input, input): _*).schema === userSchema) + + // Test explicit calls to single arg method - SPARK-16009 + assert(Option(input).map(spark.read.schema(userSchema).json).get.schema === userSchema) + + // Writer + spark.range(10).write.json(output) } - test("parquet API") { - spark.read.schema(schema).parquet() - spark.read.schema(schema).parquet(input) - spark.read.schema(schema).parquet(input, input, input) - spark.read.schema(schema).parquet(Seq(input, input): _*).write.parquet(output) - Option(input).map(spark.read.schema(schema).parquet) + test("parquet - API and common behavior") { + // Reader, with user specified schema + // Refer to csv-specific test suites for behavior without user specified schema + assert(spark.read.schema(userSchema).parquet().schema === userSchema) + assert(spark.read.schema(userSchema).parquet(input).schema === userSchema) + assert(spark.read.schema(userSchema).parquet(input, input, input).schema === userSchema) + assert(spark.read.schema(userSchema).parquet(Seq(input, input): _*).schema === userSchema) + + // Test explicit calls to single arg method - SPARK-16009 + assert(Option(input).map(spark.read.schema(userSchema).parquet).get.schema === userSchema) + + // Writer + spark.range(10).write.parquet(output) } /** * This only tests whether API compiles, but does not run it as orc() * cannot be run with Hive classes. */ - ignore("orc API") { - spark.read.schema(schema).orc() - spark.read.schema(schema).orc(input) - spark.read.schema(schema).orc(input, input, input) - spark.read.schema(schema).orc(Seq(input, input): _*).write.orc(output) - Option(input).map(spark.read.schema(schema).orc) + ignore("orc - API") { + // Reader, with user specified schema + // Refer to csv-specific test suites for behavior without user specified schema + spark.read.schema(userSchema).orc() + spark.read.schema(userSchema).orc(input) + spark.read.schema(userSchema).orc(input, input, input) + spark.read.schema(userSchema).orc(Seq(input, input): _*) + Option(input).map(spark.read.schema(userSchema).orc) + + // Writer + spark.range(10).write.orc(output) } } From bb52410a3df6f16fd51534cad77fb9366c8d2712 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jun 2016 01:40:10 -0700 Subject: [PATCH 5/9] Addressed comments --- .../apache/spark/sql/DataFrameReader.scala | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 76ffb70c83a2b..798cc2964a205 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -269,7 +269,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * @since 1.4.0 */ - def json(path: String): DataFrame = json(Seq(path): _*) + def json(path: String): DataFrame = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + json(Seq(path): _*) + } /** * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]]. @@ -404,7 +407,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * @since 2.0.0 */ - def csv(path: String): DataFrame = csv(Seq(path): _*) + def csv(path: String): DataFrame = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + csv(Seq(path): _*) + } /** * Loads a CSV file and returns the result as a [[DataFrame]]. @@ -472,7 +478,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * @since 2.0.0 */ - def parquet(path: String): DataFrame = parquet(Seq(path): _*) + def parquet(path: String): DataFrame = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + parquet(Seq(path): _*) + } /** * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty @@ -497,7 +506,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @since 1.5.0 * @note Currently, this method can only be used after enabling Hive support. */ - def orc(path: String): DataFrame = orc(Seq(path): _*) + def orc(path: String): DataFrame = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + orc(Seq(path): _*) + } /** * Loads an ORC file and returns the result as a [[DataFrame]]. @@ -536,7 +548,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @param path input path * @since 2.0.0 */ - def text(path: String): DataFrame = text(Seq(path): _*) + def text(path: String): DataFrame = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + text(Seq(path): _*) + } /** * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named @@ -576,7 +591,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * @param path input path * @since 2.0.0 */ - def textFile(path: String): Dataset[String] = textFile(Seq(path): _*) + def textFile(path: String): Dataset[String] = { + // This method ensures that calls that explicit need single argument works, see SPARK-16009 + textFile(Seq(path): _*) + } /** * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset From 29524b1201fb3e028a2875397feba5c0e577365f Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 17 Jun 2016 18:37:23 -0700 Subject: [PATCH 6/9] Updated tests to make them handle no-schema case --- .../apache/spark/sql/DataFrameReader.scala | 3 + .../sql/test/DataFrameReaderWriterSuite.scala | 185 +++++++++++------- 2 files changed, 122 insertions(+), 66 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 798cc2964a205..adff22cb14092 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -617,6 +617,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ @scala.annotation.varargs def textFile(paths: String*): Dataset[String] = { + if (userSpecifiedSchema.nonEmpty) { + throw new AnalysisException("User specified schema not supported with `textFile`") + } text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 9519efdf8a6f5..3b136cd67a8c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import java.io.File + import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ @@ -84,15 +86,15 @@ class DefaultSource class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { - private val input = Utils.createTempDir(namePrefix = "input").getCanonicalPath - private var output: String = _ private val userSchema = new StructType().add("s", StringType) private val textSchema = new StructType().add("value", StringType) + private val data = Seq("1", "2", "3") + private val dir = Utils.createTempDir(namePrefix = "input").getCanonicalPath + private implicit var enc: Encoder[String] = _ before { - val f = Utils.createTempDir(namePrefix = "output") - f.delete() - output = f.getCanonicalPath + enc = spark.implicits.newStringEncoder + Utils.deleteRecursively(new File(dir)) } test("writeStream cannot be called on non-streaming datasets") { @@ -223,82 +225,125 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be test("load API") { spark.read.format("org.apache.spark.sql.test").load() - spark.read.format("org.apache.spark.sql.test").load(input) - spark.read.format("org.apache.spark.sql.test").load(input, input, input) - spark.read.format("org.apache.spark.sql.test").load(Seq(input, input): _*) - Option(input).map(spark.read.format("org.apache.spark.sql.test").load) + spark.read.format("org.apache.spark.sql.test").load(dir) + spark.read.format("org.apache.spark.sql.test").load(dir, dir, dir) + spark.read.format("org.apache.spark.sql.test").load(Seq(dir, dir): _*) + Option(dir).map(spark.read.format("org.apache.spark.sql.test").load) } - test("text - API and common behavior") { - // Reader, without user specified schema - assert(spark.read.text().schema === textSchema) - assert(spark.read.text(input).schema === textSchema) - assert(spark.read.text(input, input, input).schema === textSchema) - assert(spark.read.text(Seq(input, input): _*).schema === textSchema) - assert(Option(input).map(spark.read.text).get.schema === textSchema) // SPARK-16009 - - // Reader, with user specified schema - assert(spark.read.schema(userSchema).text().schema === userSchema) - assert(spark.read.schema(userSchema).text(input).schema === userSchema) - assert(spark.read.schema(userSchema).text(input, input, input).schema === userSchema) - assert(spark.read.schema(userSchema).text(Seq(input, input): _*).schema === userSchema) - + test("text - API and behavior regarding schema") { // Writer - spark.read.text().write.text(output) - } + spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir) + testRead(spark.read.text(dir), data, textSchema) - test("textFile - API and common behavior") { // Reader, without user specified schema - assert(spark.read.textFile().schema === textSchema) - assert(spark.read.textFile(input).schema === textSchema) - assert(spark.read.textFile(input, input, input).schema === textSchema) - assert(spark.read.textFile(Seq(input, input): _*).schema === textSchema) - assert(Option(input).map(spark.read.textFile).get.schema === textSchema) // SPARK-16009 + testRead(spark.read.text(), Seq.empty, textSchema) + testRead(spark.read.text(dir, dir, dir), data ++ data ++ data, textSchema) + testRead(spark.read.text(Seq(dir, dir): _*), data ++ data, textSchema) + // Test explicit calls to single arg method - SPARK-16009 + testRead(Option(dir).map(spark.read.text).get, data, textSchema) + + // Reader, with user specified schema, should just apply user schema on the file data + testRead(spark.read.schema(userSchema).text(), Seq.empty, userSchema) + testRead(spark.read.schema(userSchema).text(dir), data, userSchema) + testRead(spark.read.schema(userSchema).text(dir, dir), data ++ data, userSchema) + testRead(spark.read.schema(userSchema).text(Seq(dir, dir): _*), data ++ data, userSchema) } - test("csv - API and common behavior") { - // Reader, with user specified schema - // Refer to csv-specific test suites for behavior without user specified schema - assert(spark.read.schema(userSchema).csv().schema === userSchema) - assert(spark.read.schema(userSchema).csv(input).schema === userSchema) - assert(spark.read.schema(userSchema).csv(input, input, input).schema === userSchema) - assert(spark.read.schema(userSchema).csv(Seq(input, input): _*).schema === userSchema) + test("textFile - API and behavior regarding schema") { + spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir) + // Reader, without user specified schema + testRead(spark.read.textFile().toDF(), Seq.empty, textSchema) + testRead(spark.read.textFile(dir).toDF(), data, textSchema) + testRead(spark.read.textFile(dir, dir).toDF(), data ++ data, textSchema) + testRead(spark.read.textFile(Seq(dir, dir): _*).toDF(), data ++ data, textSchema) // Test explicit calls to single arg method - SPARK-16009 - assert(Option(input).map(spark.read.schema(userSchema).csv).get.schema === userSchema) - - // Writer - spark.range(10).write.csv(output) + testRead(Option(dir).map(spark.read.text).get, data, textSchema) + + // Reader, with user specified schema, should just apply user schema on the file data + val e = intercept[AnalysisException] { spark.read.schema(userSchema).textFile() } + assert(e.getMessage.toLowerCase.contains("user specified schema not supported")) + intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir) } + intercept[AnalysisException] { spark.read.schema(userSchema).textFile(dir, dir) } + intercept[AnalysisException] { spark.read.schema(userSchema).textFile(Seq(dir, dir): _*) } } - test("json - API and common behavior") { - // Reader, with user specified schema - // Refer to csv-specific test suites for behavior without user specified schema - assert(spark.read.schema(userSchema).json().schema === userSchema) - assert(spark.read.schema(userSchema).json(input).schema === userSchema) - assert(spark.read.schema(userSchema).json(input, input, input).schema === userSchema) - assert(spark.read.schema(userSchema).json(Seq(input, input): _*).schema === userSchema) + test("csv - API and behavior regarding schema") { + // Writer + spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).csv(dir) + val df = spark.read.csv(dir) + checkAnswer(df, spark.createDataset(data).toDF()) + val schema = df.schema + // Reader, without user specified schema + intercept[IllegalArgumentException] { + testRead(spark.read.csv(), Seq.empty, schema) + } + testRead(spark.read.csv(dir), data, schema) + testRead(spark.read.csv(dir, dir), data ++ data, schema) + testRead(spark.read.csv(Seq(dir, dir): _*), data ++ data, schema) // Test explicit calls to single arg method - SPARK-16009 - assert(Option(input).map(spark.read.schema(userSchema).json).get.schema === userSchema) + testRead(Option(dir).map(spark.read.csv).get, data, schema) - // Writer - spark.range(10).write.json(output) + // Reader, with user specified schema, should just apply user schema on the file data + testRead(spark.read.schema(userSchema).csv(), Seq.empty, userSchema) + testRead(spark.read.schema(userSchema).csv(dir), data, userSchema) + testRead(spark.read.schema(userSchema).csv(dir, dir), data ++ data, userSchema) + testRead(spark.read.schema(userSchema).csv(Seq(dir, dir): _*), data ++ data, userSchema) } - test("parquet - API and common behavior") { - // Reader, with user specified schema - // Refer to csv-specific test suites for behavior without user specified schema - assert(spark.read.schema(userSchema).parquet().schema === userSchema) - assert(spark.read.schema(userSchema).parquet(input).schema === userSchema) - assert(spark.read.schema(userSchema).parquet(input, input, input).schema === userSchema) - assert(spark.read.schema(userSchema).parquet(Seq(input, input): _*).schema === userSchema) + test("json - API and behavior regarding schema") { + // Writer + spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).json(dir) + val df = spark.read.json(dir) + checkAnswer(df, spark.createDataset(data).toDF()) + val schema = df.schema + // Reader, without user specified schema + intercept[AnalysisException] { + testRead(spark.read.json(), Seq.empty, schema) + } + testRead(spark.read.json(dir), data, schema) + testRead(spark.read.json(dir, dir), data ++ data, schema) + testRead(spark.read.json(Seq(dir, dir): _*), data ++ data, schema) // Test explicit calls to single arg method - SPARK-16009 - assert(Option(input).map(spark.read.schema(userSchema).parquet).get.schema === userSchema) + testRead(Option(dir).map(spark.read.json).get, data, schema) + + // Reader, with user specified schema, data should be nulls as schema in file different + // from user schema + val expData = Seq[String](null, null, null) + testRead(spark.read.schema(userSchema).json(), Seq.empty, userSchema) + testRead(spark.read.schema(userSchema).json(dir), expData, userSchema) + testRead(spark.read.schema(userSchema).json(dir, dir), expData ++ expData, userSchema) + testRead(spark.read.schema(userSchema).json(Seq(dir, dir): _*), expData ++ expData, userSchema) + } + test("parquet - API and behavior regarding schema") { // Writer - spark.range(10).write.parquet(output) + spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).parquet(dir) + val df = spark.read.parquet(dir) + checkAnswer(df, spark.createDataset(data).toDF()) + val schema = df.schema + + // Reader, without user specified schema + intercept[AnalysisException] { + testRead(spark.read.parquet(), Seq.empty, schema) + } + testRead(spark.read.parquet(dir), data, schema) + testRead(spark.read.parquet(dir, dir), data ++ data, schema) + testRead(spark.read.parquet(Seq(dir, dir): _*), data ++ data, schema) + // Test explicit calls to single arg method - SPARK-16009 + testRead(Option(dir).map(spark.read.parquet).get, data, schema) + + // Reader, with user specified schema, data should be nulls as schema in file different + // from user schema + val expData = Seq[String](null, null, null) + testRead(spark.read.schema(userSchema).parquet(), Seq.empty, userSchema) + testRead(spark.read.schema(userSchema).parquet(dir), expData, userSchema) + testRead(spark.read.schema(userSchema).parquet(dir, dir), expData ++ expData, userSchema) + testRead( + spark.read.schema(userSchema).parquet(Seq(dir, dir): _*), expData ++ expData, userSchema) } /** @@ -309,12 +354,20 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be // Reader, with user specified schema // Refer to csv-specific test suites for behavior without user specified schema spark.read.schema(userSchema).orc() - spark.read.schema(userSchema).orc(input) - spark.read.schema(userSchema).orc(input, input, input) - spark.read.schema(userSchema).orc(Seq(input, input): _*) - Option(input).map(spark.read.schema(userSchema).orc) + spark.read.schema(userSchema).orc(dir) + spark.read.schema(userSchema).orc(dir, dir, dir) + spark.read.schema(userSchema).orc(Seq(dir, dir): _*) + Option(dir).map(spark.read.schema(userSchema).orc) // Writer - spark.range(10).write.orc(output) + spark.range(10).write.orc(dir) + } + + private def testRead( + df: => DataFrame, + expectedResult: Seq[String], + expectedSchema: StructType): Unit = { + checkAnswer(df, spark.createDataset(expectedResult).toDF()) + assert(df.schema === expectedSchema) } } From 24174f08587d0fa680c9050f5648a0b090507af6 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Jun 2016 03:49:43 -0700 Subject: [PATCH 7/9] Fixed docs --- .../apache/spark/sql/DataFrameReader.scala | 131 +++--------------- .../sql/JavaDataFrameReaderWriterSuite.java | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 2 +- 3 files changed, 18 insertions(+), 117 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index adff22cb14092..28367017f1bc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -238,34 +238,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]]. - * - * This function goes through the input once to determine the input schema. If you know the - * schema in advance, use the version that specifies the schema to avoid the extra scan. - * - * You can set the following JSON-specific options to deal with non-standard JSON files: - *
  • `primitivesAsString` (default `false`): infers all primitive values as a string type
  • - *
  • `prefersDecimal` (default `false`): infers all floating-point values as a decimal - * type. If the values do not fit in decimal, then it infers them as doubles.
  • - *
  • `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
  • - *
  • `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
  • - *
  • `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes - *
  • - *
  • `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers - * (e.g. 00012)
  • - *
  • `allowBackslashEscapingAnyCharacter` (default `false`): allows accepting quoting of all - * character using backslash quoting mechanism
  • - *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records - * during parsing.
  • - *
      - *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the - * malformed string into a new field configured by `columnNameOfCorruptRecord`. When - * a schema is set by user, it sets `null` for extra fields.
    • - *
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • - *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • - *
    - *
  • `columnNameOfCorruptRecord` (default is the value specified in - * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string - * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.
  • + * See the documentation on the overloaded `json()` method with varargs for more details. * * @since 1.4.0 */ @@ -281,6 +254,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * schema in advance, use the version that specifies the schema to avoid the extra scan. * * You can set the following JSON-specific options to deal with non-standard JSON files: + *
      *
    • `primitivesAsString` (default `false`): infers all primitive values as a string type
    • *
    • `prefersDecimal` (default `false`): infers all floating-point values as a decimal * type. If the values do not fit in decimal, then it infers them as doubles.
    • @@ -304,7 +278,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
    • `columnNameOfCorruptRecord` (default is the value specified in * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string * created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.
    • - * + *
    * @since 2.0.0 */ @scala.annotation.varargs @@ -356,54 +330,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** - * Loads a CSV file and returns the result as a [[DataFrame]]. - * - * This function will go through the input once to determine the input schema if `inferSchema` - * is enabled. To avoid going through the entire data once, disable `inferSchema` option or - * specify the schema explicitly using [[schema]]. - * - * You can set the following CSV-specific options to deal with CSV files: - *
  • `sep` (default `,`): sets the single character as a separator for each - * field and value.
  • - *
  • `encoding` (default `UTF-8`): decodes the CSV files by the given encoding - * type.
  • - *
  • `quote` (default `"`): sets the single character used for escaping quoted values where - * the separator can be part of the value. If you would like to turn off quotations, you need to - * set not `null` but an empty string. This behaviour is different form - * `com.databricks.spark.csv`.
  • - *
  • `escape` (default `\`): sets the single character used for escaping quotes inside - * an already quoted value.
  • - *
  • `comment` (default empty string): sets the single character used for skipping lines - * beginning with this character. By default, it is disabled.
  • - *
  • `header` (default `false`): uses the first line as names of columns.
  • - *
  • `inferSchema` (default `false`): infers the input schema automatically from data. It - * requires one extra pass over the data.
  • - *
  • `ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces - * from values being read should be skipped.
  • - *
  • `ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing - * whitespaces from values being read should be skipped.
  • - *
  • `nullValue` (default empty string): sets the string representation of a null value.
  • - *
  • `nanValue` (default `NaN`): sets the string representation of a non-number" value.
  • - *
  • `positiveInf` (default `Inf`): sets the string representation of a positive infinity - * value.
  • - *
  • `negativeInf` (default `-Inf`): sets the string representation of a negative infinity - * value.
  • - *
  • `dateFormat` (default `null`): sets the string that indicates a date format. Custom date - * formats follow the formats at `java.text.SimpleDateFormat`. This applies to both date type - * and timestamp type. By default, it is `null` which means trying to parse times and date by - * `java.sql.Timestamp.valueOf()` and `java.sql.Date.valueOf()`.
  • - *
  • `maxColumns` (default `20480`): defines a hard limit of how many columns - * a record can have.
  • - *
  • `maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed - * for any given value being read.
  • - *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records - * during parsing.
  • - *
      - *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When - * a schema is set by user, it sets `null` for extra fields.
    • - *
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • - *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • - *
    + * Loads a CSV file and returns the result as a [[DataFrame]]. See the documentation on the + * other overloaded `csv()` method for more details. * * @since 2.0.0 */ @@ -420,6 +348,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * specify the schema explicitly using [[schema]]. * * You can set the following CSV-specific options to deal with CSV files: + *
      *
    • `sep` (default `,`): sets the single character as a separator for each * field and value.
    • *
    • `encoding` (default `UTF-8`): decodes the CSV files by the given encoding @@ -461,20 +390,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • *
    - * + * * @since 2.0.0 */ @scala.annotation.varargs def csv(paths: String*): DataFrame = format("csv").load(paths : _*) /** - * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty - * [[DataFrame]] if no paths are passed in. - * - * You can set the following Parquet-specific option(s) for reading Parquet files: - *
  • `mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets - * whether we should merge schemas collected from all Parquet part-files. This will override - * `spark.sql.parquet.mergeSchema`.
  • + * Loads a Parquet file, returning the result as a [[DataFrame]]. See the documentation + * on the other overloaded `parquet()` method for more details. * * @since 2.0.0 */ @@ -484,14 +408,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** - * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty - * [[DataFrame]] if no paths are passed in. + * Loads a Parquet file, returning the result as a [[DataFrame]]. * * You can set the following Parquet-specific option(s) for reading Parquet files: + *
      *
    • `mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets * whether we should merge schemas collected from all Parquet part-files. This will override * `spark.sql.parquet.mergeSchema`.
    • - * + *
    * @since 1.4.0 */ @scala.annotation.varargs @@ -534,18 +458,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named - * "value", and followed by partitioned columns if there are any. - * - * Each line in the text files is a new row in the resulting DataFrame. For example: - * {{{ - * // Scala: - * spark.read.text("/path/to/spark/README.md") + * "value", and followed by partitioned columns if there are any. See the documentation on + * the other overloaded `text()` method for more details. * - * // Java: - * spark.read().text("/path/to/spark/README.md") - * }}} - * - * @param path input path * @since 2.0.0 */ def text(path: String): DataFrame = { @@ -573,22 +488,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { def text(paths: String*): DataFrame = format("text").load(paths : _*) /** - * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset - * contains a single string column named "value". - * - * If the directory structure of the text files contains partitioning information, those are - * ignored in the resulting Dataset. To include partitioning information as columns, use `text`. - * - * Each line in the text files is a new element in the resulting Dataset. For example: - * {{{ - * // Scala: - * spark.read.textFile("/path/to/spark/README.md") - * - * // Java: - * spark.read().textFile("/path/to/spark/README.md") - * }}} - * - * @param path input path + * Loads text files and returns a [[Dataset]] of String. See the documentation on the + * other overloaded `textFile()` method for more details. * @since 2.0.0 */ def textFile(path: String): Dataset[String] = { diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java index b6d1db2510e11..1d16a73f3b6c7 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java @@ -146,7 +146,7 @@ public void testParquetAPI() { /** * This only tests whether API compiles, but does not run it as orc() - * cannot be run with Hive classes. + * cannot be run without Hive classes. */ public void testOrcAPI() { spark.read().schema(schema).orc(); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 3b136cd67a8c1..3fa3864bc9690 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -348,7 +348,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be /** * This only tests whether API compiles, but does not run it as orc() - * cannot be run with Hive classes. + * cannot be run without Hive classes. */ ignore("orc - API") { // Reader, with user specified schema From 3498bd06dda12f3bf8788b787195cd8293f6ebde Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Jun 2016 03:53:00 -0700 Subject: [PATCH 8/9] One more fix --- .../org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java index 1d16a73f3b6c7..7babf7573c075 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameReaderWriterSuite.java @@ -111,7 +111,7 @@ public void testTextAPI() { @Test public void testTextFileAPI() { - spark.read().textFile(); // Disabled because of SPARK-XXXXX + spark.read().textFile(); spark.read().textFile(input); spark.read().textFile(input, input, input); spark.read().textFile(new String[]{input, input}); From 2539a947d382d8d6c21c59fe8a9420a15aad9b9a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 20 Jun 2016 05:10:21 -0700 Subject: [PATCH 9/9] Better indenting of bulleted sublist --- .../org/apache/spark/sql/DataFrameReader.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 28367017f1bc8..841503b260c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -269,11 +269,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing.
  • *
      - *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the - * malformed string into a new field configured by `columnNameOfCorruptRecord`. When + *
    • - `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts + * the malformed string into a new field configured by `columnNameOfCorruptRecord`. When * a schema is set by user, it sets `null` for extra fields.
    • - *
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • - *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • + *
    • - `DROPMALFORMED` : ignores the whole corrupted records.
    • + *
    • - `FAILFAST` : throws an exception when it meets corrupted records.
    • *
    *
  • `columnNameOfCorruptRecord` (default is the value specified in * `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string @@ -385,10 +385,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing.
  • *
      - *
    • `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When + *
    • - `PERMISSIVE` : sets other fields to `null` when it meets a corrupted record. When * a schema is set by user, it sets `null` for extra fields.
    • - *
    • `DROPMALFORMED` : ignores the whole corrupted records.
    • - *
    • `FAILFAST` : throws an exception when it meets corrupted records.
    • + *
    • - `DROPMALFORMED` : ignores the whole corrupted records.
    • + *
    • - `FAILFAST` : throws an exception when it meets corrupted records.
    • *
    * * @since 2.0.0