Skip to content

Commit

Permalink
Merge pull request #28 from Kynetics/improve_force_ping
Browse files Browse the repository at this point in the history
Change the implementation of Force Ping
  • Loading branch information
andrea-zoleo authored Nov 7, 2023
2 parents cd041b6 + 9fbe519 commit 9d506b4
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ interface DdiClient {

suspend fun getControllerActions(): ControllerBaseResponse

suspend fun onControllerActionsChange(etag: String = "", onChange: OnResourceChange<ControllerBaseResponse>)
suspend fun onControllerActionsChange(etag: String = "", onChange: OnResourceChange<ControllerBaseResponse>, onNothingChange: suspend () -> Unit = {})

suspend fun getDeploymentActionDetails(actionId: String, historyCount: Int = -1): DeploymentBaseResponse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ class DdiClientDefaultImpl private constructor(private val ddiRestApi: DdiRestAp
return handleResponse(response)
}

override suspend fun onControllerActionsChange(etag: String, onChange: OnResourceChange<ControllerBaseResponse>) {
override suspend fun onControllerActionsChange(etag: String,
onChange: OnResourceChange<ControllerBaseResponse>,
onNothingChange: suspend () -> Unit) {
LOG.debug("onDeploymentActionDetailsChange({})", etag)
val response = ddiRestApi.getControllerActions(tenant, controllerId, etag)
LOG.debug("{}", response)
handleOnChangeResponse(response, etag, "BaseResource", onChange)
handleOnChangeResponse(response, etag, "BaseResource", onChange, onNothingChange)
}

override suspend fun getDeploymentActionDetails(actionId: String, historyCount: Int): DeploymentBaseResponse {
Expand Down Expand Up @@ -98,15 +100,20 @@ class DdiClientDefaultImpl private constructor(private val ddiRestApi: DdiRestAp
return ddiRestApi.downloadArtifact(url).byteStream()
}

private suspend fun <T> handleOnChangeResponse(response: Response<T>, etag: String, resourceName: String, onChange: OnResourceChange<T>) {
private suspend fun <T> handleOnChangeResponse(response: Response<T>, etag: String,
resourceName: String, onChange: OnResourceChange<T>,
onNothingChange: suspend () -> Unit = {}) {
when (response.code()) {
in 200..299 -> {
val newEtag = response.headers()[ETAG_HEADER] ?: ""
LOG.info("{} is changed. Old ETag: {}, new ETag: {}", resourceName, etag, newEtag)
onChange.invoke(response.body()!!, newEtag)
}

HttpURLConnection.HTTP_NOT_MODIFIED -> LOG.info("{} not changed", resourceName)
HttpURLConnection.HTTP_NOT_MODIFIED -> {
LOG.info("{} not changed", resourceName)
onNothingChange.invoke()
}

else -> throw HttpException(response)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ interface MessageListener {

object NoUpdate : Event("No update to apply")

object NoNewState : Event("Server state is not changed")

/**
* An error occurred during the update.
* @property details, contains additional info about the error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
is In.SetPing -> become(runningReceive(startPing(state.copy(clientPingInterval = msg.duration, lastPing = Instant.EPOCH))))

is In.ForcePing -> {
become(runningReceive(state.clearEtags()))
channel.send(In.SetPing(null))
onPing(state.copy(clientPingInterval = null,
requireUpdateNotification = true))
}

is In.Ping -> onPing(state)
Expand Down Expand Up @@ -194,9 +194,16 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
this.send(Out.ConfigDataRequired, state)
}

client.onControllerActionsChange(state.controllerBaseEtag) { res, newEtag ->
onControllerBaseChange(state, s, res, newEtag)
}
client.onControllerActionsChange(state.controllerBaseEtag,
onChange = { res, newEtag ->
onControllerBaseChange(state, s, res, newEtag)
},
onNothingChange = {
if (state.requireUpdateNotification) {
notificationManager.send(MessageListener.Message.Event.NoNewState)
}
})

}.onFailure { t ->
fun loopMsg(t: Throwable): String = t.message + if (t.cause != null) " ${loopMsg(t.cause!!)}" else ""
val errorDetails = "exception: ${t.javaClass} message: ${loopMsg(t)}"
Expand Down Expand Up @@ -256,7 +263,8 @@ private constructor(scope: ActorScope) : AbstractActor(scope) {
val deploymentEtag: String = "",
val controllerBaseEtag: String = "",
val timer: Timer? = null,
val receivers: Set<ActorRef> = emptySet()
val receivers: Set<ActorRef> = emptySet(),
val requireUpdateNotification: Boolean = false
) {
val pingInterval = when {
backoffPingInterval != null -> backoffPingInterval
Expand Down

0 comments on commit 9d506b4

Please sign in to comment.