diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt index 97a3d5b..29129fb 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/AbstractActor.kt @@ -41,15 +41,24 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor private var __receive__: Receive = EmptyReceive - private val childs: MutableMap = emptyMap().toMutableMap() + private val children: MutableMap = emptyMap().toMutableMap() - protected fun child(name: String) = childs[name] + protected fun child(name: String) = children[name] protected fun become(receive: Receive) { __receive__ = receive } protected val LOG = LoggerFactory.getLogger(this::class.java) - protected fun unhandled(msg: Any) { + protected suspend fun handleMsgDefault(msg: Any) { + when (msg) { + is ConnectionManager.Companion.Message.In.Stop -> { + stopActor() + } + else -> unhandled(msg) + } + } + + private fun unhandled(msg: Any) { if (LOG.isWarnEnabled) { LOG.warn("received unexpected message $msg in ${coroutineContext[CoroutineName]} actor") } @@ -59,22 +68,22 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor protected val name: String = coroutineContext[CoroutineName]!!.name + protected open suspend fun stopActor() { + forEachActorNode { it.send(ConnectionManager.Companion.Message.In.Stop) } + channel.close() + } + protected open fun beforeCloseChannel() { - childs.forEach { (_, c) -> c.close() } + children.forEach { (_, c) -> c.close() } } - protected fun forEachActorNode(ope: (ActorRef) -> Unit) { - childs.forEach { (_, actorRef) -> ope(actorRef) } + protected suspend fun forEachActorNode(ope: suspend (ActorRef) -> Unit) { + children.forEach { (_, actorRef) -> ope(actorRef) } } override val channel: Channel = object : Channel by actorScope.channel { override suspend fun send(element: Any) { - if(actorScope.channel.isClosedForSend){ - LOG.debug("Channel is close for send. Message {} isn't sent to actor {}.", element.javaClass.simpleName, name) - } else { - LOG.debug("Send message {} to actor {}.", element.javaClass.simpleName, name) - actorScope.channel.send(element) - } + actorScope.channel.sendMessageToChannelSafely(element, name) } override fun close(cause: Throwable?): Boolean { @@ -94,7 +103,7 @@ abstract class AbstractActor protected constructor(private val actorScope: Actor val childRef = actorScope.actor( Dispatchers.IO.plus(CoroutineName(name)).plus(ParentActor(this.channel)).plus(context), capacity, start, onCompletion) { __workflow__(LOG, block)() } - childs.put(name, childRef) + children.put(name, childRef) return childRef } @@ -166,3 +175,17 @@ data class ParentActor(val ref: ActorRef) : AbstractCoroutineContextElement(Pare class ActorException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable) class ActorCreationException(val actorName: String, val actorRef: ActorRef, throwable: Throwable) : Exception(throwable) + + +@OptIn(ExperimentalCoroutinesApi::class) +suspend fun SendChannel.sendMessageToChannelSafely(message: Any, + name: String = this.toString()) { + val logger = LoggerFactory.getLogger(this::class.java) + if (isClosedForSend) { + logger.debug("Channel is close for send. Message {} isn't sent to actor {}.", + message.javaClass.simpleName, name) + } else { + logger.debug("Send message {} to actor {}.", message.javaClass.simpleName, name) + send(message) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt index bdb143b..7b2f6b8 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ActionManager.kt @@ -79,7 +79,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { LOG.warn("ErrMsg. Not yet implemented") } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt index f8c02a7..42a333b 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/ConnectionManager.kt @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { channel.send(In.Ping) } if (msg is In.DeploymentFeedback) { - notificationManager.send(MessageListener.Message.Event + notificationManager.sendMessageToChannelSafely(MessageListener.Message.Event .DeployFeedbackRequestResult(success, msg.feedback.id, msg.closeAction, msg.feedback.status.details)) } @@ -121,7 +121,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is In.SetPing -> become(stoppedReceive(state.copy(clientPingInterval = msg.duration, lastPing = Instant.EPOCH))) - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -158,7 +158,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } else -> { - unhandled(msg) + handleMsgDefault(msg) } } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt index 59a1268..3280761 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DeploymentManager.kt @@ -36,7 +36,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdateAndNotify(msg) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -61,7 +61,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is CancelForced -> { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -95,7 +95,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is CancelForced -> { LOG.info("Force cancel ignored") } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt index 6375e35..f3fde5b 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/DownloadManager.kt @@ -72,7 +72,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -196,7 +196,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -221,7 +221,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { Status.ERROR, "Failed to download file with md5 ${msg.md5} due to ${msg.message}", msg.message) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -323,12 +323,6 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { } } - //todo remove FileDownloader.Companion.Message.Stop message and use default implementation of beforeCloseChannel - @OptIn(ExperimentalCoroutinesApi::class) - override fun beforeCloseChannel() { - forEachActorNode { actorRef -> if(!actorRef.isClosedForSend) launch { actorRef.send(FileDownloader.Companion.Message.Stop) } } - } - init { become(beforeStartReceive()) } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt index aa237c4..8527e11 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/FileDownloader.kt @@ -25,7 +25,6 @@ import java.lang.StringBuilder import java.security.DigestInputStream import java.security.MessageDigest import kotlin.time.DurationUnit -import kotlin.time.ExperimentalTime import kotlin.time.toDuration @OptIn(ObsoleteCoroutinesApi::class) @@ -40,6 +39,8 @@ private constructor( private val downloadBehavior: DownloadBehavior = coroutineContext[HaraClientContext]!!.downloadBehavior private val notificationManager = coroutineContext[NMActor]!!.ref private val connectionManager = coroutineContext[CMActor]!!.ref + private var downloadJob: Job? = null + private var downloadScope: CoroutineScope = CoroutineScope(Dispatchers.IO) private fun beforeStart(state: State): Receive = { msg -> when (msg) { @@ -54,9 +55,7 @@ private constructor( } } - is Message.Stop -> this.cancel() - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -84,21 +83,18 @@ private constructor( tryDownload(newState, msg.cause) } - is Message.Stop -> this.cancel() - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } - @OptIn(ExperimentalTime::class) - private suspend fun tryDownload(state: State, error:Throwable? = null) = withContext(Dispatchers.IO){ + private suspend fun tryDownload(state: State, error:Throwable? = null) { when(val tryDownload = downloadBehavior.onAttempt(state.currentAttempt, "${state.actionId}-${fileToDownload.md5}", error)){ is DownloadBehavior.Try.Stop -> channel.send(Message.TrialExhausted) is DownloadBehavior.Try.After -> { - launch { + downloadJob = downloadScope.launch { if(error != null){ val errorMessage = "Retry download of ${fileToDownload.fileName} due to: $error. The download will start in ${tryDownload.seconds.toDuration(DurationUnit.SECONDS)}." parent!!.send(Message.Info(channel, fileToDownload.md5, errorMessage)) @@ -136,14 +132,27 @@ private constructor( val timer = checkDownloadProgress(inputStream, queue, actionId) runCatching { - file.outputStream().use { - inputStream.copyTo(it) + inputStream.use { inputStream -> + file.outputStream().use { outputStream -> + val buffer = ByteArray(DEFAULT_BUFFER_SIZE) + var progressBytes = 0L + var bytes = inputStream.read(buffer) + while (bytes >= 0) { + if (!downloadScope.isActive) { + LOG.info("Download of ${fileToDownload.fileName} was cancelled") + return + } + outputStream.write(buffer, 0, bytes) + progressBytes += bytes + bytes = inputStream.read(buffer) + } + } } }.also { timer.purge() timer.cancel() }.onFailure { - throw it + throw it } } @@ -209,6 +218,19 @@ private constructor( } } + override suspend fun stopActor() { + runCatching { + downloadJob?.let { + LOG.debug("Cancelling download job $it") + if (it.isActive) { + it.cancel() + downloadScope.cancel() + } + } + } + super.stopActor() + } + private fun State.nextAttempt():Int = if (currentAttempt == Int.MAX_VALUE) currentAttempt else currentAttempt + 1 init { @@ -244,7 +266,6 @@ private constructor( sealed class Message { object Start : Message() - object Stop : Message() object FileDownloaded : Message() object FileChecked : Message() diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt index cfeb41e..b23056d 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/NotificationManager.kt @@ -25,7 +25,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { is MessageListener.Message -> listeners.forEach { it.onMessage(msg) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt index a571238..708f15d 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/RootActor.kt @@ -23,12 +23,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { when (msg) { is In.Start, In.ForcePing -> child("connectionManager")!!.send(msg) - is In.Stop -> { - child("connectionManager")!!.send(msg) - channel.close() - } - - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } diff --git a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt index 464b07f..30a108d 100644 --- a/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt +++ b/src/main/kotlin/org/eclipse/hara/ddiclient/api/actors/UpdateManager.kt @@ -62,7 +62,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { LOG.info(message) } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -89,7 +89,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { stopUpdate() } - else -> unhandled(msg) + else -> handleMsgDefault(msg) } } @@ -146,7 +146,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { sendFeedback(msg.info.id, closed, Progress(0,0), success, "No update applied" ) - notificationManager.send(MessageListener.Message.Event.NoUpdate) + notificationManager.sendMessageToChannelSafely(MessageListener.Message.Event.NoUpdate) } updaterError.isNotEmpty() -> { @@ -154,14 +154,14 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { parent!!.send(DeploymentManager.Companion.Message.UpdateFailed) sendFeedback(msg.info.id, closed, Progress(updaters.size, updaterError[0].first), failure, *details.toTypedArray()) - notificationManager.send(MessageListener.Message.Event.UpdateFinished(successApply = false, details = details)) + notificationManager.sendMessageToChannelSafely(MessageListener.Message.Event.UpdateFinished(successApply = false, details = details)) } else -> { parent!!.send(DeploymentManager.Companion.Message.UpdateFinished) sendFeedback(msg.info.id, closed, Progress(updaters.size, updaters.size), success, *details.toTypedArray()) - notificationManager.send(MessageListener.Message.Event.UpdateFinished(successApply = true, details = details)) + notificationManager.sendMessageToChannelSafely(MessageListener.Message.Event.UpdateFinished(successApply = true, details = details)) } } } @@ -201,7 +201,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { vararg messages: String ) { val request = DeploymentFeedbackRequest.newInstance(id, execution, progress, finished, *messages) - connectionManager.send(DeploymentFeedback(request)) + connectionManager.sendMessageToChannelSafely(DeploymentFeedback(request)) } private fun convert(swModule: Updater.SwModule, pathCalculator: (Updater.SwModule.Artifact) -> String): Updater.SwModuleWithPath = @@ -225,7 +225,7 @@ private constructor(scope: ActorScope) : AbstractActor(scope) { private suspend fun stopUpdate() { LOG.info("Stopping update") channel.cancel() - notificationManager.send(MessageListener.Message.State.CancellingUpdate) + notificationManager.sendMessageToChannelSafely(MessageListener.Message.State.CancellingUpdate) parent!!.send(ActionManager.Companion.Message.UpdateStopped) } diff --git a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt new file mode 100644 index 0000000..bb5ced9 --- /dev/null +++ b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/HaraClientStoppingTest.kt @@ -0,0 +1,232 @@ +/* + * + * * Copyright © 2017-2024 Kynetics LLC + * * + * * This program and the accompanying materials are made + * * available under the terms of the Eclipse Public License 2.0 + * * which is available at https://www.eclipse.org/legal/epl-2.0/ + * * + * * SPDX-License-Identifier: EPL-2.0 + * + */ +package org.eclipse.hara.ddiclient.integrationtest + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import org.eclipse.hara.ddiclient.api.DownloadBehavior +import org.eclipse.hara.ddiclient.api.HaraClient +import org.eclipse.hara.ddiclient.api.MessageListener +import org.eclipse.hara.ddiclient.api.Updater +import org.eclipse.hara.ddiclient.integrationtest.abstractions.AbstractHaraMessageTest.ExpectedMessage +import org.eclipse.hara.ddiclient.integrationtest.abstractions.AbstractTest +import org.eclipse.hara.ddiclient.integrationtest.api.management.AssignDistributionType +import org.eclipse.hara.ddiclient.integrationtest.api.management.HawkbitAssignDistributionBody +import org.eclipse.hara.ddiclient.integrationtest.utils.TestUtils +import org.eclipse.hara.ddiclient.integrationtest.utils.internalLog +import org.eclipse.hara.ddiclient.integrationtest.utils.logCurrentFunctionName +import org.testng.annotations.BeforeClass +import org.testng.annotations.Test +import kotlin.coroutines.cancellation.CancellationException + +class HaraClientStoppingTest : AbstractTest() { + + companion object { + const val TARGET_ID = "HaraClientStoppingTest" + } + + private val fiveSecondsDelayDownloadBehavior = object : DownloadBehavior { + override fun onAttempt(attempt: Int, artifactId: String, + previousError: Throwable?): DownloadBehavior.Try { + return DownloadBehavior.Try.After(5) + } + } + + private fun createMessageListener(channel: Channel): MessageListener { + return object : MessageListener { + override fun onMessage(message: MessageListener.Message) { + runBlocking { + "Received message: $message".internalLog() + channel.send(ExpectedMessage.HaraMessage(message)) + } + } + } + } + + @BeforeClass + override fun beforeTest() { + super.beforeTest() + setPollingTime("00:00:05") + } + + @Test(enabled = true, priority = 1, timeOut = 30_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInDownloadingState() { + logCurrentFunctionName() + runBlocking { + + haraClientStopTestTemplate( + downloadBehavior = fiveSecondsDelayDownloadBehavior + ) { msg, testJob, testClient -> + if (msg is MessageListener.Message.Event.StartDownloadFile) { + assert { + testClient.stop() + "Client stopped".internalLog() + delay(5_000) // wait for 5 seconds to ensure that download is stopped + testJob?.cancel() + } + } + } + } + } + + @Test(enabled = true, priority = 2, timeOut = 45_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInDownloadingStateForMultipleArtifacts() { + logCurrentFunctionName() + runBlocking { + + haraClientStopTestTemplate( + downloadBehavior = fiveSecondsDelayDownloadBehavior, + distributionId = TestUtils.OS_WITH_APPS_DISTRIBUTION_ID) { msg, testJob, testClient -> + if (msg is MessageListener.Message.Event.StartDownloadFile) { + assert { + testClient.stop() + "Client stopped".internalLog() + delay(30_000) + testJob?.cancel() + } + } + } + } + } + + @Test(enabled = true, priority = 3, timeOut = 30_000, invocationCount = 1) + fun haraClientShouldStopPollingAfterBeingStoppedInUpdatingState() { + logCurrentFunctionName() + runBlocking { + + val updater = object : Updater { + override fun apply(modules: Set, + messenger: Updater.Messenger): Updater.UpdateResult { + runBlocking { + println( + "Applying long running fake update (10 sec) for modules: $modules") + delay(8_000) + } + messenger.sendMessageToServer("Update applied") + return Updater.UpdateResult(true) + } + } + + haraClientStopTestTemplate(updater = updater) { msg, testJob, testClient -> + if (msg is MessageListener.Message.State.Updating) { + assert { + testClient.stop() + "Client stopped".internalLog() + delay(10_000) // wait for update to finish + testJob?.cancel() + } + } + } + } + } + + @Test(enabled = true, priority = 4, timeOut = 30_000, invocationCount = 1) + fun haraClientShouldPollAfterRestartedInDownloadingState() { + logCurrentFunctionName() + runBlocking { + + setPollingTime("00:00:10") + reCreateTestTargetOnServer(TARGET_ID) + assignHeavyOTAUpdateToTheTarget(TestUtils.OS_DISTRIBUTION_ID) + + val expectedMessage = mutableListOf() + val expectedMessageChannel = Channel(5, BufferOverflow.DROP_OLDEST) + + var testClient = createClient( + expectedMessageChannel, downloadBehavior = fiveSecondsDelayDownloadBehavior) + + testClient.startAsync() + + listenToMessages(expectedMessageChannel, testClient) { msg, testJob, _ -> + assert { + if (expectedMessage.isNotEmpty()) { + assertEquals(msg, expectedMessage.removeFirst().message) + testJob?.cancel() + } else if (msg is MessageListener.Message.Event.StartDownloadFile) { + testClient.stop() + "Client stopped".internalLog() + + // The client should start polling after restart + testClient = createClient(expectedMessageChannel) + expectedMessage.add( + ExpectedMessage.HaraMessage(MessageListener.Message.Event.Polling)) + testClient.startAsync() + } + } + } + } + } + + private suspend fun haraClientStopTestTemplate( + downloadBehavior: DownloadBehavior = TestUtils.downloadBehavior, + updater: Updater = TestUtils.updater, + distributionId: Int = TestUtils.OS_DISTRIBUTION_ID, + onMessageReceive: suspend (MessageListener.Message, Deferred?, HaraClient) -> Unit + ) { + + reCreateTestTargetOnServer(TARGET_ID) + assignHeavyOTAUpdateToTheTarget(distributionId) + + val expectedMessageChannel = Channel(5, BufferOverflow.DROP_OLDEST) + + val client = createClient( + expectedMessageChannel, downloadBehavior = downloadBehavior, updater = updater) + + client.startAsync() + + listenToMessages(expectedMessageChannel, client, onMessageReceive) + } + + private suspend fun listenToMessages( + expectedMessageChannel: Channel, + client: HaraClient, + onMessageReceive: suspend (MessageListener.Message, Deferred?, HaraClient) -> Unit) { + var testJob: Deferred? = null + val testScope = CoroutineScope(Dispatchers.IO) + testJob = testScope.async { + for (msg in expectedMessageChannel) { + if (msg is ExpectedMessage.HaraMessage) { + onMessageReceive(msg.message, testJob, client) + } + } + } + + try { + testJob.await() + } catch (ignored: CancellationException) { + } + } + + private fun createClient( + expectedMessageChannel: Channel, + downloadBehavior: DownloadBehavior = TestUtils.downloadBehavior, + updater: Updater = TestUtils.updater): HaraClient { + val messageListener = createMessageListener(expectedMessageChannel) + return clientFromTargetId( + downloadBehavior = downloadBehavior, updater = updater, + messageListeners = listOf(messageListener)).invoke(TARGET_ID) + } + + private suspend fun assignHeavyOTAUpdateToTheTarget( + distributionId: Int) { + val distribution = HawkbitAssignDistributionBody( + distributionId, AssignDistributionType.FORCED, 0) + assignDistributionToTheTarget(TARGET_ID, distribution) + } + +} diff --git a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt index cfe68d6..1a27761 100644 --- a/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt +++ b/src/test/kotlin/org/eclipse/hara/ddiclient/integrationtest/abstractions/AbstractTest.kt @@ -13,7 +13,6 @@ package org.eclipse.hara.ddiclient.integrationtest.abstractions import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking @@ -44,10 +43,6 @@ abstract class AbstractTest { protected lateinit var managementApi: ManagementApi - private val throwableScope = CoroutineScope(Dispatchers.Default) - - private var throwableJob: Deferred? = null - protected var client: HaraClient? = null set(value) { safeStopClient() @@ -104,12 +99,13 @@ abstract class AbstractTest { } } - protected suspend fun assert(assertionBlock: () -> Unit) { - throwableJob = throwableScope.async { + protected suspend fun assert(assertionBlock: suspend () -> Unit) { + val throwableScope = CoroutineScope(Dispatchers.Default) + val throwableJob = throwableScope.async { assertionBlock() } try { - throwableJob?.await() + throwableJob.await() } catch (ignored: CancellationException) { } }