Skip to content
This repository was archived by the owner on Jun 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4d002cd
add delete support for index refresh
Aug 26, 2020
f8f72f2
Merge branch 'master' into pouriap/refreshDelete
Aug 26, 2020
aed6255
Merge branch 'master' into pouriap/refreshDelete
Aug 31, 2020
bf98425
add delete support to refresh index
Sep 2, 2020
fcb4f95
add delete support to refresh index
Sep 2, 2020
ba55f30
add delete support to refresh index
Sep 2, 2020
a15b71d
add delete support to refresh index
Sep 2, 2020
70d9416
fix index manager test case
Sep 3, 2020
0b26209
fix index content refresh
Sep 3, 2020
3cc1e0f
Merge branch 'master' into pouriap/refreshDelete
Sep 3, 2020
bfa0543
Merge branch 'master' into pouriap/refreshDelete
Sep 9, 2020
66fa8fb
fix merge conflicts
Sep 10, 2020
2a62d1d
refactor refresh code and add refresh delete
Sep 10, 2020
9916c25
check for lineage during refresh delete.
Sep 10, 2020
d808a13
fix refresh delete test
Sep 10, 2020
38623a0
Merge branch 'master' into pouriap/refreshDelete
Sep 14, 2020
96b10bb
Code clean-up for refresh delete
Sep 14, 2020
6bf5990
Changes in refresh delete code and test
Sep 15, 2020
25bc63c
Merge branch 'master' into pouriap/refreshDelete
Sep 15, 2020
d0bda6a
index refresh delete code changes
Sep 16, 2020
8da34c6
fix index refresh delete test
Sep 16, 2020
33c2080
fix index refresh delete
Sep 16, 2020
7ca9f8c
Merge branch 'master' into pouriap/refreshDelete
Sep 16, 2020
46aa2a7
add tests to refresh delete and code clean-up
Sep 17, 2020
ed88a36
changes to refresh delete and its tests
Sep 17, 2020
c98b30b
Merge branch 'master' into pouriap/refreshDelete
Sep 17, 2020
5d08fc6
Change validation for index refresh delete
Sep 17, 2020
0499b45
add file info check to refresh delete action
Sep 18, 2020
7137b89
misc changes in index refresh delete and its tests.
Sep 18, 2020
7be558b
misc fix in refresh index tests.
Sep 18, 2020
7974b9a
fix in test code
Sep 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{input_file_name, udf}
import org.apache.spark.sql.sources.DataSourceRegister

import com.microsoft.hyperspace.HyperspaceException
Expand Down Expand Up @@ -181,8 +182,24 @@ private[actions] abstract class CreateActionBase(dataManager: IndexDataManager)
val missingPartitionColumns = getPartitionColumns(df).filter(
ResolverUtils.resolve(spark, _, columnsFromIndexConfig).isEmpty)
val allIndexColumns = columnsFromIndexConfig ++ missingPartitionColumns

// File path value in DATA_FILE_NAME_COLUMN column is stored as String. We normalize
// path values in this column by removing extra preceding `/` characters in the path
// and store it the same way paths are stored in Content in an IndexLogEntry instance.
// Normalizing path values in DATA_FILE_NAME_COLUMN column keeps path representation
// unified in Hyperspace (between index lineage and index metadata) and helps performance
// by avoiding the need to fix paths (i.e. removing extra `/` characters) each time
// value of DATA_FILE_NAME_COLUMN column is read.
Comment thread
rapoth marked this conversation as resolved.
// Here is an example of normalization:
// - Original raw path (output of input_file_name() udf, before normalization):
// + file:///C:/myGit/hyperspace-1/src/test/part-00003.snappy.parquet
// - Normalized path to be stored in DATA_FILE_NAME_COLUMN column:
// + file:/C:/myGit/hyperspace-1/src/test/part-00003.snappy.parquet
val absolutePath: UserDefinedFunction = udf(
(filePath: String) => PathUtils.makeAbsolute(filePath).toString)

df.select(allIndexColumns.head, allIndexColumns.tail: _*)
.withColumn(IndexConstants.DATA_FILE_NAME_COLUMN, input_file_name())
.withColumn(IndexConstants.DATA_FILE_NAME_COLUMN, absolutePath(input_file_name()))
} else {
df.select(columnsFromIndexConfig.head, columnsFromIndexConfig.tail: _*)
}
Expand Down
59 changes: 11 additions & 48 deletions src/main/scala/com/microsoft/hyperspace/actions/RefreshAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,24 @@
package com.microsoft.hyperspace.actions

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshActionEvent}

// TODO: This class depends directly on LogEntry. This should be updated such that
// it works with IndexLogEntry only. (for example, this class can take in
// derivedDataset specific logic for refreshing).
/**
* The Index refresh action is used to perform a full rebuild of the index.
* Consequently, it ends up creating a new version of the index and involves
* a full scan of the underlying source data.
*
* @param spark SparkSession.
* @param logManager Index LogManager for index being refreshed.
* @param dataManager Index DataManager for index being refreshed.
*/
class RefreshAction(
Comment thread
rapoth marked this conversation as resolved.
spark: SparkSession,
final override protected val logManager: IndexLogManager,
logManager: IndexLogManager,
dataManager: IndexDataManager)
extends CreateActionBase(dataManager)
with Action {
private lazy val previousLogEntry: LogEntry = {
logManager.getLog(baseId).getOrElse {
throw HyperspaceException("LogEntry must exist for refresh operation")
}
}

private lazy val previousIndexLogEntry = previousLogEntry.asInstanceOf[IndexLogEntry]

// Reconstruct a df from schema
private lazy val df = {
val rels = previousIndexLogEntry.relations
// Currently we only support to create an index on a LogicalRelation.
assert(rels.size == 1)
val dataSchema = DataType.fromJson(rels.head.dataSchemaJson).asInstanceOf[StructType]
spark.read
.schema(dataSchema)
.format(rels.head.fileFormat)
.options(rels.head.options)
.load(rels.head.rootPaths: _*)
}

private lazy val indexConfig: IndexConfig = {
val ddColumns = previousIndexLogEntry.derivedDataset.properties.columns
IndexConfig(previousIndexLogEntry.name, ddColumns.indexed, ddColumns.included)
}

final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)

final override val transientState: String = REFRESHING

final override val finalState: String = ACTIVE

final override def validate(): Unit = {
if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) {
throw HyperspaceException(
s"Refresh is only supported in $ACTIVE state. " +
s"Current index state is ${previousIndexLogEntry.state}")
}
}
extends RefreshActionBase(spark, logManager, dataManager) {

final override def op(): Unit = {
// TODO: The current implementation picks the number of buckets from session config.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, REFRESHING}
import com.microsoft.hyperspace.index._

/**
 * Common foundation for the different kinds of index refresh actions. It looks up the
 * previous log entry of the index and derives from it the source DataFrame and the
 * index config that concrete refresh actions operate on.
 *
 * @param spark SparkSession
 * @param logManager Index LogManager for index being refreshed
 * @param dataManager Index DataManager for index being refreshed
 */
// TODO: This class depends directly on LogEntry. This should be updated such that
//  it works with IndexLogEntry only. (for example, this class can take in
//  derivedDataset specific logic for refreshing).
private[actions] abstract class RefreshActionBase(
    spark: SparkSession,
    final override protected val logManager: IndexLogManager,
    dataManager: IndexDataManager)
    extends CreateActionBase(dataManager)
    with Action {

  // Log entry registered under baseId; refresh cannot proceed when it is missing.
  private lazy val previousLogEntry: LogEntry =
    logManager
      .getLog(baseId)
      .getOrElse(throw HyperspaceException("LogEntry must exist for refresh operation"))

  // Same entry, viewed as an IndexLogEntry for the members below and for subclasses.
  protected lazy val previousIndexLogEntry = previousLogEntry.asInstanceOf[IndexLogEntry]

  // Rebuild the source DataFrame from the schema, format, options and root paths
  // recorded in the first relation of the previous index log entry.
  protected lazy val df = {
    val relation = previousIndexLogEntry.relations.head
    val dataSchema = DataType.fromJson(relation.dataSchemaJson).asInstanceOf[StructType]
    spark.read
      .schema(dataSchema)
      .format(relation.fileFormat)
      .options(relation.options)
      .load(relation.rootPaths: _*)
  }

  // Index config rebuilt from the name and indexed/included columns of the previous entry.
  protected lazy val indexConfig: IndexConfig = {
    val columns = previousIndexLogEntry.derivedDataset.properties.columns
    IndexConfig(previousIndexLogEntry.name, columns.indexed, columns.included)
  }

  final override def logEntry: LogEntry = getIndexLogEntry(spark, df, indexConfig, indexDataPath)

  final override val transientState: String = REFRESHING

  final override val finalState: String = ACTIVE

  /**
   * Refresh requires the index to currently be in ACTIVE state (case-insensitive match);
   * any other state raises a HyperspaceException.
   */
  override def validate(): Unit = {
    val currentState = previousIndexLogEntry.state
    if (!currentState.equalsIgnoreCase(ACTIVE)) {
      throw HyperspaceException(
        s"Refresh is only supported in $ACTIVE state. " +
          s"Current index state is $currentState")
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.hyperspace.actions

import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.index._
import com.microsoft.hyperspace.index.DataFrameWriterExtensions.Bucketizer
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, RefreshDeleteActionEvent}

Comment thread
pirz marked this conversation as resolved.
/**
* Refresh index by removing index entries from any deleted source data file.
* Note this Refresh Action only fixes an index w.r.t deleted source data files
* and does not consider new source data files (if any).
* If some original source data file(s) are removed between previous version of index and now,
* this Refresh Action updates the index as follows:
* 1. Deleted source data files are identified;
* 2. Index records' lineage is leveraged to remove any index entry coming
* from those deleted source data files.
*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a note that this doesn't handle any "new" source data? (and update PR description as well?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated both.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I understand this right, this is the plan, right?

  1. This PR handles the case where some files got removed from the underlying source
  2. @apoorvedave1 PR will handle the case where some files got added into the underlying source
  3. Merge incremental mode append and delete actions into index refresh  #149 will take care of merging the logic

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you also link to the issue that will handle this addition of "new" source data? Once we address #149, we can get rid of that line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rapoth - Issue #149 is already mentioned in the first line of the PR description. Just to be clear, do you want @apoorvedave1's PR to be mentioned in this PR's description as well?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all I meant is some clarification saying something like:

Note that this PR only handles the case of ... The case of dealing with ... is handled in another PR

* @param spark SparkSession
* @param logManager Index LogManager for index being refreshed
* @param dataManager Index DataManager for index being refreshed
*/
class RefreshDeleteAction(
    spark: SparkSession,
    logManager: IndexLogManager,
    dataManager: IndexDataManager)
    extends RefreshActionBase(spark, logManager, dataManager)
    with Logging {

  /**
   * Build the telemetry event reported for this action.
   *
   * @param appInfo AppInfo for spark application.
   * @param message Message about event.
   * @return A RefreshDeleteActionEvent carrying the current index log entry.
   */
  final override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
    RefreshDeleteActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
  }

  /**
   * Validate index has lineage column and it is in active state for refreshing and
   * there are some deleted source data file(s).
   *
   * Throws HyperspaceException if any of these conditions does not hold.
   */
  final override def validate(): Unit = {
    super.validate()
    if (!previousIndexLogEntry.hasLineageColumn(spark)) {
      throw HyperspaceException(
        "Index refresh (to handle deleted source data) is " +
          "only supported on an index with lineage.")
    }

    if (deletedFiles.isEmpty) {
      throw HyperspaceException("Refresh aborted as no deleted source data file found.")
    }
  }

  /**
   * For an index with lineage, find all the source data files which have been deleted,
   * and use index records' lineage to mark and remove index entries which belong to
   * deleted source data files as those entries are no longer valid.
   */
  final override def op(): Unit = {
    logInfo(
      "Refresh index is updating index by removing index entries " +
        s"corresponding to ${deletedFiles.length} deleted source data files.")

    // Read the previous index data and keep only rows whose lineage column does not
    // point at a deleted source data file.
    val refreshDF =
      spark.read
        .parquet(previousIndexLogEntry.content.files.map(_.toString): _*)
        .filter(!col(IndexConstants.DATA_FILE_NAME_COLUMN).isin(deletedFiles: _*))

    // Write the surviving rows with the same bucket count and indexed columns as before.
    refreshDF.write.saveWithBuckets(
      refreshDF,
      indexDataPath.toString,
      previousIndexLogEntry.numBuckets,
      indexConfig.indexedColumns)
  }

  /**
   * Compare list of source data files from previous IndexLogEntry to list
   * of current source data files, validate fileInfo for existing files and
   * identify deleted source data files.
   *
   * Throws HyperspaceException if a file that still exists has a different file info
   * than the one recorded in the previous IndexLogEntry.
   */
  private lazy val deletedFiles: Seq[String] = {
    val relation = previousIndexLogEntry.relations.head
    val originalFiles = relation.data.properties.content.fileInfos

    // File infos currently found under the relation's root paths, keyed by file name.
    val currentFiles = relation.rootPaths
      .flatMap { p =>
        Content
          .fromDirectory(path = new Path(p), throwIfNotExists = true)
          .fileInfos
      }
      .map(f => f.name -> f)
      .toMap

    // Strict left fold: visits the original files in order, collects the names that are
    // no longer present, and fails on the first surviving file whose info has changed.
    originalFiles.foldLeft(Vector.empty[String]) { (deleted, original) =>
      currentFiles.get(original.name) match {
        case None => deleted :+ original.name
        case Some(current) if original.equals(current) => deleted
        case Some(_) =>
          throw HyperspaceException(
            "Index refresh (to handle deleted source data) aborted. " +
              s"Existing source data file info is changed (file: ${original.name}).")
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.internal.SQLConf

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions._
import com.microsoft.hyperspace.util.HyperspaceConf

class IndexCollectionManager(
spark: SparkSession,
Expand Down Expand Up @@ -66,7 +67,11 @@ class IndexCollectionManager(
withLogManager(indexName) { logManager =>
val indexPath = PathResolver(spark.sessionState.conf).getIndexPath(indexName)
val dataManager = indexDataManagerFactory.create(indexPath)
new RefreshAction(spark, logManager, dataManager).run()
if (HyperspaceConf.refreshDeleteEnabled(spark)) {
new RefreshDeleteAction(spark, logManager, dataManager).run()
} else {
new RefreshAction(spark, logManager, dataManager).run()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ object IndexConstants {
private[hyperspace] val DATA_FILE_NAME_COLUMN = "_data_file_name"
val INDEX_LINEAGE_ENABLED = "spark.hyperspace.index.lineage.enabled"
val INDEX_LINEAGE_ENABLED_DEFAULT = "false"

val REFRESH_DELETE_ENABLED = "spark.hyperspace.index.refresh.delete.enabled"
val REFRESH_DELETE_ENABLED_DEFAULT = "false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import scala.collection.mutable.{HashMap, ListBuffer}
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions.Constants
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.{PathUtils, ResolverUtils}

// IndexLogEntry-specific fingerprint to be temporarily used where fingerprint is not defined.
case class NoOpFingerprint() {
Expand Down Expand Up @@ -402,6 +403,12 @@ case class IndexLogEntry(
sourcePlanSignatures.head
}

/**
 * Tell whether this index carries the lineage column, i.e. whether
 * IndexConstants.DATA_FILE_NAME_COLUMN resolves against the field names of the schema.
 *
 * @param spark SparkSession passed to the column resolver.
 * @return true if the lineage column name resolves, false otherwise.
 */
def hasLineageColumn(spark: SparkSession): Boolean = {
  val lineageColumn =
    ResolverUtils.resolve(spark, IndexConstants.DATA_FILE_NAME_COLUMN, schema.fieldNames)
  lineageColumn.nonEmpty
}

/** Hash code: the (wrapping) Int sum of the config, signature, numBuckets and content hashes. */
override def hashCode(): Int = {
  Seq(config.hashCode, signature.hashCode, numBuckets.hashCode, content.hashCode).sum
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,21 @@ case class RefreshActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: S
case class CancelActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent

/**
 * Index Refresh Event for deleted source files. Emitted when refresh is called on an index
 * with the config flag set to remove index entries for deleted source data files
 * (as opposed to RefreshActionEvent, which is used for the regular refresh action).
 *
 * @param appInfo AppInfo for spark application.
 * @param index Related index, i.e. the log entry of the index being refreshed.
 * @param message Message about event.
 */
case class RefreshDeleteActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent

/**
* Index usage event. This event is emitted when an index is picked instead of original data
* source by one of the hyperspace rules.
*
* @param appInfo AppInfo for spark application.
* @param indexes List of selected indexes for this plan.
* @param planBeforeRule Original plan before application of indexes.
Expand Down
Loading