-
Notifications
You must be signed in to change notification settings - Fork 649
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FSN-13] feat(nf-tower): Retrieve and set Fusion license tokens in a process environment #5614
base: master
Are you sure you want to change the base?
Changes from 1 commit
e7b1082
c1147cd
20c8a53
f32f0ca
25cd5ae
2f11171
94f6075
1fed859
6ce5be3
a9849f5
eaa985e
43f8bba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,266 @@ | ||||||
package io.seqera.tower.plugin | ||||||
|
||||||
import com.google.gson.Gson | ||||||
import dev.failsafe.Failsafe | ||||||
import dev.failsafe.RetryPolicy | ||||||
import dev.failsafe.event.EventListener | ||||||
import dev.failsafe.event.ExecutionAttemptedEvent | ||||||
import dev.failsafe.function.CheckedSupplier | ||||||
import groovy.transform.CompileStatic | ||||||
import groovy.util.logging.Slf4j | ||||||
import io.seqera.tower.plugin.exception.BadResponseException | ||||||
import io.seqera.tower.plugin.exception.UnauthorizedException | ||||||
import io.seqera.tower.plugin.exchange.LicenseTokenRequest | ||||||
import io.seqera.tower.plugin.exchange.LicenseTokenResponse | ||||||
import nextflow.Global | ||||||
import nextflow.Session | ||||||
import nextflow.SysEnv | ||||||
import nextflow.exception.AbortOperationException | ||||||
import nextflow.fusion.FusionConfig | ||||||
import nextflow.fusion.FusionEnv | ||||||
import nextflow.util.Threads | ||||||
import org.pf4j.Extension | ||||||
|
||||||
import java.net.http.HttpClient | ||||||
import java.net.http.HttpRequest | ||||||
import java.net.http.HttpResponse | ||||||
import java.time.Duration | ||||||
import java.time.temporal.ChronoUnit | ||||||
import java.util.concurrent.Executors | ||||||
import java.util.function.Predicate | ||||||
|
||||||
/** | ||||||
* Environment provider for Platform-specific environment variables. | ||||||
* | ||||||
* @author Alberto Miranda <[email protected]> | ||||||
*/ | ||||||
@Slf4j | ||||||
@Extension | ||||||
@CompileStatic | ||||||
class TowerFusionEnv implements FusionEnv { | ||||||
|
||||||
// Path, relative to the Platform API endpoint, where license-scoped JWT tokens are obtained
private static final String LICENSE_TOKEN_ENDPOINT = 'license/token/'

// HTTP status codes that should trigger a retry. 408 (Request Timeout) is as transient as
// 429 and the listed 5xx codes, so it is retried with the same backoff policy.
private static final List<Integer> SERVER_ERRORS = [408, 429, 500, 502, 503, 504]

// Default connection timeout for HTTP requests
private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration.of(30, ChronoUnit.SECONDS)

// Default retry policy settings for HTTP requests: delay, max delay, attempts, and jitter
private static final Duration DEFAULT_RETRY_POLICY_DELAY = Duration.of(450, ChronoUnit.MILLIS)
private static final Duration DEFAULT_RETRY_POLICY_MAX_DELAY = Duration.of(90, ChronoUnit.SECONDS)
private static final int DEFAULT_RETRY_POLICY_MAX_ATTEMPTS = 10
private static final double DEFAULT_RETRY_POLICY_JITTER = 0.5

// The HttpClient instance used to send requests
private final HttpClient httpClient = newDefaultHttpClient()

// The RetryPolicy instance used to retry requests (built from SERVER_ERRORS)
private final RetryPolicy retryPolicy = newDefaultRetryPolicy(SERVER_ERRORS)

// Nextflow session
private final Session session

// Platform endpoint to use for requests
private final String endpoint

// Platform access token to use for requests; may be null when none is configured
private final String accessToken
|
||||||
/**
 * Build the provider from the global Nextflow session: the `tower` configuration scope
 * and the system environment are used to resolve the Platform endpoint and access token.
 */
TowerFusionEnv() {
    this.session = Global.session as Session
    final sysEnv = SysEnv.get()
    // fall back to an empty map when the session has no `tower` scope
    final platformOpts = session.config.navigate('tower') as Map ?: [:]
    this.endpoint = endpoint0(platformOpts, sysEnv)
    this.accessToken = accessToken0(platformOpts, sysEnv)
}
|
||||||
/**
 * Provide the environment variables Fusion needs for the current run. Invoked through
 * {@link nextflow.fusion.FusionEnvProvider#getEnvironment}.
 *
 * The license token is requested from Platform; when that fails for any reason a warning
 * is logged and an empty map is returned, so the failure never propagates to the caller.
 *
 * @param scheme The scheme for which the environment variables are needed (currently unused)
 * @param config The Fusion configuration object (currently unused)
 * @return A map holding `FUSION_LICENSE_TOKEN`, or an empty map if the token could not be retrieved
 */
@Override
Map<String, String> getEnvironment(String scheme, FusionConfig config) {

    // TODO(amiranda): Hardcoded for now. We need to find out how to obtain
    // the concrete product SKU and version. Candidate: FusionConfig?
    final productCode = 'fusion'
    final productVersion = '2.4'

    final Map<String, String> result = [:]
    try {
        result.put('FUSION_LICENSE_TOKEN', getLicenseToken(productCode, productVersion))
    }
    catch (Exception e) {
        log.warn("Error retrieving Fusion license information: ${e.message}")
    }
    return result
}
|
||||||
/**
 * Send a request to Platform to obtain a license-scoped JWT for Fusion. The request is authenticated using the
 * Platform access token provided in the configuration of the current session.
 *
 * @param product The product code sent in the request payload
 * @param version The product version sent in the request payload
 * @return The signed JWT token
 *
 * @throws AbortOperationException if a Platform access token cannot be found
 * @throws UnauthorizedException if Platform replies with status 401
 * @throws BadResponseException if Platform replies with any status other than 200 or 401
 * @throws IllegalStateException if the request could not be sent
 */
protected String getLicenseToken(product, version) {
    // FIXME(amiranda): Find out how to obtain the product and version
    // Candidate: FusionConfig?

    if (accessToken == null) {
        throw new AbortOperationException("Missing personal access token -- Make sure there's a variable TOWER_ACCESS_TOKEN in your environment")
    }

    final req = HttpRequest.newBuilder()
        .uri(URI.create("${endpoint}/${LICENSE_TOKEN_ENDPOINT}").normalize())
        .header('Content-Type', 'application/json')
        .header('Authorization', "Bearer ${accessToken}")
        .POST(
            HttpRequest.BodyPublishers.ofString(
                new Gson().toJson(
                    new LicenseTokenRequest(
                        product: product,
                        version: version
                    ),
                    LicenseTokenRequest.class
                )
            )
        )
        .build()

    try {
        final resp = safeHttpSend(req, retryPolicy)

        if (resp.statusCode() == 200) {
            return new Gson().fromJson(resp.body(), LicenseTokenResponse.class).signedToken
        }

        if (resp.statusCode() == 401) {
            throw new UnauthorizedException("Unauthorized [401] - Verify you have provided a valid access token")
        }

        throw new BadResponseException("Invalid response: ${req.method()} ${req.uri()} [${resp.statusCode()}] ${resp.body()}")

    }
    catch (IOException e) {
        // keep the original exception as the cause so the stack trace is not lost
        throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${e.message}", e)
    }
    catch (dev.failsafe.FailsafeException e) {
        // Failsafe wraps checked exceptions (e.g. the IOException raised by HttpClient.send)
        // into a FailsafeException, so unwrap it to report the real failure
        final Throwable cause = e.cause ?: e
        throw new IllegalStateException("Unable to send request to '${req.uri()}' : ${cause.message}", cause)
    }
}
|
||||||
/************************************************************************** | ||||||
* Helper methods | ||||||
*************************************************************************/ | ||||||
|
||||||
/**
 * Resolve the Platform API endpoint. The `endpoint` configuration option wins unless it is
 * empty or set to `-`; in that case the `TOWER_API_ENDPOINT` environment variable is used and,
 * failing that, the default endpoint. Trailing slashes are stripped from the result.
 *
 * @param opts the configuration options for Platform
 * @param env the applicable environment variables
 * @return the Platform API endpoint
 */
protected static String endpoint0(Map opts, Map<String, String> env) {
    final configured = opts.endpoint as String
    final boolean useFallback = !configured || configured == '-'
    final String url = useFallback
        ? (env.get('TOWER_API_ENDPOINT') ?: TowerClient.DEF_ENDPOINT_URL)
        : configured
    return url.stripEnd('/')
}
|
||||||
/**
 * Resolve the Platform access token. When `TOWER_WORKFLOW_ID` is set in the environment the run
 * was launched by Platform and the token is taken ONLY from `TOWER_ACCESS_TOKEN`. Otherwise the
 * `accessToken` configuration option is used if the key is present, with `TOWER_ACCESS_TOKEN`
 * as the fallback.
 *
 * @param opts the configuration options for Platform
 * @param env the applicable environment variables
 * @return the Platform access token, or null if none is found
 */
protected static String accessToken0(Map opts, Map<String, String> env) {
    final envToken = env.get('TOWER_ACCESS_TOKEN')
    // Platform-made run: the environment is the only trusted source
    if (env.get('TOWER_WORKFLOW_ID')) {
        return envToken
    }
    // an explicitly configured key takes precedence, even when its value is null
    if (opts.containsKey('accessToken')) {
        return opts.accessToken as String
    }
    return envToken
}
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This basically replicates what There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could make sense to turn those into There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree. Do you want it done in the context of this PR? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It could be better There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Refactored with c1147cd. I created the |
||||||
|
||||||
/**
 * Build the HttpClient used by this class: HTTP/1.1, redirects never followed, a fresh
 * cookie manager and the default connection timeout. When virtual threads are enabled
 * the client runs its tasks on a virtual-thread-per-task executor.
 *
 * @return The new HttpClient instance
 */
private static HttpClient newDefaultHttpClient() {
    final clientBuilder = HttpClient.newBuilder()
    clientBuilder.version(HttpClient.Version.HTTP_1_1)
    clientBuilder.followRedirects(HttpClient.Redirect.NEVER)
    clientBuilder.cookieHandler(new CookieManager())
    clientBuilder.connectTimeout(DEFAULT_CONNECTION_TIMEOUT)
    // use virtual threads executor if enabled
    if (Threads.useVirtual()) {
        clientBuilder.executor(Executors.newVirtualThreadPerTaskExecutor())
    }
    return clientBuilder.build()
}
|
||||||
/**
 * Build the RetryPolicy applied to HTTP requests. A request is retried when it fails with an
 * IOException or when the response status code is one of errorsToRetry. Backoff delays, the
 * maximum number of attempts and the jitter come from the DEFAULT_RETRY_POLICY_* constants.
 * Every retry is reported in the debug log.
 *
 * @param errorsToRetry The HTTP status codes that should trigger a retry
 * @return The new RetryPolicy instance
 */
private static <T> RetryPolicy<HttpResponse<T>> newDefaultRetryPolicy(List<Integer> errorsToRetry) {

    final isRetryableFailure = (e -> e instanceof IOException) as Predicate<? extends Throwable>
    final isRetryableStatus = ((HttpResponse<T> resp) -> resp.statusCode() in errorsToRetry) as Predicate<HttpResponse<T>>

    final retryLogger = new EventListener<ExecutionAttemptedEvent<HttpResponse<T>>>() {
        @Override
        void accept(ExecutionAttemptedEvent event) throws Throwable {
            // collect the message fragments and join them with the same '; ' separator
            final List<String> parts = new ArrayList<String>()
            parts.add("connection failure - attempt: ${event.attemptCount}".toString())
            if (event.lastResult != null) {
                parts.add("response: ${event.lastResult}".toString())
            }
            if (event.lastFailure != null) {
                parts.add("exception: [${event.lastFailure.class.name}] ${event.lastFailure.message}".toString())
            }
            log.debug(parts.join('; '))
        }
    }

    return RetryPolicy.<HttpResponse<T>> builder()
        .handleIf(isRetryableFailure)
        .handleResultIf(isRetryableStatus)
        .withBackoff(DEFAULT_RETRY_POLICY_DELAY.toMillis(), DEFAULT_RETRY_POLICY_MAX_DELAY.toMillis(), ChronoUnit.MILLIS)
        .withMaxAttempts(DEFAULT_RETRY_POLICY_MAX_ATTEMPTS)
        .withJitter(DEFAULT_RETRY_POLICY_JITTER)
        .onRetry(retryLogger)
        .build()
}
|
||||||
/**
 * Send an HTTP request and return the response. This method automatically retries the request according to the
 * given RetryPolicy.
 *
 * The body of a 200 response is never written to the log: the caller in this class parses the
 * signed license token out of it, and secrets must not end up in log files.
 *
 * @param req The HttpRequest to send
 * @param policy The RetryPolicy that governs retries
 * @return The HttpResponse received
 */
private <T> HttpResponse<String> safeHttpSend(HttpRequest req, RetryPolicy<T> policy) {
    return Failsafe.with(policy).get(
        () -> {
            log.debug "Request: method:=${req.method()}; uri:=${req.uri()}; request:=${req}"
            final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
            // error bodies are still logged since they help troubleshooting and carry no token
            final loggedBody = resp.statusCode() == 200 ? '<redacted>' : resp.body()
            log.debug "Response: statusCode:=${resp.statusCode()}; body:=${loggedBody}"
            return resp
        } as CheckedSupplier
    ) as HttpResponse<String>
}
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.seqera.tower.plugin.exception | ||
|
||
import groovy.transform.InheritConstructors | ||
|
||
/**
 * Unchecked exception signalling an unexpected HTTP response.
 * All constructors of {@link RuntimeException} are inherited.
 */
@InheritConstructors
class BadResponseException extends RuntimeException {

}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
package io.seqera.tower.plugin.exception | ||
|
||
import groovy.transform.InheritConstructors | ||
|
||
/**
 * Unchecked exception signalling that a request was rejected as unauthorized.
 * All constructors of {@link RuntimeException} are inherited.
 */
@InheritConstructors
class UnauthorizedException extends RuntimeException {

}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.seqera.tower.plugin.exchange | ||
|
||
import groovy.transform.CompileStatic | ||
import groovy.transform.EqualsAndHashCode | ||
import groovy.transform.ToString | ||
|
||
/**
 * Payload of the REST request that asks Platform for a license-scoped JWT token.
 *
 * @author Alberto Miranda
 */
@CompileStatic
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
class LicenseTokenRequest {

    /**
     * The product code
     */
    String product

    /**
     * The product version
     */
    String version

}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package io.seqera.tower.plugin.exchange | ||
|
||
import groovy.transform.CompileStatic | ||
import groovy.transform.ToString | ||
|
||
/**
 * Payload of the REST response in which Platform returns a license-scoped JWT token.
 *
 * @author Alberto Miranda
 */
@ToString(includePackage = false, includeNames = true)
@CompileStatic
class LicenseTokenResponse {

    /** The signed JWT token */
    String signedToken

    /** The expiration date of the token */
    Date expirationDate

}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,3 +10,4 @@ | |
# | ||
|
||
io.seqera.tower.plugin.TowerFactory | ||
io.seqera.tower.plugin.TowerFusionEnv |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure how I feel about retrying 503 and 504 errors, if Seqera Platform is under heavy load wouldn't this make it worse?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took this from Wave's client to follow the existing approach. I don't have a specific preference 🤷‍♂️ (cc @pditommaso)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's why an exponential backoff is used. Actually, I think 408 (request timeout) should be added to that list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, consider this resolved 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error 408 added with 6ce5be3