Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSN-13] feat(nf-tower): Retrieve and set Fusion license tokens in a process environment #5614

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/nf-tower/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ dependencies {
testImplementation(testFixtures(project(":nextflow")))
testImplementation "org.apache.groovy:groovy:4.0.24"
testImplementation "org.apache.groovy:groovy-nio:4.0.24"
// wiremock required by TowerFusionEnvTest
testImplementation "org.wiremock:wiremock:3.5.4"
}
266 changes: 266 additions & 0 deletions plugins/nf-tower/src/main/io/seqera/tower/plugin/TowerFusionEnv.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package io.seqera.tower.plugin

import com.google.gson.Gson
import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.seqera.tower.plugin.exception.BadResponseException
import io.seqera.tower.plugin.exception.UnauthorizedException
import io.seqera.tower.plugin.exchange.LicenseTokenRequest
import io.seqera.tower.plugin.exchange.LicenseTokenResponse
import nextflow.Global
import nextflow.Session
import nextflow.SysEnv
import nextflow.exception.AbortOperationException
import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionEnv
import nextflow.util.Threads
import org.pf4j.Extension

import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.Executors
import java.util.function.Predicate

/**
* Environment provider for Platform-specific environment variables.
*
* @author Alberto Miranda <[email protected]>
*/
@Slf4j
@Extension
@CompileStatic
class TowerFusionEnv implements FusionEnv {

// The endpoint where license-scoped JWT tokens are obtained
private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/'

// Server errors that should trigger a retry
private static final List<Integer> SERVER_ERRORS = [429, 500, 502, 503, 504]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how I feel about retrying 503 and 504 errors, if Seqera Platform is under heavy load wouldn't this make it worse?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took this from Wave's client to follow the existing approach. I don't have a specific preference 🤷‍♂️ (cc @pditommaso)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why it's used an exponential backoff. Actually think it should be added 408 (connection timeout) to that list

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, consider this resolved 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error 408 added with 6ce5be3


// Default connection timeout for HTTP requests
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS)

// Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter
private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS)
private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS)
private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10
private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5

// The HttpClient instance used to send requests
private final HttpClient httpClient = newDefaultHttpClient()

// The RetryPolicy instance used to retry requests
private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS)

// Nextflow session
private final Session session

// Platform endpoint to use for requests
private final String endpoint

// Platform access token to use for requests
private final String accessToken

/**
* Constructor for the class. It initializes the session, endpoint, and access token.
*/
TowerFusionEnv() {
this.session = Global.session as Session
final towerConfig = session.config.navigate('tower') as Map ?: [:]
final env = SysEnv.get()
this.endpoint = endpoint0(towerConfig, env)
this.accessToken = accessToken0(towerConfig, env)
}

/**
* Return any environment variables relevant to Fusion execution. This method is called
* by {@link nextflow.fusion.FusionEnvProvider#getEnvironment} to determine which
* environment variables are needed for the current run.
*
* @param scheme The scheme for which the environment variables are needed (currently unused)
* @param config The Fusion configuration object
* @return A map of environment variables
*/
@Override
Map<String, String> getEnvironment(String scheme, FusionConfig config) {

// TODO(amiranda): Hardcoded for now. We need to find out how to obtain
// the concrete product SKU and version. Candidate: FusionConfig?
final product = 'fusion'
final version = '2.4'
jordeu marked this conversation as resolved.
Show resolved Hide resolved

try {
final token = getLicenseToken(product, version)
return [
'FUSION_LICENSE_TOKEN': token,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
'FUSION_LICENSE_TOKEN': token,
FUSION_LICENSE_TOKEN: token,

Groovy magic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed with f32f0ca

]
} catch (Exception e) {
log.warn("Error retrieving Fusion license information: ${e.message}")
return [:]
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the
* Platform access token provided in the configuration of the current session.
*
* @throws AbortOperationException if a Platform access token cannot be found

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also throws other exceptions if the token is not valid or malformed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed with 1fed859 and refactored into a9849f5

*
* @return The signed JWT token
*/
protected String getLicenseToken(product, version) {
// FIXME(amiranda): Find out how to obtain the product and version
// Candidate: FusionConfig?

if (accessToken == null) {
throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
alberto-miranda marked this conversation as resolved.
Show resolved Hide resolved
}

final req = HttpRequest.newBuilder()
.uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize())
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
.header('Content-Type', 'application/json')
.header('Authorization', "Bearer ${accessToken}")
.POST(
HttpRequest.BodyPublishers.ofString(
new Gson().toJson(
new LicenseTokenRequest(
product: product,
version: version
),
LicenseTokenRequest.class
),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Built-in JsonOutout.toJson(Map) can be easier

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way for symmetry with the deserialization but I don't have a preference for one or the other.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine keeping gson, then it would be better to isolate into its own method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done with 2f11171

)
)
.build()

try {
final resp = safeHttpSend(req, retryPolicy)

if (resp.statusCode() == 200) {
return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken
}

if (resp.statusCode() == 401) {
throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token")
}

throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}")

} catch (IOException e) {
throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}")
}
}

/**************************************************************************
* Helper methods
*************************************************************************/

/**
* Get the configured Platform API endpoint: if the endpoint is not provided in the configuration, we fallback to the
* environment variable `TOWER_API_ENDPOINT`. If neither is provided, we fallback to the default endpoint.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform API endpoint
*/
protected static String endpoint0(Map opts, Map<String, String> env) {
def result = opts.endpoint as String
if (!result || result == '-') {
result = env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL
}
return result.stripEnd('/')
}

/**
* Get the configured Platform access token: if `TOWER_WORKFLOW_ID` is provided in the environment, we are running
* in a Platform-made run and we should ONLY retrieve the token from the environment. Otherwise, check
* the configuration file or fallback to the environment. If no token is found, returns null.
*
* @param opts the configuration options for Platform
* @param env the applicable environment variables
* @return the Platform access token
*/
protected static String accessToken0(Map opts, Map<String, String> env) {
def token = env.get('TOWER_WORKFLOW_ID')
? env.get('TOWER_ACCESS_TOKEN')
: opts.containsKey('accessToken') ? opts.accessToken as String : env.get('TOWER_ACCESS_TOKEN')
return token
}
Copy link
Contributor Author

@alberto-miranda alberto-miranda Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This basically replicates what TowerConfig does in nf-wave, but I didn't want to add a explicit dependency on that. If adding this dependency is not a problem, it would be better to reuse the code there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make sense to turn those into PlatformHelper class shared across modules & plugins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Do you want it done in the context of this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored with c1147cd. I created the PlatformHelper class under modules/nextflow/src/main/groovy/nextflow/platform since it seemed to fit with the existing conventions for common code. Please double check.


/**
* Create a new HttpClient instance with default settings
* @return The new HttpClient instance
*/
private static HttpClient newDefaultHttpClient() {
final builder = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.followRedirects(HttpClient.Redirect.NEVER)
.cookieHandler(new CookieManager())
.connectTimeout(DEFAULT_CONNECTION_TIMEOUT)
// use virtual threads executor if enabled
if ( Threads.useVirtual() ) {
builder.executor(Executors.newVirtualThreadPerTaskExecutor())
}
// build and return the new client
return builder.build()
}

/**
* Create a new RetryPolicy instance with default settings and the given list of retryable errors. With this policy,
* a request is retried on IOExceptions and any server errors defined in errorsToRetry. The number of retries, delay,
* max delay, and jitter are controlled by the corresponding values defined at class level.
*
* @return The new RetryPolicy instance
*/
private static <T> RetryPolicy<HttpResponse<T>> newDefaultRetryPolicy(List<Integer> errorsToRetry) {

final retryOnException = (e -> e instanceof IOException) as Predicate<? extends Throwable>
final retryOnStatusCode = ((HttpResponse<T> resp) -> resp.statusCode() in errorsToRetry) as Predicate<HttpResponse<T>>

final listener = new EventListener<ExecutionAttemptedEvent<HttpResponse<T>>>() {
@Override
void accept(ExecutionAttemptedEvent event) throws Throwable {
def msg = "connection failure - attempt: ${event.attemptCount}"
if (event.lastResult != null)
msg += "; response: ${event.lastResult}"
if (event.lastFailure != null)
msg += "; exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}"
log.debug(msg)
}
}
return RetryPolicy.<HttpResponse<T>> builder()
.handleIf(retryOnException)
.handleResultIf(retryOnStatusCode)
.withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS)
.withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS)
.withJitter(DEFAULT_RETRY_POLICY_JITTER)
.onRetry(listener)
.build()
}

/**
* Send an HTTP request and return the response. This method automatically retries the request according to the
* given RetryPolicy.
*
* @param req The HttpRequest to send
* @return The HttpResponse received
*/
private <T> HttpResponse<String> safeHttpSend(HttpRequest req, RetryPolicy<T> policy) {
return Failsafe.with(policy).get(
() -> {
log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}"
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
log.debug "Response: statusCode:=${resp.statusCode()}; body:=${resp.body()}"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This response is going to contain an authentication token right? Do we want to log it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I'm not sure about what the preferred policy is in Nextflow logs (cc @pditommaso)

return resp
} as CheckedSupplier
) as HttpResponse<String>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.seqera.tower.plugin.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class BadResponseException extends RuntimeException{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.seqera.tower.plugin.exception

import groovy.transform.InheritConstructors

@InheritConstructors
class UnauthorizedException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.seqera.tower.plugin.exchange

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString

/**
* Models a REST request to obtain a license-scoped JWT token from Platform
*
* @author Alberto Miranda <[email protected]>
*/
@EqualsAndHashCode
@ToString(includeNames = true, includePackage = false)
@CompileStatic
class LicenseTokenRequest {

/** The product code */
String product

/** The product version */
String version
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.seqera.tower.plugin.exchange

import groovy.transform.CompileStatic
import groovy.transform.ToString

/**
* Models a REST response containing a license-scoped JWT token from Platform
*
* @author Alberto Miranda <[email protected]>
*/
@CompileStatic
@ToString(includeNames = true, includePackage = false)
class LicenseTokenResponse {
/**
* The signed JWT token
*/
String signedToken

/**
* The expiration date of the token
*/
Date expirationDate
}
1 change: 1 addition & 0 deletions plugins/nf-tower/src/resources/META-INF/extensions.idx
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
#

io.seqera.tower.plugin.TowerFactory
io.seqera.tower.plugin.TowerFusionEnv
Loading
Loading