@@ -21,13 +21,12 @@ import com.datadog.android.core.internal.persistence.file.listFilesSafe
2121import com.datadog.android.core.internal.persistence.file.mkdirsSafe
2222import com.datadog.android.internal.time.TimeProvider
2323import java.io.File
24- import java.io.FileFilter
2524import java.util.Locale
25+ import java.util.concurrent.atomic.AtomicBoolean
2626import java.util.concurrent.atomic.AtomicInteger
27+ import java.util.concurrent.atomic.AtomicLong
2728import kotlin.math.roundToLong
2829
29- // TODO RUM-438 Improve this class: need to make it thread-safe and optimize work with file
30- // system in order to reduce the number of syscalls (which are expensive) for files already seen
3130@Suppress(" TooManyFunctions" )
3231internal class BatchFileOrchestrator (
3332 private val rootDir : File ,
@@ -38,8 +37,6 @@ internal class BatchFileOrchestrator(
3837 private val pendingFiles : AtomicInteger = AtomicInteger (0)
3938) : FileOrchestrator {
4039
41- private val fileFilter = BatchFileFilter ()
42-
4340 // Offset the recent threshold for read and write to avoid conflicts
4441 // Arbitrary offset as ±5% of the threshold
4542 @Suppress(" UnsafeThirdPartyFunctionCall" ) // rounded Double isn't NaN
@@ -48,11 +45,11 @@ internal class BatchFileOrchestrator(
4845 @Suppress(" UnsafeThirdPartyFunctionCall" ) // rounded Double isn't NaN
4946 private val recentWriteDelayMs = (config.recentDelayMs * DECREASE_PERCENT ).roundToLong()
5047
51- // keep track of how many items were written in the last known file
52- private var previousFile : File ? = null
53- private var previousFileItemCount : Long = 0
54- private var lastFileAccessTimestamp : Long = 0L
55- private var lastCleanupTimestamp : Long = 0L
48+ private var currentBatchState = CurrentBatchState ( null , 0L , 0L )
49+ private val lastCleanupTimestamp = AtomicLong ( 0L )
50+ private val areKnownFilesInitialized = AtomicBoolean ( false )
51+
52+ private val knownFiles : MutableSet < File > = mutableSetOf ()
5653
5754 // region FileOrchestrator
5855
@@ -66,7 +63,7 @@ internal class BatchFileOrchestrator(
6663 var files = listBatchFiles()
6764 files = deleteObsoleteFiles(files)
6865 freeSpaceIfNeeded(files)
69- lastCleanupTimestamp = timeProvider.getDeviceTimestampMillis()
66+ lastCleanupTimestamp.set( timeProvider.getDeviceTimestampMillis() )
7067 }
7168
7269 return getReusableWritableFile() ? : createNewFile()
@@ -78,14 +75,19 @@ internal class BatchFileOrchestrator(
7875 return null
7976 }
8077
81- val files = listSortedBatchFiles().let {
82- deleteObsoleteFiles(it)
83- }
84- lastCleanupTimestamp = timeProvider.getDeviceTimestampMillis()
78+ val files = deleteObsoleteFiles(listSortedBatchFiles())
79+ lastCleanupTimestamp.set(timeProvider.getDeviceTimestampMillis())
8580 pendingFiles.set(files.count())
8681
8782 return files.firstOrNull {
88- (it !in excludeFiles) && ! isFileRecent(it, recentReadDelayMs)
83+ when {
84+ it in excludeFiles || isFileRecent(it, recentReadDelayMs) -> false
85+ ! it.existsSafe(internalLogger) -> {
86+ onFileDeleted(it)
87+ false
88+ }
89+ else -> true
90+ }
8991 }
9092 }
9193
@@ -138,112 +140,127 @@ internal class BatchFileOrchestrator(
138140 }
139141 }
140142
141- // endregion
142-
143- // region Internal
144-
145143 override fun decrementAndGetPendingFilesCount (): Int {
146144 return pendingFiles.decrementAndGet()
147145 }
148146
149- @Suppress(" LiftReturnOrAssignment" , " ReturnCount" )
147+ override fun onFileDeleted (file : File ) {
148+ synchronized(knownFiles) {
149+ knownFiles.remove(file)
150+ }
151+ }
152+
153+ @WorkerThread
154+ override fun refreshFilesFromDisk () {
155+ val files = rootDir.listFilesSafe(internalLogger)?.filter { it.isBatchFile }
156+ if (files != null ) {
157+ synchronized(knownFiles) {
158+ knownFiles.clear()
159+ knownFiles.addAll(files)
160+ }
161+ }
162+ }
163+
164+ // endregion
165+
166+ // region Private
167+
168+ @WorkerThread
169+ @Suppress(" ReturnCount" )
150170 private fun isRootDirValid (): Boolean {
151- if (rootDir.existsSafe(internalLogger)) {
152- if (rootDir.isDirectory) {
153- if (rootDir.canWriteSafe(internalLogger)) {
154- return true
155- } else {
171+ val isValid = if (rootDir.existsSafe(internalLogger)) {
172+ when {
173+ ! rootDir.isDirectory -> {
156174 internalLogger.log(
157175 InternalLogger .Level .ERROR ,
158- listOf (
159- InternalLogger .Target .MAINTAINER ,
160- InternalLogger .Target .TELEMETRY
161- ),
162- { ERROR_ROOT_NOT_WRITABLE .format(Locale .US , rootDir.path) }
176+ listOf (InternalLogger .Target .MAINTAINER , InternalLogger .Target .TELEMETRY ),
177+ { ERROR_ROOT_NOT_DIR .format(Locale .US , rootDir.path) }
163178 )
164- return false
165- }
166- } else {
167- internalLogger.log(
168- InternalLogger .Level .ERROR ,
169- listOf (
170- InternalLogger .Target .MAINTAINER ,
171- InternalLogger .Target .TELEMETRY
172- ),
173- { ERROR_ROOT_NOT_DIR .format(Locale .US , rootDir.path) }
174- )
175- return false
176- }
177- } else {
178- synchronized(rootDir) {
179- // double check if directory was already created by some other thread
180- // entered this branch
181- if (rootDir.existsSafe(internalLogger)) {
182- return true
179+ false
183180 }
184181
185- if (rootDir.mkdirsSafe(internalLogger)) {
186- return true
187- } else {
182+ ! rootDir.canWriteSafe(internalLogger) -> {
188183 internalLogger.log(
189184 InternalLogger .Level .ERROR ,
190- listOf (
191- InternalLogger .Target .MAINTAINER ,
192- InternalLogger .Target .TELEMETRY
193- ),
194- { ERROR_CANT_CREATE_ROOT .format(Locale .US , rootDir.path) }
185+ listOf (InternalLogger .Target .MAINTAINER , InternalLogger .Target .TELEMETRY ),
186+ { ERROR_ROOT_NOT_WRITABLE .format(Locale .US , rootDir.path) }
195187 )
196- return false
188+ false
197189 }
190+
191+ else -> true
198192 }
193+ } else {
194+ createRootDirectory()
195+ }
196+
197+ if (isValid && areKnownFilesInitialized.compareAndSet(false , true )) refreshFilesFromDisk()
198+ return isValid
199+ }
200+
201+ private fun createRootDirectory (): Boolean = synchronized(rootDir) {
202+ val created = rootDir.existsSafe(internalLogger) || rootDir.mkdirsSafe(internalLogger)
203+ if (! created) {
204+ internalLogger.log(
205+ InternalLogger .Level .ERROR ,
206+ listOf (InternalLogger .Target .MAINTAINER , InternalLogger .Target .TELEMETRY ),
207+ { ERROR_CANT_CREATE_ROOT .format(Locale .US , rootDir.path) }
208+ )
199209 }
210+ created
200211 }
201212
202213 private fun createNewFile (): File {
203- val newFileName = timeProvider.getDeviceTimestampMillis().toString()
204- val newFile = File (rootDir, newFileName)
205- val closedFile = previousFile
206- val closedFileLastAccessTimestamp = lastFileAccessTimestamp
207- if (closedFile != null ) {
214+ val state = currentBatchState
215+ if (state.file != null ) {
208216 metricsDispatcher.sendBatchClosedMetric(
209- closedFile ,
217+ state.file ,
210218 BatchClosedMetadata (
211- lastTimeWasUsedInMs = closedFileLastAccessTimestamp ,
212- eventsCount = previousFileItemCount
219+ lastTimeWasUsedInMs = state.lastAccessTimestamp ,
220+ eventsCount = state.itemCount
213221 )
214222 )
215223 }
216- previousFile = newFile
217- previousFileItemCount = 1
218- lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()
224+
219225 pendingFiles.incrementAndGet()
226+
227+ val newFile = File (rootDir, timeProvider.getDeviceTimestampMillis().toString())
228+ currentBatchState = CurrentBatchState (newFile, 1L , timeProvider.getDeviceTimestampMillis())
229+ synchronized(knownFiles) {
230+ knownFiles.add(newFile)
231+ }
220232 return newFile
221233 }
222234
223235 @Suppress(" ReturnCount" )
224236 private fun getReusableWritableFile (): File ? {
225- val files = listBatchFiles()
226- val lastFile = files.latestBatchFile ? : return null
237+ val latestFile = synchronized(knownFiles) { knownFiles.maxOrNull() } ? : return null
227238
228- val lastKnownFile = previousFile
229- val lastKnownFileItemCount = previousFileItemCount
230- if (lastKnownFile != lastFile) {
239+ if (! latestFile.existsSafe(internalLogger)) {
240+ onFileDeleted(latestFile)
241+ return null
242+ }
243+
244+ if (currentBatchState.file != latestFile) {
231245 // this situation can happen because:
232- // 1. `lastFile ` is a file written during a previous session
233- // 2. `lastFile ` was created by another system/process
234- // 3. `lastKnownFile ` was deleted
246+ // 1. `latestFile ` is a file written during a previous session
247+ // 2. `latestFile ` was created by another system/process
248+ // 3. `previousFile ` was deleted
235249 // In any case, we don't know the item count, so to be safe, we create a new file
236250 return null
237251 }
238252
239- val isRecentEnough = isFileRecent(lastFile, recentWriteDelayMs)
240- val hasRoomForMore = lastFile.lengthSafe(internalLogger) < config.maxBatchSize
253+ val isRecentEnough = isFileRecent(latestFile, recentWriteDelayMs)
254+ val hasRoomForMore = latestFile.lengthSafe(internalLogger) < config.maxBatchSize
255+ val lastKnownFileItemCount = currentBatchState.itemCount
241256 val hasSlotForMore = (lastKnownFileItemCount < config.maxItemsPerBatch)
242257
243258 return if (isRecentEnough && hasRoomForMore && hasSlotForMore) {
244- previousFileItemCount = lastKnownFileItemCount + 1
245- lastFileAccessTimestamp = timeProvider.getDeviceTimestampMillis()
246- lastFile
259+ currentBatchState = currentBatchState.copy(
260+ itemCount = lastKnownFileItemCount + 1 ,
261+ lastAccessTimestamp = timeProvider.getDeviceTimestampMillis()
262+ )
263+ latestFile
247264 } else {
248265 null
249266 }
@@ -262,6 +279,7 @@ internal class BatchFileOrchestrator(
262279 val isOldFile = (it.name.toLongOrNull() ? : 0 ) < threshold
263280 if (isOldFile) {
264281 if (it.deleteSafe(internalLogger)) {
282+ onFileDeleted(it)
265283 metricsDispatcher.sendBatchDeletedMetric(
266284 batchFile = it,
267285 removalReason = RemovalReason .Obsolete ,
@@ -306,6 +324,10 @@ internal class BatchFileOrchestrator(
306324 val size = file.lengthSafe(internalLogger)
307325 val wasDeleted = file.deleteSafe(internalLogger)
308326 return if (wasDeleted) {
327+ onFileDeleted(file)
328+ if (currentBatchState.file == file) {
329+ currentBatchState = CurrentBatchState (null , 0L , currentBatchState.lastAccessTimestamp)
330+ }
309331 if (sendMetric) {
310332 metricsDispatcher.sendBatchDeletedMetric(file, RemovalReason .Purged , pendingFiles.decrementAndGet())
311333 }
@@ -316,7 +338,9 @@ internal class BatchFileOrchestrator(
316338 }
317339
318340 private fun listBatchFiles (): List <File > {
319- return rootDir.listFilesSafe(fileFilter, internalLogger).orEmpty().toList()
341+ return synchronized(knownFiles) {
342+ knownFiles.toList()
343+ }
320344 }
321345
322346 private fun listSortedBatchFiles (): List <File > {
@@ -326,7 +350,7 @@ internal class BatchFileOrchestrator(
326350 }
327351
328352 private fun canDoCleanup (): Boolean {
329- return timeProvider.getDeviceTimestampMillis() - lastCleanupTimestamp > config.cleanupFrequencyThreshold
353+ return timeProvider.getDeviceTimestampMillis() - lastCleanupTimestamp.get() > config.cleanupFrequencyThreshold
330354 }
331355
332356 private val File .metadata: File
@@ -335,23 +359,13 @@ internal class BatchFileOrchestrator(
335359 private val File .isBatchFile: Boolean
336360 get() = name.toLongOrNull() != null
337361
338- private val List <File >.latestBatchFile: File ?
339- get() = maxOrNull()
340-
341362 // endregion
342363
343- // region FileFilter
344-
345- internal inner class BatchFileFilter : FileFilter {
346- @Suppress(" ReturnCount" )
347- override fun accept (file : File ? ): Boolean {
348- if (file == null ) return false
349-
350- return file.isBatchFile
351- }
352- }
353-
354- // endregion
364+ private data class CurrentBatchState (
365+ val file : File ? ,
366+ val itemCount : Long ,
367+ val lastAccessTimestamp : Long
368+ )
355369
356370 companion object {
357371
0 commit comments