Skip to content

Commit

Permalink
WebSocketClient dynamic host/port (#164)
Browse files Browse the repository at this point in the history
* WebSocketClient dynamic host/port

 - make dependants on webSocketclient use a provider
 - provider listens to settings repository changes and updates instance cached
 - implementation for test connection on trusted node setup
* - proper implementation of test connection + adjustements on updating settings
* - added TODOs for short term changes coming
  • Loading branch information
rodvar authored Jan 16, 2025
1 parent 640e676 commit 42894c4
Show file tree
Hide file tree
Showing 17 changed files with 193 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import network.bisq.mobile.client.service.trades.TradesApiGateway
import network.bisq.mobile.client.service.user_profile.ClientUserProfileServiceFacade
import network.bisq.mobile.client.service.user_profile.UserProfileApiGateway
import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.api_proxy.WebSocketApiClient
import network.bisq.mobile.client.websocket.messages.SubscriptionRequest
import network.bisq.mobile.client.websocket.messages.SubscriptionResponse
Expand Down Expand Up @@ -60,6 +61,7 @@ import network.bisq.mobile.domain.service.offers.OffersServiceFacade
import network.bisq.mobile.domain.service.settings.SettingsServiceFacade
import network.bisq.mobile.domain.service.trades.TradesServiceFacade
import network.bisq.mobile.domain.service.user_profile.UserProfileServiceFacade
import org.koin.core.parameter.parametersOf
import org.koin.core.qualifier.named
import org.koin.dsl.module

Expand Down Expand Up @@ -132,12 +134,21 @@ val clientModule = module {
single(named("WebsocketApiHost")) { get<EnvironmentController>().getWebSocketHost() }
single(named("WebsocketApiPort")) { get<EnvironmentController>().getWebSocketPort() }

single {
factory { (host: String, port: Int) ->
WebSocketClient(
get(),
get(),
host,
port
)
}

single {
WebSocketClientProvider(
get(named("WebsocketApiHost")),
get(named("WebsocketApiPort"))
get(named("WebsocketApiPort")),
get(),
clientFactory = { host, port -> get { parametersOf(host, port) } },
)
}

Expand Down Expand Up @@ -171,7 +182,7 @@ val clientModule = module {
single { ExplorerApiGateway(get()) }
single<ExplorerServiceFacade> { ClientExplorerServiceFacade(get()) }

single { MediationApiGateway(get(), get()) }
single { MediationApiGateway(get()) }
single<MediationServiceFacade> { ClientMediationServiceFacade(get()) }

single { SettingsApiGateway(get()) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package network.bisq.mobile.client.service.market

import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.api_proxy.WebSocketApiClient
import network.bisq.mobile.client.websocket.subscription.Topic
import network.bisq.mobile.client.websocket.subscription.WebSocketEventObserver
import network.bisq.mobile.domain.utils.Logging

class MarketPriceApiGateway(
private val webSocketApiClient: WebSocketApiClient,
private val webSocketClient: WebSocketClient,
private val webSocketClientProvider: WebSocketClientProvider,
) : Logging {
private val basePath = "market-price"

Expand All @@ -17,6 +18,6 @@ class MarketPriceApiGateway(
}

suspend fun subscribeMarketPrice(): WebSocketEventObserver {
return webSocketClient.subscribe(Topic.MARKET_PRICE)
return webSocketClientProvider.get().subscribe(Topic.MARKET_PRICE)
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package network.bisq.mobile.client.service.mediation

import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.api_proxy.WebSocketApiClient
import network.bisq.mobile.domain.utils.Logging

class MediationApiGateway(
private val webSocketApiClient: WebSocketApiClient,
private val webSocketClient: WebSocketClient,
private val webSocketApiClient: WebSocketApiClient
) : Logging {
private val basePath = "mediation"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package network.bisq.mobile.client.service.offers

import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.api_proxy.WebSocketApiClient
import network.bisq.mobile.client.websocket.subscription.Topic
import network.bisq.mobile.client.websocket.subscription.WebSocketEventObserver
Expand All @@ -13,7 +13,7 @@ import network.bisq.mobile.domain.utils.Logging

class OfferbookApiGateway(
private val webSocketApiClient: WebSocketApiClient,
private val webSocketClient: WebSocketClient,
private val webSocketClientProvider: WebSocketClientProvider,
) : Logging {
private val basePath = "offerbook"

Expand Down Expand Up @@ -58,15 +58,15 @@ class OfferbookApiGateway(

// Subscriptions
suspend fun subscribeNumOffers(): WebSocketEventObserver {
return webSocketClient.subscribe(Topic.NUM_OFFERS)
return webSocketClientProvider.get().subscribe(Topic.NUM_OFFERS)
}

/**
* @param code The quote currency code for which we want to receive updates.
* If null or empty string we receive for all markets the offer updates.
*/
suspend fun subscribeOffers(code: String? = null): WebSocketEventObserver {
return webSocketClient.subscribe(Topic.OFFERS, code)
return webSocketClientProvider.get().subscribe(Topic.OFFERS, code)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.serialization.json.Json
import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.subscription.ModificationType
import network.bisq.mobile.client.websocket.subscription.Subscription
import network.bisq.mobile.client.websocket.subscription.Topic
Expand All @@ -19,7 +19,7 @@ import network.bisq.mobile.domain.utils.Logging

class ClientTradesServiceFacade(
private val apiGateway: TradesApiGateway,
webSocketClient: WebSocketClient,
webSocketClientProvider: WebSocketClientProvider,
json: Json
) :
TradesServiceFacade, Logging {
Expand All @@ -38,10 +38,10 @@ class ClientTradesServiceFacade(
private val tradeId get() = selectedTrade.value?.tradeId
private val coroutineScope = CoroutineScope(BackgroundDispatcher)
private val openTradesSubscription: Subscription<TradeItemPresentationDto> =
Subscription(webSocketClient, json, Topic.TRADES, this::handleTradeItemPresentationChange)
Subscription(webSocketClientProvider, json, Topic.TRADES, this::handleTradeItemPresentationChange)

private val tradePropertiesSubscription: Subscription<Map<String, TradePropertiesDto>> =
Subscription(webSocketClient, json, Topic.TRADE_PROPERTIES, this::handleTradePropertiesChange)
Subscription(webSocketClientProvider, json, Topic.TRADE_PROPERTIES, this::handleTradePropertiesChange)

//private var openTradesSubscriptionJob: Job? = null
// private var tradePropertiesSubscriptionJob: Job? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import network.bisq.mobile.client.service.trades.TradeEventTypeEnum.REJECT_TRADE
import network.bisq.mobile.client.service.trades.TradeEventTypeEnum.SELLER_CONFIRM_BTC_SENT
import network.bisq.mobile.client.service.trades.TradeEventTypeEnum.SELLER_CONFIRM_FIAT_RECEIPT
import network.bisq.mobile.client.service.trades.TradeEventTypeEnum.SELLER_SENDS_PAYMENT_ACCOUNT
import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.api_proxy.WebSocketApiClient
import network.bisq.mobile.domain.utils.Logging

class TradesApiGateway(
private val webSocketApiClient: WebSocketApiClient,
private val webSocketClient: WebSocketClient,
private val webSocketClientProvider: WebSocketClientProvider,
) : Logging {
private val basePath = "trades"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.ktor.websocket.close
import io.ktor.websocket.readText
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand All @@ -31,8 +32,8 @@ import network.bisq.mobile.domain.utils.createUuid
class WebSocketClient(
private val httpClient: HttpClient,
val json: Json,
host: String,
port: Int
var host: String,
var port: Int
) : Logging {

private val webSocketUrl: String = "ws://$host:$port/websocket"
Expand All @@ -48,9 +49,11 @@ class WebSocketClient(
if (!isConnected) {
try {
session = httpClient.webSocketSession { url(webSocketUrl) }
isConnected = true
CoroutineScope(BackgroundDispatcher).launch { startListening() }
connectionReady.complete(true)
if (session != null && session!!.isActive) {
isConnected = true
CoroutineScope(BackgroundDispatcher).launch { startListening() }
connectionReady.complete(true)
}
} catch (e: Exception) {
log.e("Connecting websocket failed", e)
throw e
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package network.bisq.mobile.client.websocket

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import network.bisq.mobile.client.websocket.messages.WebSocketRestApiRequest
import network.bisq.mobile.domain.data.BackgroundDispatcher
import network.bisq.mobile.domain.data.repository.SettingsRepository
import network.bisq.mobile.domain.utils.Logging
import kotlin.concurrent.Volatile
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid

/**
* Provider to handle dynamic host/port changes
*/
class WebSocketClientProvider(
defaultHost: String,
defaultPort: Int,
private val settingsRepository: SettingsRepository,
private val clientFactory: (String, Int) -> WebSocketClient) : Logging {

companion object {
fun parseUri(uri: String): Pair<String,Int> {
uri.split("//")[1].split(":").let {
return Pair(it[0], it[1].toInt())
}
}
}

private val backgroundScope = CoroutineScope(BackgroundDispatcher)

@Volatile
private var currentClient: WebSocketClient? = null

init {
// Listen to changes in WebSocket configuration and update the client
// TODO we might need to replicate this for changes in settings to reconnect to channels
backgroundScope.launch {
try {
settingsRepository.data.collect { newSettings ->
var host = defaultHost
var port = defaultPort
newSettings?.bisqApiUrl?.takeIf { it.isNotBlank() }?.let { url ->
log.d { "new bisq url $url "}
parseUri(url).apply {
host = first
port = second
}
}
// only update if there was actually a change
if (currentClient == null || currentClient!!.host != host || currentClient!!.port != port) {
if (currentClient?.isConnected == true) {
currentClient?.disconnect()
}
log.d { "Websocket client updated with url $host:$port" }
currentClient = createClient(host, port)
}
}
} catch (e: Exception) {
log.e(e) { "Error updating WebSocket client with new settings." }
}
}
}

@OptIn(ExperimentalUuidApi::class)
suspend fun testClient(host: String, port: Int): Boolean {
val client = createClient(host, port)
val url = "ws://$host:$port"
return try {
// if connection is refused, catch will execute returning false
client.connect()
return client.isConnected
} catch (e: Exception) {
log.e("Error testing connection to $url: ${e.message}")
false
} finally {
client.disconnect() // Ensure the client is closed to free resources
}
}

private fun createClient(host: String, port: Int): WebSocketClient {
return clientFactory(host, port)
}

fun get(): WebSocketClient {
if (currentClient == null) {
runBlocking {
settingsRepository.fetch()
}
}
return currentClient!!
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.ktor.http.HttpStatusCode
import io.ktor.http.contentType
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.client.websocket.messages.WebSocketRestApiRequest
import network.bisq.mobile.client.websocket.messages.WebSocketRestApiResponse
import network.bisq.mobile.domain.utils.Logging
Expand All @@ -21,7 +21,7 @@ import kotlin.uuid.Uuid

class WebSocketApiClient(
val httpClient: HttpClient,
val webSocketClient: WebSocketClient,
val webSocketClientProvider: WebSocketClientProvider,
val json: Json,
host: String,
port: Int
Expand Down Expand Up @@ -108,7 +108,7 @@ class WebSocketApiClient(
bodyAsJson
)
try {
val response = webSocketClient.sendRequestAndAwaitResponse(webSocketRestApiRequest)
val response = webSocketClientProvider.get().sendRequestAndAwaitResponse(webSocketRestApiRequest)
require(response is WebSocketRestApiResponse) { "Response not of expected type. response=$response" }
val body = response.body
if (response.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import network.bisq.mobile.client.websocket.WebSocketClient
import network.bisq.mobile.client.websocket.WebSocketClientProvider
import network.bisq.mobile.domain.data.BackgroundDispatcher
import network.bisq.mobile.domain.utils.Logging

class Subscription<T>(
private val webSocketClient: WebSocketClient,
private val webSocketClientProvider: WebSocketClientProvider,
private val json: Json,
private val topic: Topic,
private val resultHandler: (List<T>, ModificationType) -> Unit
Expand All @@ -25,7 +25,7 @@ class Subscription<T>(
require(job == null)
job = coroutineScope.launch {
// subscribe blocks until we get a response
val observer = webSocketClient.subscribe(topic)
val observer = webSocketClientProvider.get().subscribe(topic)
observer.webSocketEvent.collect { webSocketEvent ->
try {
if (webSocketEvent?.deferredPayload == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ sealed class BaseModel {
return false
if (other is BaseModel) {
if (other.id != UNDEFINED_ID && id != UNDEFINED_ID)
return other.id.equals(id)
return other.id == id
return super.equals(other)
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import kotlinx.serialization.Serializable
@Serializable
open class Settings : BaseModel() {
open var bisqApiUrl: String = ""
open var isConnected: Boolean = false

//todo better rely on the source data alone (has user profile, has bisqApiUrl set) as otherwise we can get out of sync
open var firstLaunch: Boolean = true
}
Loading

0 comments on commit 42894c4

Please sign in to comment.