From c78640a1ec413be295cddf9efd780533ab8eea70 Mon Sep 17 00:00:00 2001 From: Sergei Bakhtiarov Date: Tue, 16 Jun 2026 11:38:08 +0200 Subject: [PATCH 1/3] feat: foreground service for sending pending messages (WPB-26422) --- app/proguard-rules.pro | 1 - app/src/main/AndroidManifest.xml | 6 + app/src/main/baseline-prof.txt | 3 +- .../wire/android/GlobalObserversManager.kt | 38 +++ .../android/di/metro/WireApplicationGraph.kt | 2 + .../DynamicReceiversManager.kt | 44 ++- .../PendingMessagesScheduledReceiver.kt | 67 ++++ .../PendingMessagesForegroundService.kt | 294 ++++++++++++++++++ .../wire/android/services/ServicesManager.kt | 13 + .../util/lifecycle/SyncLifecycleManager.kt | 43 ++- app/src/main/res/values/strings.xml | 5 + app/src/main/startup-prof.txt | 3 +- .../android/GlobalObserversManagerTest.kt | 130 +++++++- .../framework/fake/FakeSyncExecutor.kt | 4 + .../PendingMessagesScheduledReceiverTest.kt | 101 ++++++ ...endingMessagesForegroundSyncHandlerTest.kt | 104 +++++++ ...gMessagesAfterForegroundSyncUseCaseTest.kt | 116 +++++++ .../lifecycle/SyncLifecycleManagerTest.kt | 123 ++++++++ .../notification/NotificationConstants.kt | 4 + kalium | 2 +- 20 files changed, 1072 insertions(+), 31 deletions(-) create mode 100644 app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt create mode 100644 app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt create mode 100644 app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt create mode 100644 app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt create mode 100644 app/src/test/kotlin/com/wire/android/services/SendPendingMessagesAfterForegroundSyncUseCaseTest.kt diff --git a/app/proguard-rules.pro b/app/proguard-rules.pro index d897c790cff..b11ee42fca8 100644 --- a/app/proguard-rules.pro +++ b/app/proguard-rules.pro @@ -71,7 +71,6 @@ # WrapperWorkerFactory resolves inner workers by class name from input data. # Keep names stable so existing enqueued work remains resolvable after minification. # See docs/minification-workmanager-compat.md --keepnames class com.wire.kalium.logic.sync.PendingMessagesSenderWorker -keepnames class com.wire.kalium.logic.sync.periodic.UserConfigSyncWorker -keepnames class com.wire.kalium.logic.sync.periodic.UpdateApiVersionsWorker -keepnames class com.wire.kalium.logic.sync.receiver.asset.AudioNormalizedLoudnessWorker diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index ad9c467008e..d53999f040f 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -39,6 +39,7 @@ + @@ -349,6 +350,11 @@ android:exported="false" android:foregroundServiceType="specialUse" /> + + (Landroid/content/Context;Lcom/wire/kalium/logic/GlobalKaliumScope;)V SPLcom/wire/kalium/logic/sync/GlobalWorkSchedulerImpl;->schedulePeriodicApiVersionUpdate()V -Lcom/wire/kalium/logic/sync/PendingMessagesSenderWorker; Lcom/wire/kalium/logic/sync/WorkSchedulerImplKt; SPLcom/wire/kalium/logic/sync/WorkSchedulerImplKt;->()V SPLcom/wire/kalium/logic/sync/WorkSchedulerImplKt;->buildConnectedPeriodicWorkRequest$default(Lkotlin/reflect/KClass;Lcom/wire/kalium/logic/data/id/QualifiedID;ZILjava/lang/Object;)Landroidx/work/PeriodicWorkRequest; @@ -40417,4 +40416,4 @@ SPLorg/slf4j/helpers/Util;->()V SPLorg/slf4j/helpers/Util;->safeGetBooleanSystemProperty(Ljava/lang/String;)Z SPLorg/slf4j/helpers/Util;->safeGetSystemProperty(Ljava/lang/String;)Ljava/lang/String; Lorg/slf4j/spi/MDCAdapter; -Lorg/slf4j/spi/SLF4JServiceProvider; \ No newline at end of file +Lorg/slf4j/spi/SLF4JServiceProvider; diff --git a/app/src/main/kotlin/com/wire/android/GlobalObserversManager.kt b/app/src/main/kotlin/com/wire/android/GlobalObserversManager.kt index 0dfa1919d78..c095759aed7 100644 --- a/app/src/main/kotlin/com/wire/android/GlobalObserversManager.kt +++ b/app/src/main/kotlin/com/wire/android/GlobalObserversManager.kt @@ -22,6 +22,7 @@ import com.wire.android.datastore.UserDataStoreProvider import com.wire.android.di.KaliumCoreLogic import com.wire.android.notification.NotificationChannelsManager import com.wire.android.notification.WireNotificationManager +import com.wire.android.services.SendPendingMessagesAfterForegroundSyncUseCase import com.wire.android.util.CurrentScreenManager import com.wire.android.util.dispatchers.DispatcherProvider import com.wire.kalium.logic.CoreLogic @@ -30,6 +31,8 @@ import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.auth.LogoutCallback import com.wire.kalium.logic.feature.session.CurrentSessionResult import com.wire.kalium.logic.feature.user.webSocketStatus.ObservePersistentWebSocketConnectionStatusUseCase +import com.wire.kalium.network.NetworkState +import com.wire.kalium.network.NetworkStateObserver import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.awaitClose @@ -61,6 +64,8 @@ class GlobalObserversManager @Inject constructor( private val notificationChannelsManager: NotificationChannelsManager, private val userDataStoreProvider: UserDataStoreProvider, private val currentScreenManager: CurrentScreenManager, + private val sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase, + private val networkStateObserver: NetworkStateObserver, ) { // TODO(tests): refactor so scope/dispatcher can be injected and properly stopped private val scope = CoroutineScope(SupervisorJob() + dispatcherProvider.io()) @@ -78,6 +83,7 @@ class GlobalObserversManager @Inject constructor( } scope.handleLogouts() scope.handleDeleteEphemeralMessageEndDate() + scope.retryPendingMessagesOnAppOpen() } private suspend fun setUpNotifications() { @@ -162,4 +168,36 @@ class GlobalObserversManager @Inject constructor( .collect { userId -> coreLogic.getSessionScope(userId).messages.deleteEphemeralMessageEndDate() } } } + + private fun CoroutineScope.retryPendingMessagesOnAppOpen() { + launch { + currentScreenManager.isAppVisibleFlow() + .filter { isAppVisible -> isAppVisible } + .collectLatest { + val networkState = networkStateObserver.observeNetworkState().value + if (networkState !is NetworkState.ConnectedWithInternet) { + appLogger.i("$TAG: no internet connection, skipping pending messages retry on app open") + return@collectLatest + } + + when (val result = coreLogic.getGlobalScope().session.currentSession()) { + is CurrentSessionResult.Success -> { + val accountInfo = result.accountInfo + if (accountInfo.isValid()) { + sendPendingMessagesAfterForegroundSync(accountInfo.userId) + } else { + appLogger.w("$TAG: current session is invalid, skipping pending messages retry on app open") + } + } + + is CurrentSessionResult.Failure -> + appLogger.w("$TAG: unable to get current valid session, skipping pending messages retry on app open: $result") + } + } + } + } + + private companion object { + private const val TAG = "GlobalObserversManager" + } } diff --git a/app/src/main/kotlin/com/wire/android/di/metro/WireApplicationGraph.kt b/app/src/main/kotlin/com/wire/android/di/metro/WireApplicationGraph.kt index f1817f5d721..34ac651b006 100644 --- a/app/src/main/kotlin/com/wire/android/di/metro/WireApplicationGraph.kt +++ b/app/src/main/kotlin/com/wire/android/di/metro/WireApplicationGraph.kt @@ -54,6 +54,7 @@ import com.wire.android.notification.broadcastreceivers.PlayPauseAudioMessageRec import com.wire.android.notification.broadcastreceivers.StopAudioMessageReceiver import com.wire.android.search.SearchMetroViewModelBindings import com.wire.android.services.CallService +import com.wire.android.services.PendingMessagesForegroundService import com.wire.android.services.PersistentWebSocketService import com.wire.android.services.PlayingAudioMessageService import com.wire.android.ui.AppLockActivity @@ -125,6 +126,7 @@ interface WireApplicationGraph : ViewModelGraph { fun inject(activity: CallActivity) fun inject(activity: OngoingCallActivity) fun inject(service: PersistentWebSocketService) + fun inject(service: PendingMessagesForegroundService) fun inject(service: CallService) fun inject(service: PlayingAudioMessageService) fun inject(receiver: StartServiceReceiver) diff --git a/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/DynamicReceiversManager.kt b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/DynamicReceiversManager.kt index 7c162f518ae..63801d1c638 100644 --- a/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/DynamicReceiversManager.kt +++ b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/DynamicReceiversManager.kt @@ -20,10 +20,12 @@ package com.wire.android.notification.broadcastreceivers import android.content.Context import android.content.Intent import android.content.IntentFilter +import androidx.core.content.ContextCompat import com.wire.android.BuildConfig.EMM_SUPPORT_ENABLED import com.wire.android.appLogger import com.wire.android.emm.ManagedConfigurationsReceiver import com.wire.android.di.ApplicationContext +import com.wire.kalium.logic.sync.PendingMessagesForegroundSync import dev.zacsweers.metro.Inject import dev.zacsweers.metro.AppScope import dev.zacsweers.metro.SingleIn @@ -35,35 +37,51 @@ import dev.zacsweers.metro.SingleIn @SingleIn(AppScope::class) class DynamicReceiversManager @Inject constructor( @ApplicationContext val context: Context, - private val managedConfigurationsReceiver: ManagedConfigurationsReceiver + private val managedConfigurationsReceiver: ManagedConfigurationsReceiver, + private val pendingMessagesScheduledReceiver: PendingMessagesScheduledReceiver, ) { @Volatile private var isRegistered = false fun registerAll() { - if (EMM_SUPPORT_ENABLED) { - synchronized(this) { - if (!isRegistered) { + synchronized(this) { + if (!isRegistered) { + if (EMM_SUPPORT_ENABLED) { appLogger.i("$TAG Registering Runtime ManagedConfigurations Broadcast receiver") context.registerReceiver(managedConfigurationsReceiver, IntentFilter(Intent.ACTION_APPLICATION_RESTRICTIONS_CHANGED)) - isRegistered = true - } else { - appLogger.w("$TAG Receiver already registered, skipping") } + + appLogger.i("$TAG Registering PendingMessagesScheduledReceiver") + val pendingMessagesIntentFilter = IntentFilter(PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED) + .apply { addAction(PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED) } + ContextCompat.registerReceiver( + context, + pendingMessagesScheduledReceiver, + pendingMessagesIntentFilter, + ContextCompat.RECEIVER_NOT_EXPORTED + ) + + isRegistered = true + } else { + appLogger.w("$TAG Receiver already registered, skipping") } } } fun unregisterAll() { - if (EMM_SUPPORT_ENABLED) { - synchronized(this) { - if (isRegistered) { + synchronized(this) { + if (isRegistered) { + if (EMM_SUPPORT_ENABLED) { appLogger.i("$TAG Unregistering Runtime ManagedConfigurations Broadcast receiver") context.unregisterReceiver(managedConfigurationsReceiver) - isRegistered = false - } else { - appLogger.w("$TAG Receiver not registered, skipping unregister") } + + appLogger.i("$TAG Unregistering PendingMessagesScheduledReceiver") + context.unregisterReceiver(pendingMessagesScheduledReceiver) + + isRegistered = false + } else { + appLogger.w("$TAG Receiver not registered, skipping unregister") } } } diff --git a/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt new file mode 100644 index 00000000000..2dc78f4b272 --- /dev/null +++ b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt @@ -0,0 +1,67 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ + +package com.wire.android.notification.broadcastreceivers + +import android.content.BroadcastReceiver +import android.content.Context +import android.content.Intent +import com.wire.android.appLogger +import com.wire.android.services.ServicesManager +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.sync.PendingMessagesForegroundSync +import dev.zacsweers.metro.AppScope +import dev.zacsweers.metro.Inject +import dev.zacsweers.metro.SingleIn + +@SingleIn(AppScope::class) +class PendingMessagesScheduledReceiver @Inject constructor( + private val servicesManager: ServicesManager, +) : BroadcastReceiver() { + + override fun onReceive(context: Context, intent: Intent) { + when (intent.action) { + PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED -> { + val userId = intent.userId() + if (userId == null) { + appLogger.w("$TAG: missing user id, skipping pending messages foreground service") + return + } + servicesManager.startPendingMessagesForegroundService(userId) + } + + PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED -> + servicesManager.stopPendingMessagesForegroundService() + + else -> { + appLogger.w("$TAG: unexpected action ${intent.action}") + return + } + } + } + + companion object { + private const val TAG = "PendingMessagesScheduledReceiver" + } +} + +private fun Intent.userId(): UserId? { + val value = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) + val domain = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) + return if (value != null && domain != null) UserId(value, domain) else null +} diff --git a/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt new file mode 100644 index 00000000000..6805515cafd --- /dev/null +++ b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt @@ -0,0 +1,294 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ + +package com.wire.android.services + +import android.app.ForegroundServiceStartNotAllowedException +import android.app.Notification +import android.app.Service +import android.content.Context +import android.content.Intent +import android.content.pm.ServiceInfo +import android.net.ConnectivityManager +import android.net.Network +import android.net.NetworkCapabilities +import android.net.NetworkRequest +import android.os.Build +import android.os.IBinder +import androidx.core.app.NotificationCompat +import androidx.core.app.ServiceCompat +import com.wire.android.R +import com.wire.android.appLogger +import com.wire.android.di.KaliumCoreLogic +import com.wire.android.di.metro.wireApplicationGraph +import com.wire.android.notification.NotificationChannelsManager +import com.wire.android.notification.NotificationConstants.PENDING_MESSAGES_SYNC_CHANNEL_ID +import com.wire.android.notification.NotificationConstants.PENDING_MESSAGES_SYNC_CHANNEL_NAME +import com.wire.android.notification.NotificationIds +import com.wire.android.notification.openAppPendingIntent +import com.wire.android.util.dispatchers.DispatcherProvider +import com.wire.android.util.lifecycle.SyncLifecycleManager +import com.wire.kalium.logic.CoreLogic +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.feature.session.CurrentSessionResult +import com.wire.kalium.logic.sync.PendingMessagesForegroundSync +import dev.zacsweers.metro.Inject +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeoutOrNull +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.seconds + +class PendingMessagesForegroundService : Service() { + + @Inject + @KaliumCoreLogic + lateinit var coreLogic: CoreLogic + + @Inject + lateinit var dispatcherProvider: DispatcherProvider + + @Inject + lateinit var notificationChannelsManager: NotificationChannelsManager + + @Inject + private lateinit var sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase + + private val scope by lazy { + CoroutineScope(SupervisorJob() + dispatcherProvider.io()) + } + + override fun onBind(intent: Intent?): IBinder? = null + + override fun onCreate() { + wireApplicationGraph.inject(this) + super.onCreate() + isServiceStarted = true + startAsForeground(createNotification(waitingForConnection = true)) + } + + override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { + if (!isServiceStarted) { + isServiceStarted = true + startAsForeground(createNotification(waitingForConnection = true)) + } + + val userId = intent?.userId() + if (userId == null) { + appLogger.w("$TAG: missing user id, skipping pending messages send") + stopSelf(startId) + return START_NOT_STICKY + } + + scope.launch { + run(userId) + stopSelf(startId) + } + + return START_NOT_STICKY + } + + private suspend fun run(userId: UserId) { + val connected = withTimeoutOrNull(MAX_WAIT_FOR_NETWORK_MINUTES.minutes) { + networkAvailability().first { it } + } ?: false + + if (!connected) { + appLogger.i("$TAG: network did not become available before timeout") + return + } + + startAsForeground(createNotification(waitingForConnection = false)) + + PendingMessagesForegroundSyncHandler(coreLogic, sendPendingMessagesAfterForegroundSync).sendPendingMessagesForCurrentSession(userId) + } + + private fun networkAvailability(): Flow = callbackFlow { + val connectivityManager = getSystemService(CONNECTIVITY_SERVICE) as ConnectivityManager + fun emitCurrentAvailability() { + trySend(connectivityManager.hasValidatedInternet()) + } + + val callback = object : ConnectivityManager.NetworkCallback() { + override fun onAvailable(network: Network) = emitCurrentAvailability() + override fun onLost(network: Network) = emitCurrentAvailability() + override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) = emitCurrentAvailability() + } + + emitCurrentAvailability() + connectivityManager.registerNetworkCallback( + NetworkRequest.Builder() + .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) + .build(), + callback + ) + awaitClose { connectivityManager.unregisterNetworkCallback(callback) } + } + + private fun ConnectivityManager.hasValidatedInternet(): Boolean = + activeNetwork + ?.let(::getNetworkCapabilities) + ?.let { + it.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) && + it.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED) + } == true + + private fun startAsForeground(notification: Notification) { + notificationChannelsManager.createRegularChannel(PENDING_MESSAGES_SYNC_CHANNEL_ID, PENDING_MESSAGES_SYNC_CHANNEL_NAME) + + if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) { + try { + ServiceCompat.startForeground( + this, + NotificationIds.PENDING_MESSAGES_SYNC_NOTIFICATION_ID.ordinal, + notification, + ServiceInfo.FOREGROUND_SERVICE_TYPE_DATA_SYNC + ) + } catch (e: ForegroundServiceStartNotAllowedException) { + appLogger.e("$TAG: failure while starting foreground: $e") + stopSelf() + } + } else { + ServiceCompat.startForeground( + this, + NotificationIds.PENDING_MESSAGES_SYNC_NOTIFICATION_ID.ordinal, + notification, + ServiceInfo.FOREGROUND_SERVICE_TYPE_DATA_SYNC + ) + } + } + + private fun createNotification(waitingForConnection: Boolean): Notification = + NotificationCompat.Builder(this, PENDING_MESSAGES_SYNC_CHANNEL_ID) + .setContentText( + if (waitingForConnection) { + resources.getString(R.string.pending_messages_notification_waiting) + } else { + resources.getString(R.string.pending_messages_notification_sending) + } + ) + .setSmallIcon(R.drawable.websocket_notification_icon_small) + .setContentIntent(openAppPendingIntent(this)) + .setCategory(NotificationCompat.CATEGORY_SERVICE) + .setForegroundServiceBehavior(NotificationCompat.FOREGROUND_SERVICE_IMMEDIATE) + .setAutoCancel(false) + .setOngoing(true) + .build() + + override fun onTimeout(startId: Int, fgsType: Int) { + appLogger.w("$TAG: foreground service timeout reached") + stopSelf(startId) + } + + override fun onDestroy() { + super.onDestroy() + scope.cancel("PendingMessagesForegroundService was destroyed") + isServiceStarted = false + } + + companion object { + private const val TAG = "PendingMessagesForegroundService" + private const val MAX_WAIT_FOR_NETWORK_MINUTES = 30 + + fun newIntent(context: Context, userId: UserId): Intent = + Intent(context, PendingMessagesForegroundService::class.java) + .putExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE, userId.value) + .putExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN, userId.domain) + + fun stopIntent(context: Context): Intent = + Intent(context, PendingMessagesForegroundService::class.java) + + var isServiceStarted = false + } +} + +internal class PendingMessagesForegroundSyncHandler( + private val currentSession: suspend () -> CurrentSessionResult, + private val sendPendingMessagesAfterForegroundSync: suspend (UserId) -> Unit, +) { + constructor( + coreLogic: CoreLogic, + sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase, + ) : this( + currentSession = { coreLogic.getGlobalScope().session.currentSession() }, + sendPendingMessagesAfterForegroundSync = sendPendingMessagesAfterForegroundSync::invoke + ) + + suspend fun sendPendingMessagesForCurrentSession(scheduledUserId: UserId) { + when (val result = currentSession()) { + is CurrentSessionResult.Success -> { + val accountInfo = result.accountInfo + if (accountInfo.isValid() && accountInfo.userId == scheduledUserId) { + sendPendingMessagesAfterForegroundSync(accountInfo.userId) + } else if (accountInfo.isValid()) { + appLogger.w( + "$TAG: scheduled user ${scheduledUserId.toLogString()} does not match current session " + + "${accountInfo.userId.toLogString()}, skipping pending messages send" + ) + } else { + appLogger.w("$TAG: current session is invalid, skipping pending messages send") + } + } + + is CurrentSessionResult.Failure -> + appLogger.w("$TAG: unable to get current valid session: $result") + } + appLogger.w("$TAG: foreground service finished messages sending") + } + + private companion object { + private const val TAG = "PendingMessagesForegroundService" + } +} + +private fun Intent.userId(): UserId? { + val value = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) + val domain = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) + return if (value != null && domain != null) UserId(value, domain) else null +} + +class SendPendingMessagesAfterForegroundSyncUseCase @Inject constructor( + @KaliumCoreLogic private val coreLogic: CoreLogic, + private val syncLifecycleManager: SyncLifecycleManager, +) { + suspend operator fun invoke(userId: UserId) { + syncLifecycleManager.syncTemporarily( + userId = userId, + stayAliveExtraDuration = 3.seconds, + waitForNextSyncState = true, + ) { + val userSessionScope = coreLogic.getSessionScope(userId) + appLogger.i( + "$TAG: sending pending messages for ${userId.toLogString()}, " + + "syncState=${userSessionScope.syncManager.syncState.value}, " + + "userSessionScope=$userSessionScope" + ) + userSessionScope.sendPendingMessages() + } + } + + private companion object { + private const val TAG = "SendPendingMessagesAfterForegroundSyncUseCase" + } +} diff --git a/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt b/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt index a99d6b354d5..3fe1bdb3c2e 100644 --- a/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt +++ b/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt @@ -138,6 +138,19 @@ class ServicesManager @Inject constructor( fun isPersistentWebSocketServiceRunning(): Boolean = PersistentWebSocketService.isServiceStarted + // Pending messages foreground sync + fun startPendingMessagesForegroundService(userId: UserId) { + if (PendingMessagesForegroundService.isServiceStarted) { + appLogger.i("$TAG: PendingMessagesForegroundService already started, not starting again") + } else { + startService(PendingMessagesForegroundService.newIntent(context, userId)) + } + } + + fun stopPendingMessagesForegroundService() { + stopService(PendingMessagesForegroundService.stopIntent(context)) + } + // Playing AudioMessage service fun startPlayingAudioMessageService() { if (PlayingAudioMessageService.isServiceStarted) { diff --git a/app/src/main/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManager.kt b/app/src/main/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManager.kt index 9009877f498..5e8820192ed 100644 --- a/app/src/main/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManager.kt +++ b/app/src/main/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManager.kt @@ -22,12 +22,14 @@ import com.wire.android.appLogger import com.wire.android.di.KaliumCoreLogic import com.wire.android.util.CurrentScreenManager import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC -import com.wire.kalium.logger.obfuscateDomain import com.wire.kalium.logger.obfuscateId import com.wire.kalium.logic.CoreLogic import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.session.GetAllSessionsResult import com.wire.kalium.logic.sync.SyncRequestResult +import dev.zacsweers.metro.AppScope +import dev.zacsweers.metro.Inject +import dev.zacsweers.metro.SingleIn import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay @@ -36,9 +38,6 @@ import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import dev.zacsweers.metro.Inject -import dev.zacsweers.metro.AppScope -import dev.zacsweers.metro.SingleIn import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -94,21 +93,43 @@ class SyncLifecycleManager @Inject constructor( * releasing sync. * If there are more ongoing sync requests, this will */ - suspend fun syncTemporarily(userId: UserId, stayAliveExtraDuration: Duration = 0.seconds) { - logger.d( - "Handling connection policy for push notification of " + - "user=${userId.value.obfuscateId()}@${userId.domain.obfuscateDomain()}" - ) + suspend fun syncTemporarily( + userId: UserId, + stayAliveExtraDuration: Duration = 0.seconds, + waitForNextSyncState: Boolean = false + ) { + syncTemporarily(userId, stayAliveExtraDuration, waitForNextSyncState) { + Unit + } + } + + /** + * Attempts to perform sync, runs [actionWhenLive] after reaching live sync, then holds + * the sync request for [stayAliveExtraDuration] before releasing it. + */ + suspend fun syncTemporarily( + userId: UserId, + stayAliveExtraDuration: Duration = 0.seconds, + waitForNextSyncState: Boolean = false, + actionWhenLive: suspend () -> Unit + ) { coreLogic.getSessionScope(userId).run { logger.d("Starting Sync request") syncExecutor.request { logger.d("Waiting until live") - when (waitUntilLiveOrFailure()) { + val syncRequestResult = if (waitForNextSyncState) { + waitUntilNextLiveOrFailure() + } else { + waitUntilLiveOrFailure() + } + when (syncRequestResult) { is SyncRequestResult.Failure -> logger.w("Failed waiting until live") - is SyncRequestResult.Success -> + is SyncRequestResult.Success -> { + actionWhenLive() delay(stayAliveExtraDuration) + } } } } diff --git a/app/src/main/res/values/strings.xml b/app/src/main/res/values/strings.xml index b097bb865b9..710dfed6f35 100644 --- a/app/src/main/res/values/strings.xml +++ b/app/src/main/res/values/strings.xml @@ -1927,4 +1927,9 @@ In group conversations, the group admin can overwrite this setting. Added apps have access to the content of this conversation. You were promoted to group admin More information about this backend + + + Waiting for connection + Sending messages + diff --git a/app/src/main/startup-prof.txt b/app/src/main/startup-prof.txt index 94d9aaf352d..7958e9fb8ac 100644 --- a/app/src/main/startup-prof.txt +++ b/app/src/main/startup-prof.txt @@ -30668,7 +30668,6 @@ Lcom/wire/kalium/logic/sync/GlobalWorkScheduler; Lcom/wire/kalium/logic/sync/GlobalWorkSchedulerImpl; SPLcom/wire/kalium/logic/sync/GlobalWorkSchedulerImpl;->(Landroid/content/Context;Lcom/wire/kalium/logic/GlobalKaliumScope;)V SPLcom/wire/kalium/logic/sync/GlobalWorkSchedulerImpl;->schedulePeriodicApiVersionUpdate()V -Lcom/wire/kalium/logic/sync/PendingMessagesSenderWorker; Lcom/wire/kalium/logic/sync/WorkSchedulerImplKt; SPLcom/wire/kalium/logic/sync/WorkSchedulerImplKt;->()V SPLcom/wire/kalium/logic/sync/WorkSchedulerImplKt;->buildConnectedPeriodicWorkRequest$default(Lkotlin/reflect/KClass;Lcom/wire/kalium/logic/data/id/QualifiedID;ZILjava/lang/Object;)Landroidx/work/PeriodicWorkRequest; @@ -40417,4 +40416,4 @@ SPLorg/slf4j/helpers/Util;->()V SPLorg/slf4j/helpers/Util;->safeGetBooleanSystemProperty(Ljava/lang/String;)Z SPLorg/slf4j/helpers/Util;->safeGetSystemProperty(Ljava/lang/String;)Ljava/lang/String; Lorg/slf4j/spi/MDCAdapter; -Lorg/slf4j/spi/SLF4JServiceProvider; \ No newline at end of file +Lorg/slf4j/spi/SLF4JServiceProvider; diff --git a/app/src/test/kotlin/com/wire/android/GlobalObserversManagerTest.kt b/app/src/test/kotlin/com/wire/android/GlobalObserversManagerTest.kt index daea238ada6..26cfc5ec86c 100644 --- a/app/src/test/kotlin/com/wire/android/GlobalObserversManagerTest.kt +++ b/app/src/test/kotlin/com/wire/android/GlobalObserversManagerTest.kt @@ -25,6 +25,7 @@ import com.wire.android.datastore.UserDataStoreProvider import com.wire.android.framework.TestUser import com.wire.android.notification.NotificationChannelsManager import com.wire.android.notification.WireNotificationManager +import com.wire.android.services.SendPendingMessagesAfterForegroundSyncUseCase import com.wire.android.util.CurrentScreenManager import com.wire.kalium.common.error.CoreFailure import com.wire.kalium.logic.CoreLogic @@ -40,6 +41,8 @@ import com.wire.kalium.logic.feature.call.usecase.EndCallOnConversationChangeUse import com.wire.kalium.logic.feature.message.MessageScope import com.wire.kalium.logic.feature.session.CurrentSessionResult import com.wire.kalium.logic.feature.user.webSocketStatus.ObservePersistentWebSocketConnectionStatusUseCase +import com.wire.kalium.network.NetworkState +import com.wire.kalium.network.NetworkStateObserver import io.mockk.MockKAnnotations import io.mockk.coEvery import io.mockk.coVerify @@ -166,6 +169,108 @@ class GlobalObserversManagerTest { coVerify(exactly = 0) { arrangement.messageScope.deleteEphemeralMessageEndDate() } } + @Test + fun `given app visible and valid session, when app opens, then retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Valid(TestUser.SELF_USER.id))) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 1) { arrangement.sendPendingMessagesAfterForegroundSync(TestUser.SELF_USER.id) } + } + + @Test + fun `given app visible, valid session, and not connected, when app opens, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Valid(TestUser.SELF_USER.id))) + .withNetworkState(NetworkState.NotConnected) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given app visible, valid session, and connected without internet, when app opens, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Valid(TestUser.SELF_USER.id))) + .withNetworkState(NetworkState.ConnectedWithoutInternet) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given app not visible and valid session, when observing app open, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Valid(TestUser.SELF_USER.id))) + .withAppVisibleFlow(false) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given app visible and invalid session, when app opens, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Invalid(TestUser.SELF_USER.id, LogoutReason.DELETED_ACCOUNT))) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given app visible and no session, when app opens, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Failure.SessionNotFound) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given app visible and session failure, when app opens, then do not retry sending pending messages`() { + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Failure.Generic(CoreFailure.Unknown(RuntimeException("error")))) + .withAppVisibleFlow(true) + .arrange() + + manager.observe() + + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + } + + @Test + fun `given valid session when app becomes visible then retry sending pending messages`() { + val appVisibleFlow = MutableStateFlow(false) + val (arrangement, manager) = Arrangement() + .withCurrentSession(CurrentSessionResult.Success(AccountInfo.Valid(TestUser.SELF_USER.id))) + .withAppVisibleFlow(appVisibleFlow) + .arrange() + + manager.observe() + coVerify(exactly = 0) { arrangement.sendPendingMessagesAfterForegroundSync(any()) } + + appVisibleFlow.value = true + + coVerify(exactly = 1) { arrangement.sendPendingMessagesAfterForegroundSync(TestUser.SELF_USER.id) } + } + @Test fun `given app visible and session failure, when handling ephemeral messages, then do not call deleteEphemeralMessageEndDate`() { val (arrangement, manager) = Arrangement() @@ -253,6 +358,12 @@ class GlobalObserversManagerTest { @MockK lateinit var messageScope: MessageScope + @MockK + lateinit var sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase + + @MockK + lateinit var networkStateObserver: NetworkStateObserver + private val manager by lazy { GlobalObserversManager( dispatcherProvider = TestDispatcherProvider(), @@ -261,6 +372,8 @@ class GlobalObserversManagerTest { notificationManager = notificationManager, userDataStoreProvider = userDataStoreProvider, currentScreenManager = currentScreenManager, + sendPendingMessagesAfterForegroundSync = sendPendingMessagesAfterForegroundSync, + networkStateObserver = networkStateObserver, ) } @@ -278,10 +391,13 @@ class GlobalObserversManagerTest { every { userSessionScope.calls } returns callsScope every { userSessionScope.messages } returns messageScope coEvery { messageScope.deleteEphemeralMessageEndDate() } returns Unit + coEvery { sendPendingMessagesAfterForegroundSync(any()) } returns Unit withPersistentWebSocketConnectionStatuses(emptyList()) withValidAccounts(emptyList()) withCurrentSessionFlow(CurrentSessionResult.Failure.SessionNotFound) + withCurrentSession(CurrentSessionResult.Failure.SessionNotFound) withAppVisibleFlow(true) + withNetworkState(NetworkState.ConnectedWithInternet) } fun withValidAccounts(list: List>): Arrangement = apply { @@ -301,8 +417,20 @@ class GlobalObserversManagerTest { coEvery { coreLogic.getGlobalScope().session.currentSessionFlow() } returns flowOf(result) } + fun withCurrentSession(result: CurrentSessionResult): Arrangement = apply { + coEvery { coreLogic.getGlobalScope().session.currentSession() } returns result + } + + fun withNetworkState(state: NetworkState): Arrangement = apply { + every { networkStateObserver.observeNetworkState() } returns MutableStateFlow(state) + } + fun withAppVisibleFlow(isVisible: Boolean) = apply { - coEvery { currentScreenManager.isAppVisibleFlow() } returns MutableStateFlow(isVisible) + withAppVisibleFlow(MutableStateFlow(isVisible)) + } + + fun withAppVisibleFlow(flow: MutableStateFlow) = apply { + coEvery { currentScreenManager.isAppVisibleFlow() } returns flow } fun arrange() = this to manager diff --git a/app/src/test/kotlin/com/wire/android/framework/fake/FakeSyncExecutor.kt b/app/src/test/kotlin/com/wire/android/framework/fake/FakeSyncExecutor.kt index ff956d009d3..a7f3a55efb6 100644 --- a/app/src/test/kotlin/com/wire/android/framework/fake/FakeSyncExecutor.kt +++ b/app/src/test/kotlin/com/wire/android/framework/fake/FakeSyncExecutor.kt @@ -26,8 +26,10 @@ import com.wire.kalium.util.DelicateKaliumApi open class FakeSyncExecutor : SyncExecutor() { var waitUntilLiveCount = 0 + var waitUntilNextLiveCount = 0 var requestCount = 0 open fun onWaitUntilLiveOrFailure(): SyncRequestResult = SyncRequestResult.Success.also { waitUntilLiveCount++ } + open fun onWaitUntilNextLiveOrFailure(): SyncRequestResult = SyncRequestResult.Success.also { waitUntilNextLiveCount++ } open fun onWaitUntilOrFailure(syncState: SyncState): SyncRequestResult = SyncRequestResult.Success open fun onKeepSyncAlwaysOn() {} @@ -49,6 +51,8 @@ open class FakeSyncExecutor : SyncExecutor() { override suspend fun waitUntilLiveOrFailure(): SyncRequestResult = onWaitUntilLiveOrFailure() + override suspend fun waitUntilNextLiveOrFailure(): SyncRequestResult = onWaitUntilNextLiveOrFailure() + @DelicateKaliumApi(message = "By calling this, Sync will run indefinitely.") override fun keepSyncAlwaysOn() { onKeepSyncAlwaysOn() diff --git a/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt b/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt new file mode 100644 index 00000000000..1981971b78b --- /dev/null +++ b/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt @@ -0,0 +1,101 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.android.notification.broadcastreceivers + +import android.content.Context +import android.content.Intent +import com.wire.android.services.ServicesManager +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.sync.PendingMessagesForegroundSync +import io.mockk.every +import io.mockk.MockKAnnotations +import io.mockk.impl.annotations.MockK +import io.mockk.mockk +import io.mockk.verify +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test + +class PendingMessagesScheduledReceiverTest { + + @Test + fun `given scheduled pending messages broadcast with user id when received then starts foreground service for user`() = runTest { + val (arrangement, receiver) = Arrangement().arrange() + + receiver.onReceive(arrangement.context, scheduledIntent(USER_ID)) + + verify(exactly = 1) { arrangement.servicesManager.startPendingMessagesForegroundService(USER_ID) } + } + + @Test + fun `given scheduled pending messages broadcast without user id when received then does not start foreground service`() = runTest { + val (arrangement, receiver) = Arrangement().arrange() + + receiver.onReceive(arrangement.context, scheduledIntent()) + + verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + } + + @Test + fun `given unexpected broadcast when received then does not start foreground service`() = runTest { + val (arrangement, receiver) = Arrangement().arrange() + + receiver.onReceive(arrangement.context, mockk { every { action } returns "unexpected-action" }) + + verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + } + + @Test + fun `given cancelled pending messages broadcast when received then stops foreground service`() = runTest { + val (arrangement, receiver) = Arrangement().arrange() + + receiver.onReceive( + context = arrangement.context, + intent = mockk { every { action } returns PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED } + ) + + verify(exactly = 1) { arrangement.servicesManager.stopPendingMessagesForegroundService() } + verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + } + + private class Arrangement { + + @MockK + lateinit var context: Context + + @MockK(relaxUnitFun = true) + lateinit var servicesManager: ServicesManager + + init { + MockKAnnotations.init(this) + } + + fun arrange(): Pair = + this to PendingMessagesScheduledReceiver(servicesManager) + } + + private companion object { + private val USER_ID = UserId("user", "wire.com") + + fun scheduledIntent(userId: UserId? = null): Intent = + mockk { + every { action } returns PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED + every { getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) } returns userId?.value + every { getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) } returns userId?.domain + } + } +} diff --git a/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt b/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt new file mode 100644 index 00000000000..f305900305c --- /dev/null +++ b/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt @@ -0,0 +1,104 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.android.services + +import com.wire.kalium.common.error.CoreFailure +import com.wire.kalium.logic.data.auth.AccountInfo +import com.wire.kalium.logic.data.logout.LogoutReason +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.feature.session.CurrentSessionResult +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Test + +class PendingMessagesForegroundSyncHandlerTest { + + @Test + fun `given current session is valid when sending pending messages then sends only for current user`() = runTest { + val currentUserId = UserId("currentUser", "wire.com") + val inactiveUserId = UserId("inactiveUser", "wire.com") + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + currentSession = { CurrentSessionResult.Success(AccountInfo.Valid(currentUserId)) }, + sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } + ) + + handler.sendPendingMessagesForCurrentSession(currentUserId) + + assertEquals(listOf(currentUserId), sentPendingMessagesFor) + assertFalse(sentPendingMessagesFor.contains(inactiveUserId)) + } + + @Test + fun `given current session is for different user when sending pending messages then does not send`() = runTest { + val currentUserId = UserId("currentUser", "wire.com") + val scheduledUserId = UserId("scheduledUser", "wire.com") + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + currentSession = { CurrentSessionResult.Success(AccountInfo.Valid(currentUserId)) }, + sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } + ) + + handler.sendPendingMessagesForCurrentSession(scheduledUserId) + + assertEquals(emptyList(), sentPendingMessagesFor) + } + + @Test + fun `given current session is invalid when sending pending messages then does not send`() = runTest { + val currentUserId = UserId("currentUser", "wire.com") + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + currentSession = { + CurrentSessionResult.Success(AccountInfo.Invalid(currentUserId, LogoutReason.SELF_SOFT_LOGOUT)) + }, + sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } + ) + + handler.sendPendingMessagesForCurrentSession(currentUserId) + + assertEquals(emptyList(), sentPendingMessagesFor) + } + + @Test + fun `given current session is missing when sending pending messages then does not send`() = runTest { + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + currentSession = { CurrentSessionResult.Failure.SessionNotFound }, + sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } + ) + + handler.sendPendingMessagesForCurrentSession(UserId("currentUser", "wire.com")) + + assertEquals(emptyList(), sentPendingMessagesFor) + } + + @Test + fun `given current session lookup fails when sending pending messages then does not send`() = runTest { + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + currentSession = { CurrentSessionResult.Failure.Generic(CoreFailure.Unknown(null)) }, + sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } + ) + + handler.sendPendingMessagesForCurrentSession(UserId("currentUser", "wire.com")) + + assertEquals(emptyList(), sentPendingMessagesFor) + } +} diff --git a/app/src/test/kotlin/com/wire/android/services/SendPendingMessagesAfterForegroundSyncUseCaseTest.kt b/app/src/test/kotlin/com/wire/android/services/SendPendingMessagesAfterForegroundSyncUseCaseTest.kt new file mode 100644 index 00000000000..915dd7ac62f --- /dev/null +++ b/app/src/test/kotlin/com/wire/android/services/SendPendingMessagesAfterForegroundSyncUseCaseTest.kt @@ -0,0 +1,116 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.android.services + +import com.wire.android.framework.fake.FakeSyncExecutor +import com.wire.android.util.CurrentScreenManager +import com.wire.android.util.lifecycle.SyncLifecycleManager +import com.wire.kalium.common.error.CoreFailure +import com.wire.kalium.logic.CoreLogic +import com.wire.kalium.logic.data.sync.SyncState +import com.wire.kalium.logic.data.user.UserId +import com.wire.kalium.logic.feature.UserSessionScope +import com.wire.kalium.logic.sync.SendPendingMessagesUseCase +import com.wire.kalium.logic.sync.SyncRequestResult +import com.wire.kalium.logic.sync.SyncStateObserver +import io.mockk.MockKAnnotations +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.impl.annotations.MockK +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test + +@Suppress("DEPRECATION") +class SendPendingMessagesAfterForegroundSyncUseCaseTest { + + @Test + fun `given next sync reaches live when sending pending messages then sends inside a single temporary sync request`() = runTest { + val (arrangement, useCase) = Arrangement() + .arrange() + + useCase(USER_ID) + + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(0, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(1, arrangement.syncExecutor.waitUntilNextLiveCount) + coVerify(exactly = 1) { arrangement.sendPendingMessages() } + } + + @Test + fun `given next sync fails when sending pending messages then does not send`() = runTest { + val (arrangement, useCase) = Arrangement() + .withNextSyncRequestResult(SyncRequestResult.Failure(CoreFailure.Unknown(RuntimeException("sync failed")))) + .arrange() + + useCase(USER_ID) + + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(0, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(1, arrangement.syncExecutor.waitUntilNextLiveCount) + coVerify(exactly = 0) { arrangement.sendPendingMessages() } + } + + private class Arrangement { + + @MockK + lateinit var coreLogic: CoreLogic + + @MockK + lateinit var currentScreenManager: CurrentScreenManager + + @MockK + lateinit var userSessionScope: UserSessionScope + + @MockK + lateinit var syncStateObserver: SyncStateObserver + + @MockK + lateinit var sendPendingMessages: SendPendingMessagesUseCase + + var syncExecutor = FakeSyncExecutor() + + init { + MockKAnnotations.init(this, relaxUnitFun = true) + every { coreLogic.getSessionScope(USER_ID) } returns userSessionScope + every { userSessionScope.syncManager } returns syncStateObserver + every { syncStateObserver.syncState } returns MutableStateFlow(SyncState.Live) + every { userSessionScope.sendPendingMessages } returns sendPendingMessages + coEvery { sendPendingMessages() } returns SendPendingMessagesUseCase.Result.Success + } + + fun withNextSyncRequestResult(syncRequestResult: SyncRequestResult) = apply { + syncExecutor = object : FakeSyncExecutor() { + override fun onWaitUntilNextLiveOrFailure(): SyncRequestResult = + syncRequestResult.also { waitUntilNextLiveCount++ } + } + } + + fun arrange(): Pair { + every { userSessionScope.syncExecutor } returns syncExecutor + val syncLifecycleManager = SyncLifecycleManager(currentScreenManager, coreLogic) + return this to SendPendingMessagesAfterForegroundSyncUseCase(coreLogic, syncLifecycleManager) + } + } + + private companion object { + private val USER_ID = UserId("currentUser", "wire.com") + } +} diff --git a/app/src/test/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManagerTest.kt b/app/src/test/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManagerTest.kt index c0e8385902b..372fc15e2a9 100644 --- a/app/src/test/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManagerTest.kt +++ b/app/src/test/kotlin/com/wire/android/util/lifecycle/SyncLifecycleManagerTest.kt @@ -21,11 +21,13 @@ package com.wire.android.util.lifecycle import com.wire.android.framework.TestUser import com.wire.android.framework.fake.FakeSyncExecutor import com.wire.android.util.CurrentScreenManager +import com.wire.kalium.common.error.CoreFailure import com.wire.kalium.logic.CoreLogic import com.wire.kalium.logic.data.auth.AccountInfo import com.wire.kalium.logic.feature.UserSessionScope import com.wire.kalium.logic.feature.session.GetAllSessionsResult import com.wire.kalium.logic.feature.session.ObserveSessionsUseCase +import com.wire.kalium.logic.sync.SyncRequestResult import io.mockk.MockKAnnotations import io.mockk.coEvery import io.mockk.every @@ -33,10 +35,15 @@ import io.mockk.impl.annotations.MockK import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test +import kotlin.time.Duration.Companion.milliseconds class SyncLifecycleManagerTest { @@ -98,6 +105,104 @@ class SyncLifecycleManagerTest { assertEquals(1, arrangement.syncExecutor.waitUntilLiveCount) } + @Test + fun givenSyncReachesLive_whenRequestingTemporarySyncWithAction_thenShouldRunActionBeforeStayAliveDelayCompletes() = runTest { + val events = mutableListOf() + val (arrangement, syncLifecycleManager) = Arrangement() + .withAppInTheForeground() + .withSyncExecutor(object : FakeSyncExecutor() { + override fun onWaitUntilLiveOrFailure(): SyncRequestResult { + events += "live" + return super.onWaitUntilLiveOrFailure() + } + }) + .arrange() + + val syncJob = launch { + syncLifecycleManager.syncTemporarily(TestUser.SELF_USER_ID, 100.milliseconds) { + events += "action" + } + } + + runCurrent() + + assertEquals(listOf("live", "action"), events) + assertTrue(syncJob.isActive) + + advanceTimeBy(100.milliseconds) + advanceUntilIdle() + + assertFalse(syncJob.isActive) + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(1, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(0, arrangement.syncExecutor.waitUntilNextLiveCount) + } + + @Test + fun givenSyncReachesNextLive_whenRequestingTemporarySyncWithAction_thenShouldUseNextLiveWait() = runTest { + val events = mutableListOf() + val (arrangement, syncLifecycleManager) = Arrangement() + .withAppInTheForeground() + .withSyncExecutor(object : FakeSyncExecutor() { + override fun onWaitUntilNextLiveOrFailure(): SyncRequestResult { + events += "nextLive" + return super.onWaitUntilNextLiveOrFailure() + } + }) + .arrange() + + syncLifecycleManager.syncTemporarily( + userId = TestUser.SELF_USER_ID, + waitForNextSyncState = true, + ) { + events += "action" + } + + assertEquals(listOf("nextLive", "action"), events) + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(0, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(1, arrangement.syncExecutor.waitUntilNextLiveCount) + } + + @Test + fun givenSyncFailsBeforeLive_whenRequestingTemporarySyncWithAction_thenShouldNotRunAction() = runTest { + val (arrangement, syncLifecycleManager) = Arrangement() + .withAppInTheForeground() + .withSyncRequestResult(SyncRequestResult.Failure(CoreFailure.Unknown(RuntimeException("sync failed")))) + .arrange() + var actionInvocationCount = 0 + + syncLifecycleManager.syncTemporarily(TestUser.SELF_USER_ID) { + actionInvocationCount++ + } + + assertEquals(0, actionInvocationCount) + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(1, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(0, arrangement.syncExecutor.waitUntilNextLiveCount) + } + + @Test + fun givenNextSyncStateFails_whenRequestingTemporarySyncWithAction_thenShouldNotRunAction() = runTest { + val (arrangement, syncLifecycleManager) = Arrangement() + .withAppInTheForeground() + .withNextSyncRequestResult(SyncRequestResult.Failure(CoreFailure.Unknown(RuntimeException("sync failed")))) + .arrange() + var actionInvocationCount = 0 + + syncLifecycleManager.syncTemporarily( + userId = TestUser.SELF_USER_ID, + waitForNextSyncState = true, + ) { + actionInvocationCount++ + } + + assertEquals(0, actionInvocationCount) + assertEquals(1, arrangement.syncExecutor.requestCount) + assertEquals(0, arrangement.syncExecutor.waitUntilLiveCount) + assertEquals(1, arrangement.syncExecutor.waitUntilNextLiveCount) + } + private class Arrangement { @MockK @@ -139,6 +244,24 @@ class SyncLifecycleManagerTest { every { currentScreenManager.isAppVisibleFlow() } returns MutableStateFlow(true) } + fun withSyncExecutor(syncExecutor: FakeSyncExecutor) = apply { + this.syncExecutor = syncExecutor + } + + fun withSyncRequestResult(syncRequestResult: SyncRequestResult) = apply { + syncExecutor = object : FakeSyncExecutor() { + override fun onWaitUntilLiveOrFailure(): SyncRequestResult = + syncRequestResult.also { waitUntilLiveCount++ } + } + } + + fun withNextSyncRequestResult(syncRequestResult: SyncRequestResult) = apply { + syncExecutor = object : FakeSyncExecutor() { + override fun onWaitUntilNextLiveOrFailure(): SyncRequestResult = + syncRequestResult.also { waitUntilNextLiveCount++ } + } + } + fun arrange() = this to syncLifecycleManager.also { every { userSessionScope.syncExecutor } returns syncExecutor } diff --git a/core/notification/src/main/kotlin/com/wire/android/notification/NotificationConstants.kt b/core/notification/src/main/kotlin/com/wire/android/notification/NotificationConstants.kt index d6db391acfa..94fd44550cd 100644 --- a/core/notification/src/main/kotlin/com/wire/android/notification/NotificationConstants.kt +++ b/core/notification/src/main/kotlin/com/wire/android/notification/NotificationConstants.kt @@ -46,6 +46,9 @@ object NotificationConstants { const val MESSAGE_SYNC_CHANNEL_ID = "com.wire.android.message_synchronization" const val MESSAGE_SYNC_CHANNEL_NAME = "Message synchronization" + const val PENDING_MESSAGES_SYNC_CHANNEL_ID = "com.wire.android.pending_messages_synchronization" + const val PENDING_MESSAGES_SYNC_CHANNEL_NAME = "Pending messages synchronization" + const val OTHER_CHANNEL_ID = "com.wire.android.other" const val OTHER_CHANNEL_NAME = "Other essential actions" @@ -97,4 +100,5 @@ enum class NotificationIds { PERSISTENT_CHECK_NOTIFICATION_ID, PLAYING_AUDIO_MESSAGE_ID, UPLOADING_DATA_NOTIFICATION_ID, + PENDING_MESSAGES_SYNC_NOTIFICATION_ID, } diff --git a/kalium b/kalium index 56eda02b2af..09ea2aeceaf 160000 --- a/kalium +++ b/kalium @@ -1 +1 @@ -Subproject commit 56eda02b2af8c135318272348bc48af40d71815f +Subproject commit 09ea2aeceaf73b8d0f4934722890f60bc1c3f94d From 99caa93b3e3b4eaf2cb2dd8c09d83c3cad31dc1f Mon Sep 17 00:00:00 2001 From: Sergei Bakhtiarov Date: Wed, 24 Jun 2026 12:35:57 +0200 Subject: [PATCH 2/3] feat: send pending messages for all accounts (WPB-23968) --- .../PendingMessagesScheduledReceiver.kt | 17 +- .../PendingMessagesForegroundService.kt | 142 +++++++++++----- .../wire/android/services/ServicesManager.kt | 18 +- .../PendingMessagesScheduledReceiverTest.kt | 31 ++-- ...endingMessagesForegroundSyncHandlerTest.kt | 155 +++++++++++++++--- .../android/services/ServicesManagerTest.kt | 39 +++++ 6 files changed, 292 insertions(+), 110 deletions(-) diff --git a/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt index 2dc78f4b272..ab7d9b0fa88 100644 --- a/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt +++ b/app/src/main/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiver.kt @@ -23,7 +23,6 @@ import android.content.Context import android.content.Intent import com.wire.android.appLogger import com.wire.android.services.ServicesManager -import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.sync.PendingMessagesForegroundSync import dev.zacsweers.metro.AppScope import dev.zacsweers.metro.Inject @@ -36,14 +35,8 @@ class PendingMessagesScheduledReceiver @Inject constructor( override fun onReceive(context: Context, intent: Intent) { when (intent.action) { - PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED -> { - val userId = intent.userId() - if (userId == null) { - appLogger.w("$TAG: missing user id, skipping pending messages foreground service") - return - } - servicesManager.startPendingMessagesForegroundService(userId) - } + PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED -> + servicesManager.startPendingMessagesForegroundService() PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED -> servicesManager.stopPendingMessagesForegroundService() @@ -59,9 +52,3 @@ class PendingMessagesScheduledReceiver @Inject constructor( private const val TAG = "PendingMessagesScheduledReceiver" } } - -private fun Intent.userId(): UserId? { - val value = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) - val domain = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) - return if (value != null && domain != null) UserId(value, domain) else null -} diff --git a/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt index 6805515cafd..50b199e9133 100644 --- a/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt +++ b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt @@ -44,11 +44,15 @@ import com.wire.android.notification.openAppPendingIntent import com.wire.android.util.dispatchers.DispatcherProvider import com.wire.android.util.lifecycle.SyncLifecycleManager import com.wire.kalium.logic.CoreLogic +import com.wire.kalium.logic.data.auth.AccountInfo +import com.wire.kalium.logic.data.logout.LogoutReason import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.feature.session.CurrentSessionResult -import com.wire.kalium.logic.sync.PendingMessagesForegroundSync +import com.wire.kalium.logic.feature.auth.LogoutCallback +import com.wire.kalium.logic.feature.session.DoesValidSessionExistResult +import com.wire.kalium.logic.feature.session.GetAllSessionsResult import dev.zacsweers.metro.Inject import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.awaitClose @@ -57,6 +61,7 @@ import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull +import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -79,12 +84,21 @@ class PendingMessagesForegroundService : Service() { CoroutineScope(SupervisorJob() + dispatcherProvider.io()) } + private var runJob: Job? = null + + private val logoutCallback = object : LogoutCallback { + override suspend fun invoke(userId: UserId, reason: LogoutReason) { + stopIfNoValidSessions() + } + } + override fun onBind(intent: Intent?): IBinder? = null override fun onCreate() { wireApplicationGraph.inject(this) super.onCreate() isServiceStarted = true + coreLogic.getGlobalScope().logoutCallbackManager.register(logoutCallback) startAsForeground(createNotification(waitingForConnection = true)) } @@ -94,22 +108,30 @@ class PendingMessagesForegroundService : Service() { startAsForeground(createNotification(waitingForConnection = true)) } - val userId = intent?.userId() - if (userId == null) { - appLogger.w("$TAG: missing user id, skipping pending messages send") - stopSelf(startId) + if (intent?.action == ACTION_STOP) { + scope.launch { + stopIfNoValidSessions(startId) + } return START_NOT_STICKY } - scope.launch { - run(userId) - stopSelf(startId) - } + return if (runJob?.isActive == true) { + appLogger.i("$TAG: pending messages send already running, skipping duplicate start") + START_NOT_STICKY + } else { + runJob = scope.launch { + try { + run() + } finally { + stopSelf() + } + } - return START_NOT_STICKY + START_NOT_STICKY + } } - private suspend fun run(userId: UserId) { + private suspend fun run() { val connected = withTimeoutOrNull(MAX_WAIT_FOR_NETWORK_MINUTES.minutes) { networkAvailability().first { it } } ?: false @@ -121,7 +143,28 @@ class PendingMessagesForegroundService : Service() { startAsForeground(createNotification(waitingForConnection = false)) - PendingMessagesForegroundSyncHandler(coreLogic, sendPendingMessagesAfterForegroundSync).sendPendingMessagesForCurrentSession(userId) + PendingMessagesForegroundSyncHandler(coreLogic, sendPendingMessagesAfterForegroundSync).sendPendingMessagesForAllValidSessions() + } + + private suspend fun stopIfNoValidSessions(startId: Int? = null) { + when (val result = coreLogic.getGlobalScope().session.allSessions()) { + is GetAllSessionsResult.Success -> { + if (result.sessions.any { it.isValid() }) { + appLogger.i("$TAG: keeping service alive for valid sessions") + } else { + appLogger.i("$TAG: no valid sessions, stopping pending messages foreground service") + startId?.let(::stopSelf) ?: stopSelf() + } + } + + is GetAllSessionsResult.Failure -> { + appLogger.w( + "$TAG: unable to get valid sessions after logout, " + + "stopping pending messages foreground service: $result" + ) + startId?.let(::stopSelf) ?: stopSelf() + } + } } private fun networkAvailability(): Flow = callbackFlow { @@ -181,6 +224,7 @@ class PendingMessagesForegroundService : Service() { private fun createNotification(waitingForConnection: Boolean): Notification = NotificationCompat.Builder(this, PENDING_MESSAGES_SYNC_CHANNEL_ID) + .setContentTitle(getString(R.string.app_name)) .setContentText( if (waitingForConnection) { resources.getString(R.string.pending_messages_notification_waiting) @@ -203,6 +247,8 @@ class PendingMessagesForegroundService : Service() { override fun onDestroy() { super.onDestroy() + coreLogic.getGlobalScope().logoutCallbackManager.unregister(logoutCallback) + runJob = null scope.cancel("PendingMessagesForegroundService was destroyed") isServiceStarted = false } @@ -210,51 +256,77 @@ class PendingMessagesForegroundService : Service() { companion object { private const val TAG = "PendingMessagesForegroundService" private const val MAX_WAIT_FOR_NETWORK_MINUTES = 30 + private const val ACTION_STOP = "com.wire.android.services.action.STOP_PENDING_MESSAGES_FOREGROUND_SERVICE" - fun newIntent(context: Context, userId: UserId): Intent = + fun newIntent(context: Context): Intent = Intent(context, PendingMessagesForegroundService::class.java) - .putExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE, userId.value) - .putExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN, userId.domain) fun stopIntent(context: Context): Intent = Intent(context, PendingMessagesForegroundService::class.java) + .setAction(ACTION_STOP) var isServiceStarted = false } } internal class PendingMessagesForegroundSyncHandler( - private val currentSession: suspend () -> CurrentSessionResult, + private val allSessions: suspend () -> GetAllSessionsResult, + private val doesValidSessionExist: suspend (UserId) -> DoesValidSessionExistResult, private val sendPendingMessagesAfterForegroundSync: suspend (UserId) -> Unit, ) { constructor( coreLogic: CoreLogic, sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase, ) : this( - currentSession = { coreLogic.getGlobalScope().session.currentSession() }, + allSessions = { coreLogic.getGlobalScope().session.allSessions() }, + doesValidSessionExist = { coreLogic.getGlobalScope().doesValidSessionExist(it) }, sendPendingMessagesAfterForegroundSync = sendPendingMessagesAfterForegroundSync::invoke ) - suspend fun sendPendingMessagesForCurrentSession(scheduledUserId: UserId) { - when (val result = currentSession()) { - is CurrentSessionResult.Success -> { - val accountInfo = result.accountInfo - if (accountInfo.isValid() && accountInfo.userId == scheduledUserId) { - sendPendingMessagesAfterForegroundSync(accountInfo.userId) - } else if (accountInfo.isValid()) { + suspend fun sendPendingMessagesForAllValidSessions() { + when (val result = allSessions()) { + is GetAllSessionsResult.Success -> + result.sessions + .filterIsInstance() + .map { it.userId } + .distinct() + .forEach { userId -> + sendPendingMessagesIfSessionIsStillValid(userId) + } + + is GetAllSessionsResult.Failure -> + appLogger.w("$TAG: unable to get valid sessions, skipping pending messages send: $result") + } + appLogger.w("$TAG: foreground service finished messages sending") + } + + @Suppress("TooGenericExceptionCaught") + private suspend fun sendPendingMessagesIfSessionIsStillValid(userId: UserId) { + when (val result = doesValidSessionExist(userId)) { + is DoesValidSessionExistResult.Success -> { + if (result.doesValidSessionExist) { + try { + sendPendingMessagesAfterForegroundSync(userId) + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + appLogger.e( + "$TAG: failed to send pending messages for ${userId.toLogString()}: $e" + ) + } + } else { appLogger.w( - "$TAG: scheduled user ${scheduledUserId.toLogString()} does not match current session " + - "${accountInfo.userId.toLogString()}, skipping pending messages send" + "$TAG: session for ${userId.toLogString()} is no longer valid, skipping pending messages send" ) - } else { - appLogger.w("$TAG: current session is invalid, skipping pending messages send") } } - is CurrentSessionResult.Failure -> - appLogger.w("$TAG: unable to get current valid session: $result") + is DoesValidSessionExistResult.Failure -> + appLogger.w( + "$TAG: unable to check valid session for ${userId.toLogString()}, " + + "skipping pending messages send: $result" + ) } - appLogger.w("$TAG: foreground service finished messages sending") } private companion object { @@ -262,12 +334,6 @@ internal class PendingMessagesForegroundSyncHandler( } } -private fun Intent.userId(): UserId? { - val value = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) - val domain = getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) - return if (value != null && domain != null) UserId(value, domain) else null -} - class SendPendingMessagesAfterForegroundSyncUseCase @Inject constructor( @KaliumCoreLogic private val coreLogic: CoreLogic, private val syncLifecycleManager: SyncLifecycleManager, diff --git a/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt b/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt index 3fe1bdb3c2e..d94fa04ec5b 100644 --- a/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt +++ b/app/src/main/kotlin/com/wire/android/services/ServicesManager.kt @@ -26,6 +26,9 @@ import com.wire.android.appLogger import com.wire.android.util.dispatchers.DispatcherProvider import com.wire.kalium.logic.data.id.ConversationId import com.wire.kalium.logic.data.user.UserId +import dev.zacsweers.metro.AppScope +import dev.zacsweers.metro.Inject +import dev.zacsweers.metro.SingleIn import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.MutableSharedFlow @@ -33,9 +36,6 @@ import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.launch -import dev.zacsweers.metro.Inject -import dev.zacsweers.metro.AppScope -import dev.zacsweers.metro.SingleIn /** * This is helper class that should be used for starting/stopping any services. @@ -139,16 +139,14 @@ class ServicesManager @Inject constructor( PersistentWebSocketService.isServiceStarted // Pending messages foreground sync - fun startPendingMessagesForegroundService(userId: UserId) { - if (PendingMessagesForegroundService.isServiceStarted) { - appLogger.i("$TAG: PendingMessagesForegroundService already started, not starting again") - } else { - startService(PendingMessagesForegroundService.newIntent(context, userId)) - } + fun startPendingMessagesForegroundService() { + startService(PendingMessagesForegroundService.newIntent(context)) } fun stopPendingMessagesForegroundService() { - stopService(PendingMessagesForegroundService.stopIntent(context)) + if (PendingMessagesForegroundService.isServiceStarted) { + startService(PendingMessagesForegroundService.stopIntent(context)) + } } // Playing AudioMessage service diff --git a/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt b/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt index 1981971b78b..ae1741f1a89 100644 --- a/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt +++ b/app/src/test/kotlin/com/wire/android/notification/broadcastreceivers/PendingMessagesScheduledReceiverTest.kt @@ -20,7 +20,6 @@ package com.wire.android.notification.broadcastreceivers import android.content.Context import android.content.Intent import com.wire.android.services.ServicesManager -import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.sync.PendingMessagesForegroundSync import io.mockk.every import io.mockk.MockKAnnotations @@ -33,21 +32,12 @@ import org.junit.jupiter.api.Test class PendingMessagesScheduledReceiverTest { @Test - fun `given scheduled pending messages broadcast with user id when received then starts foreground service for user`() = runTest { - val (arrangement, receiver) = Arrangement().arrange() - - receiver.onReceive(arrangement.context, scheduledIntent(USER_ID)) - - verify(exactly = 1) { arrangement.servicesManager.startPendingMessagesForegroundService(USER_ID) } - } - - @Test - fun `given scheduled pending messages broadcast without user id when received then does not start foreground service`() = runTest { + fun `given scheduled pending messages broadcast without user id when received then starts foreground service`() = runTest { val (arrangement, receiver) = Arrangement().arrange() receiver.onReceive(arrangement.context, scheduledIntent()) - verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + verify(exactly = 1) { arrangement.servicesManager.startPendingMessagesForegroundService() } } @Test @@ -56,7 +46,7 @@ class PendingMessagesScheduledReceiverTest { receiver.onReceive(arrangement.context, mockk { every { action } returns "unexpected-action" }) - verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService() } } @Test @@ -65,11 +55,11 @@ class PendingMessagesScheduledReceiverTest { receiver.onReceive( context = arrangement.context, - intent = mockk { every { action } returns PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED } + intent = cancelledIntent() ) verify(exactly = 1) { arrangement.servicesManager.stopPendingMessagesForegroundService() } - verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService(any()) } + verify(exactly = 0) { arrangement.servicesManager.startPendingMessagesForegroundService() } } private class Arrangement { @@ -89,13 +79,14 @@ class PendingMessagesScheduledReceiverTest { } private companion object { - private val USER_ID = UserId("user", "wire.com") - - fun scheduledIntent(userId: UserId? = null): Intent = + fun scheduledIntent(): Intent = mockk { every { action } returns PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_SCHEDULED - every { getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_VALUE) } returns userId?.value - every { getStringExtra(PendingMessagesForegroundSync.EXTRA_USER_ID_DOMAIN) } returns userId?.domain + } + + fun cancelledIntent(): Intent = + mockk { + every { action } returns PendingMessagesForegroundSync.ACTION_SENDING_OF_PENDING_MESSAGES_CANCELLED } } } diff --git a/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt b/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt index f305900305c..36383f92197 100644 --- a/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt +++ b/app/src/test/kotlin/com/wire/android/services/PendingMessagesForegroundSyncHandlerTest.kt @@ -21,84 +21,185 @@ import com.wire.kalium.common.error.CoreFailure import com.wire.kalium.logic.data.auth.AccountInfo import com.wire.kalium.logic.data.logout.LogoutReason import com.wire.kalium.logic.data.user.UserId -import com.wire.kalium.logic.feature.session.CurrentSessionResult +import com.wire.kalium.logic.feature.session.DoesValidSessionExistResult +import com.wire.kalium.logic.feature.session.GetAllSessionsResult import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Test class PendingMessagesForegroundSyncHandlerTest { @Test - fun `given current session is valid when sending pending messages then sends only for current user`() = runTest { - val currentUserId = UserId("currentUser", "wire.com") - val inactiveUserId = UserId("inactiveUser", "wire.com") + fun `given multiple valid sessions when sending pending messages then sends sequentially for each user`() = runTest { + val events = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Valid(SECOND_USER_ID), + ) + ) + }, + doesValidSessionExist = { + events.add("check:${it.value}") + DoesValidSessionExistResult.Success(true) + }, + sendPendingMessagesAfterForegroundSync = { + events.add("send:${it.value}") + } + ) + + handler.sendPendingMessagesForAllValidSessions() + + assertEquals( + listOf( + "check:${FIRST_USER_ID.value}", + "send:${FIRST_USER_ID.value}", + "check:${SECOND_USER_ID.value}", + "send:${SECOND_USER_ID.value}", + ), + events + ) + } + + @Test + fun `given duplicate and invalid sessions when sending pending messages then sends once per valid user`() = runTest { val sentPendingMessagesFor = mutableListOf() val handler = PendingMessagesForegroundSyncHandler( - currentSession = { CurrentSessionResult.Success(AccountInfo.Valid(currentUserId)) }, + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Invalid(THIRD_USER_ID, LogoutReason.SELF_SOFT_LOGOUT), + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Valid(SECOND_USER_ID), + ) + ) + }, + doesValidSessionExist = { DoesValidSessionExistResult.Success(true) }, sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } ) - handler.sendPendingMessagesForCurrentSession(currentUserId) + handler.sendPendingMessagesForAllValidSessions() - assertEquals(listOf(currentUserId), sentPendingMessagesFor) - assertFalse(sentPendingMessagesFor.contains(inactiveUserId)) + assertEquals(listOf(FIRST_USER_ID, SECOND_USER_ID), sentPendingMessagesFor) } @Test - fun `given current session is for different user when sending pending messages then does not send`() = runTest { - val currentUserId = UserId("currentUser", "wire.com") - val scheduledUserId = UserId("scheduledUser", "wire.com") + fun `given user logs out before sending when sending pending messages then skips that user`() = runTest { val sentPendingMessagesFor = mutableListOf() val handler = PendingMessagesForegroundSyncHandler( - currentSession = { CurrentSessionResult.Success(AccountInfo.Valid(currentUserId)) }, + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Valid(SECOND_USER_ID), + ) + ) + }, + doesValidSessionExist = { + DoesValidSessionExistResult.Success(it != FIRST_USER_ID) + }, sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } ) - handler.sendPendingMessagesForCurrentSession(scheduledUserId) + handler.sendPendingMessagesForAllValidSessions() - assertEquals(emptyList(), sentPendingMessagesFor) + assertEquals(listOf(SECOND_USER_ID), sentPendingMessagesFor) } @Test - fun `given current session is invalid when sending pending messages then does not send`() = runTest { - val currentUserId = UserId("currentUser", "wire.com") + fun `given sending fails for one user when sending pending messages then continues with remaining users`() = runTest { val sentPendingMessagesFor = mutableListOf() val handler = PendingMessagesForegroundSyncHandler( - currentSession = { - CurrentSessionResult.Success(AccountInfo.Invalid(currentUserId, LogoutReason.SELF_SOFT_LOGOUT)) + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Valid(SECOND_USER_ID), + ) + ) + }, + doesValidSessionExist = { DoesValidSessionExistResult.Success(true) }, + sendPendingMessagesAfterForegroundSync = { + if (it == FIRST_USER_ID) { + throw RuntimeException("backend failed") + } + sentPendingMessagesFor.add(it) + } + ) + + handler.sendPendingMessagesForAllValidSessions() + + assertEquals(listOf(SECOND_USER_ID), sentPendingMessagesFor) + } + + @Test + fun `given valid session re-check fails for one user when sending pending messages then continues with remaining users`() = runTest { + val sentPendingMessagesFor = mutableListOf() + val handler = PendingMessagesForegroundSyncHandler( + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Valid(FIRST_USER_ID), + AccountInfo.Valid(SECOND_USER_ID), + ) + ) + }, + doesValidSessionExist = { + if (it == FIRST_USER_ID) { + DoesValidSessionExistResult.Failure.Generic(CoreFailure.Unknown(null)) + } else { + DoesValidSessionExistResult.Success(true) + } }, sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } ) - handler.sendPendingMessagesForCurrentSession(currentUserId) + handler.sendPendingMessagesForAllValidSessions() - assertEquals(emptyList(), sentPendingMessagesFor) + assertEquals(listOf(SECOND_USER_ID), sentPendingMessagesFor) } @Test - fun `given current session is missing when sending pending messages then does not send`() = runTest { + fun `given getting valid sessions fails when sending pending messages then sends nothing`() = runTest { val sentPendingMessagesFor = mutableListOf() val handler = PendingMessagesForegroundSyncHandler( - currentSession = { CurrentSessionResult.Failure.SessionNotFound }, + allSessions = { GetAllSessionsResult.Failure.Generic(CoreFailure.Unknown(null)) }, + doesValidSessionExist = { DoesValidSessionExistResult.Success(true) }, sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } ) - handler.sendPendingMessagesForCurrentSession(UserId("currentUser", "wire.com")) + handler.sendPendingMessagesForAllValidSessions() assertEquals(emptyList(), sentPendingMessagesFor) } @Test - fun `given current session lookup fails when sending pending messages then does not send`() = runTest { + fun `given no valid sessions when sending pending messages then sends nothing`() = runTest { val sentPendingMessagesFor = mutableListOf() val handler = PendingMessagesForegroundSyncHandler( - currentSession = { CurrentSessionResult.Failure.Generic(CoreFailure.Unknown(null)) }, + allSessions = { + GetAllSessionsResult.Success( + listOf( + AccountInfo.Invalid(FIRST_USER_ID, LogoutReason.SELF_SOFT_LOGOUT), + AccountInfo.Invalid(SECOND_USER_ID, LogoutReason.SESSION_EXPIRED), + ) + ) + }, + doesValidSessionExist = { DoesValidSessionExistResult.Success(true) }, sendPendingMessagesAfterForegroundSync = { sentPendingMessagesFor.add(it) } ) - handler.sendPendingMessagesForCurrentSession(UserId("currentUser", "wire.com")) + handler.sendPendingMessagesForAllValidSessions() assertEquals(emptyList(), sentPendingMessagesFor) } + + private companion object { + private val FIRST_USER_ID = UserId("firstUser", "wire.com") + private val SECOND_USER_ID = UserId("secondUser", "wire.com") + private val THIRD_USER_ID = UserId("thirdUser", "wire.com") + } } diff --git a/app/src/test/kotlin/com/wire/android/services/ServicesManagerTest.kt b/app/src/test/kotlin/com/wire/android/services/ServicesManagerTest.kt index 65408aafe5b..437dfdeafbe 100644 --- a/app/src/test/kotlin/com/wire/android/services/ServicesManagerTest.kt +++ b/app/src/test/kotlin/com/wire/android/services/ServicesManagerTest.kt @@ -249,6 +249,36 @@ class ServicesManagerTest { } } + @Test + fun `given pending messages foreground service not started when starting then starts service`() = + runTest(dispatcherProvider.main()) { + // given + val (arrangement, servicesManager) = Arrangement() + .withPendingMessagesServiceStarted(false) + .arrange() + + // when + servicesManager.startPendingMessagesForegroundService() + + // then + verify(exactly = 1) { arrangement.context.startService(arrangement.pendingMessagesForegroundServiceIntent) } + } + + @Test + fun `given pending messages foreground service already started when starting then forwards start intent`() = + runTest(dispatcherProvider.main()) { + // given + val (arrangement, servicesManager) = Arrangement() + .withPendingMessagesServiceStarted(true) + .arrange() + + // when + servicesManager.startPendingMessagesForegroundService() + + // then + verify(exactly = 1) { arrangement.context.startService(arrangement.pendingMessagesForegroundServiceIntent) } + } + private inner class Arrangement { @MockK(relaxed = true) @@ -263,12 +293,17 @@ class ServicesManagerTest { @MockK lateinit var ongoingCallServiceIntentWithStopArgument: Intent + @MockK + lateinit var pendingMessagesForegroundServiceIntent: Intent + init { MockKAnnotations.init(this, relaxUnitFun = true) mockkObject(CallService.Companion) + mockkObject(PendingMessagesForegroundService.Companion) every { CallService.Companion.serviceState } returns serviceStateFlow callServiceIntentForAction(CallService.Action.Start.Default, callServiceIntent) callServiceIntentForAction(CallService.Action.Stop, ongoingCallServiceIntentWithStopArgument) + every { PendingMessagesForegroundService.Companion.newIntent(context) } returns pendingMessagesForegroundServiceIntent } fun clearRecordedCallsForContext() { @@ -290,6 +325,10 @@ class ServicesManagerTest { every { CallService.Companion.newIntent(context, action) } returns intent } + fun withPendingMessagesServiceStarted(isStarted: Boolean) = apply { + every { PendingMessagesForegroundService.Companion.isServiceStarted } returns isStarted + } + fun arrange() = this to servicesManager } } From 77e7efa978f2d9be02ded88e8dc408d35fa8fdc4 Mon Sep 17 00:00:00 2001 From: Sergei Bakhtiarov Date: Wed, 24 Jun 2026 12:54:34 +0200 Subject: [PATCH 3/3] feat: use existing NetworkStateObserver (WPB-23968) --- .../PendingMessagesForegroundService.kt | 52 +++------- ...PendingMessagesNetworkStateObserverTest.kt | 95 +++++++++++++++++++ 2 files changed, 107 insertions(+), 40 deletions(-) create mode 100644 app/src/test/kotlin/com/wire/android/services/PendingMessagesNetworkStateObserverTest.kt diff --git a/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt index 50b199e9133..543171187c4 100644 --- a/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt +++ b/app/src/main/kotlin/com/wire/android/services/PendingMessagesForegroundService.kt @@ -24,10 +24,6 @@ import android.app.Service import android.content.Context import android.content.Intent import android.content.pm.ServiceInfo -import android.net.ConnectivityManager -import android.net.Network -import android.net.NetworkCapabilities -import android.net.NetworkRequest import android.os.Build import android.os.IBinder import androidx.core.app.NotificationCompat @@ -50,18 +46,18 @@ import com.wire.kalium.logic.data.user.UserId import com.wire.kalium.logic.feature.auth.LogoutCallback import com.wire.kalium.logic.feature.session.DoesValidSessionExistResult import com.wire.kalium.logic.feature.session.GetAllSessionsResult +import com.wire.kalium.network.NetworkState +import com.wire.kalium.network.NetworkStateObserver import dev.zacsweers.metro.Inject import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel -import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeoutOrNull import kotlin.coroutines.cancellation.CancellationException +import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -77,6 +73,9 @@ class PendingMessagesForegroundService : Service() { @Inject lateinit var notificationChannelsManager: NotificationChannelsManager + @Inject + lateinit var networkStateObserver: NetworkStateObserver + @Inject private lateinit var sendPendingMessagesAfterForegroundSync: SendPendingMessagesAfterForegroundSyncUseCase @@ -132,9 +131,7 @@ class PendingMessagesForegroundService : Service() { } private suspend fun run() { - val connected = withTimeoutOrNull(MAX_WAIT_FOR_NETWORK_MINUTES.minutes) { - networkAvailability().first { it } - } ?: false + val connected = networkStateObserver.waitUntilConnectedWithInternet(MAX_WAIT_FOR_NETWORK_MINUTES.minutes) if (!connected) { appLogger.i("$TAG: network did not become available before timeout") @@ -167,36 +164,6 @@ class PendingMessagesForegroundService : Service() { } } - private fun networkAvailability(): Flow = callbackFlow { - val connectivityManager = getSystemService(CONNECTIVITY_SERVICE) as ConnectivityManager - fun emitCurrentAvailability() { - trySend(connectivityManager.hasValidatedInternet()) - } - - val callback = object : ConnectivityManager.NetworkCallback() { - override fun onAvailable(network: Network) = emitCurrentAvailability() - override fun onLost(network: Network) = emitCurrentAvailability() - override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) = emitCurrentAvailability() - } - - emitCurrentAvailability() - connectivityManager.registerNetworkCallback( - NetworkRequest.Builder() - .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) - .build(), - callback - ) - awaitClose { connectivityManager.unregisterNetworkCallback(callback) } - } - - private fun ConnectivityManager.hasValidatedInternet(): Boolean = - activeNetwork - ?.let(::getNetworkCapabilities) - ?.let { - it.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) && - it.hasCapability(NetworkCapabilities.NET_CAPABILITY_VALIDATED) - } == true - private fun startAsForeground(notification: Notification) { notificationChannelsManager.createRegularChannel(PENDING_MESSAGES_SYNC_CHANNEL_ID, PENDING_MESSAGES_SYNC_CHANNEL_NAME) @@ -269,6 +236,11 @@ class PendingMessagesForegroundService : Service() { } } +internal suspend fun NetworkStateObserver.waitUntilConnectedWithInternet(timeout: Duration): Boolean = + withTimeoutOrNull(timeout) { + observeNetworkState().first { it is NetworkState.ConnectedWithInternet } + } != null + internal class PendingMessagesForegroundSyncHandler( private val allSessions: suspend () -> GetAllSessionsResult, private val doesValidSessionExist: suspend (UserId) -> DoesValidSessionExistResult, diff --git a/app/src/test/kotlin/com/wire/android/services/PendingMessagesNetworkStateObserverTest.kt b/app/src/test/kotlin/com/wire/android/services/PendingMessagesNetworkStateObserverTest.kt new file mode 100644 index 00000000000..d73e61ad8d7 --- /dev/null +++ b/app/src/test/kotlin/com/wire/android/services/PendingMessagesNetworkStateObserverTest.kt @@ -0,0 +1,95 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.android.services + +import com.wire.kalium.network.CurrentNetwork +import com.wire.kalium.network.NetworkState +import com.wire.kalium.network.NetworkStateObserver +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import kotlin.time.Duration.Companion.seconds + +class PendingMessagesNetworkStateObserverTest { + + @Test + fun givenNetworkIsAlreadyConnectedWithInternet_whenWaitingForConnection_thenReturnsConnected() = runTest { + val observer = TestNetworkStateObserver(NetworkState.ConnectedWithInternet) + + val result = observer.waitUntilConnectedWithInternet(1.seconds) + + assertTrue(result) + } + + @Test + fun givenNetworkBecomesConnectedWithInternet_whenWaitingForConnection_thenReturnsConnected() = runTest { + val observer = TestNetworkStateObserver(NetworkState.NotConnected) + val result = async { + observer.waitUntilConnectedWithInternet(1.seconds) + } + + runCurrent() + observer.updateNetworkState(NetworkState.ConnectedWithInternet) + + assertTrue(result.await()) + } + + @Test + fun givenNetworkStaysNotConnected_whenWaitingForConnection_thenReturnsNotConnected() = runTest { + val observer = TestNetworkStateObserver(NetworkState.NotConnected) + val result = async { + observer.waitUntilConnectedWithInternet(1.seconds) + } + + advanceTimeBy(1.seconds.inWholeMilliseconds + 1) + + assertFalse(result.await()) + } + + @Test + fun givenNetworkStaysConnectedWithoutInternet_whenWaitingForConnection_thenReturnsNotConnected() = runTest { + val observer = TestNetworkStateObserver(NetworkState.ConnectedWithoutInternet) + val result = async { + observer.waitUntilConnectedWithInternet(1.seconds) + } + + advanceTimeBy(1.seconds.inWholeMilliseconds + 1) + + assertFalse(result.await()) + } + + private class TestNetworkStateObserver( + initialState: NetworkState, + ) : NetworkStateObserver { + private val networkState = MutableStateFlow(initialState) + + override fun observeNetworkState(): StateFlow = networkState + + override fun observeCurrentNetwork(): StateFlow = MutableStateFlow(null) + + fun updateNetworkState(state: NetworkState) { + networkState.value = state + } + } +}