From 8d87a3f28800a605daaec362095308358c4fa640 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 Mar 2019 16:01:37 +0800 Subject: [PATCH] [SPARK-27301][Dstream]Shorten the FileSytem cached life cycle to the cleanup method inner scope --- .../apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index e73837eb9602f..ebfaa83c704bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) // in that batch's checkpoint data @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time] - @transient private var fileSystem: FileSystem = null protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]] /** @@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T]) // even after master fails, as the checkpoint data of `time` does not refer to those files val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime) logDebug("Files to delete:\n" + filesToDelete.mkString(",")) + var fileSystem: FileSystem = null filesToDelete.foreach { case (time, file) => try {