Skip to content

Commit

Permalink
Workflow output attach debug logging [BW-529] (#1376)
Browse files Browse the repository at this point in the history
  • Loading branch information
aednichols authored Mar 8, 2021
1 parent d737b49 commit 7b30964
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.pattern._
import com.google.api.client.auth.oauth2.Credential
import com.typesafe.scalalogging.LazyLogging
import nl.grons.metrics4.scala.Counter
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsFatalExceptionWithErrorReport}
import org.broadinstitute.dsde.rawls.{RawlsException, RawlsFatalExceptionWithErrorReport, model}
import org.broadinstitute.dsde.rawls.coordination.DataSourceAccess
import org.broadinstitute.dsde.rawls.dataaccess._
import org.broadinstitute.dsde.rawls.dataaccess.slick._
Expand Down Expand Up @@ -445,27 +445,36 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum
logger.debug(s"attaching outputs for ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}: ${outputs}")
logger.debug(s"output expressions for ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}: ${outputExpressionMap}")

val parsedExpressions = outputExpressionMap.map { case (outputName, outputExprStr) =>
val parsedExpressions: Seq[Try[OutputExpression]] = outputExpressionMap.map { case (outputName, outputExprStr) =>
outputs.get(outputName) match {
case None => Failure(new RawlsException(s"output named ${outputName} does not exist"))
case Some(Right(uot: UnsupportedOutputType)) => Failure(new RawlsException(s"output named ${outputName} is not a supported type, received json u${uot.json.compactPrint}"))
case Some(Left(output)) =>
val entityTypeOpt = workflowRecord.workflowEntityId.flatMap(entitiesById.get).map(_.entityType)
OutputExpression.build(outputExprStr, output, entityTypeOpt)
}
}
}.toSeq

if (parsedExpressions.forall(_.isSuccess)) {
val boundExpressions = parsedExpressions.collect { case Success(boe @ BoundOutputExpression(target, name, attr)) => boe }
val boundExpressions: Seq[BoundOutputExpression] = parsedExpressions.collect { case Success(boe @ BoundOutputExpression(target, name, attr)) => boe }
val updates = updateEntityAndWorkspace(workflowRecord.workflowEntityId.map(id => Some(entitiesById(id))).getOrElse(None), workspace, boundExpressions)

val (optEntityUpdates, optWs) = updates
optEntityUpdates foreach { update: WorkflowEntityUpdate =>
logger.debug(s"Updating ${update.upserts.size} attributes for entity ${update.entityRef.entityName} of type ${update.entityRef.entityType} in ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}. First 100: ${update.upserts.take(100)}")
}
optWs foreach { workspace: Workspace =>
logger.debug(s"Updating ${workspace.attributes.size} workspace attributes in ${submissionId.toString}/${workflowRecord.externalId.getOrElse("MISSING_WORKFLOW")}. First 100: ${workspace.attributes.take(100)}")
}

Left(updates)
} else {
Right((workflowRecord, parsedExpressions.collect { case Failure(t) => AttributeString(t.getMessage) }.toSeq))
Right((workflowRecord, parsedExpressions.collect { case Failure(t) => AttributeString(t.getMessage) }))
}
}
}

def updateEntityAndWorkspace(entity: Option[Entity], workspace: Workspace, workflowOutputs: Iterable[BoundOutputExpression]): (Option[WorkflowEntityUpdate], Option[Workspace]) = {
private def updateEntityAndWorkspace(entity: Option[Entity], workspace: Workspace, workflowOutputs: Iterable[BoundOutputExpression]): (Option[WorkflowEntityUpdate], Option[Workspace]) = {
val entityUpsert = workflowOutputs.collect({ case BoundOutputExpression(ThisEntityTarget, attrName, attr) => (attrName, attr) })
val workspaceAttributes = workflowOutputs.collect({ case BoundOutputExpression(WorkspaceTarget, attrName, attr) => (attrName, attr) })

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths
} else {
for {
// we exclude workflows submitted by users who already have more than maxActiveWorkflowsPerUser workflows in a running status
excludedUsers <- dataAccess.workflowQuery.listSubmittersWithMoreWorkflowsThan(maxActiveWorkflowsPerUser, WorkflowStatuses.runningStatuses)
workflowRecs <- dataAccess.workflowQuery.findQueuedWorkflows(excludedUsers.map { case (submitter, count) => submitter }, SubmissionStatuses.terminalStatuses :+ SubmissionStatuses.Aborting).take(batchSize).result
excludedUsersMap <- dataAccess.workflowQuery.listSubmittersWithMoreWorkflowsThan(maxActiveWorkflowsPerUser, WorkflowStatuses.runningStatuses).map(_.toMap)
excludedUsers = excludedUsersMap.keys.toSeq
excludedStatuses = SubmissionStatuses.terminalStatuses :+ SubmissionStatuses.Aborting
workflowRecs: Seq[WorkflowRecord] <- dataAccess.workflowQuery.findQueuedWorkflows(excludedUsers, excludedStatuses).take(batchSize).result
reservedRecs <- if (workflowRecs.isEmpty) {
DBIO.successful(None)
} else {
Expand Down Expand Up @@ -312,6 +314,8 @@ trait WorkflowSubmission extends FutureSupport with LazyLogging with MethodWiths

val WorkflowBatch(workflowIds, submissionRec, workspaceRec) = batch

logger.info(s"Submitting batch of ${workflowIds.size} workflows for submission ${submissionRec.id} in workspace ${workspaceRec.namespace}/${workspaceRec.name} (${workspaceRec.id})")

// implicitly passed to WorkflowComponent methods which update status
implicit val wfStatusCounter = (status: WorkflowStatus) =>
if (trackDetailedSubmissionMetrics) Option(workflowStatusCounter(workspaceSubmissionMetricBuilder(workspaceRec.toWorkspaceName, submissionRec.id))(status))
Expand Down

0 comments on commit 7b30964

Please sign in to comment.