Skip to content

Commit

Permalink
JAMES-2586 Reactify AttachmentBlobResolver
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and Arsnael committed Jan 20, 2025
1 parent 7f5aad4 commit 0bc5508
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface AttachmentManager extends AttachmentContentLoader {

AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException;

Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId, MailboxSession mailboxSession);

List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException;

InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ public AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSessio
return attachment;
}

@Override
public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) {
return attachmentMapperFactory.getAttachmentMapper(mailboxSession)
.getAttachmentReactive(attachmentId)
.filterWhen(attachment -> existsReactive(attachmentId, mailboxSession))
.switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId())));
}

@Override
public List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException {
List<AttachmentMetadata> attachments = attachmentMapperFactory.getAttachmentMapper(mailboxSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ public ComposedMessageId appendMessage(String username, MailboxPath mailboxPath,
return messageManager.appendMessage(appendCommand, mailboxSession).getId();
}

public MessageManager.AppendResult appendMessageRetrieveAppendResult(String username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand)
throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username));
MessageManager messageManager = mailboxManager.getMailbox(mailboxPath, mailboxSession);
return messageManager.appendMessage(appendCommand, mailboxSession);
}

public MessageManager.AppendResult appendMessageAndGetAppendResult(String username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand)
throws MailboxException {
MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ case object CustomBlob extends Blob {
}

class CustomBlobResolver extends BlobResolver {
override def resolve(blobId: org.apache.james.jmap.mail.BlobId, mailboxSession: MailboxSession): BlobResolutionResult =
override def resolve(blobId: org.apache.james.jmap.mail.BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] =
if (blobId.equals(CustomBlob.blobId)) {
Applicable(SMono.just(CustomBlob))
SMono.just(Applicable(SMono.just(CustomBlob)))
} else {
NonApplicable
SMono.just(NonApplicable)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.james.jmap.rfc8621.contract.DownloadContract.accountId
import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ALICE_ACCOUNT_ID, ANDRE, BOB, BOB_PASSWORD, CEDRIC, DOMAIN, authScheme, baseRequestSpecBuilder}
import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.MailboxACL.Right
import org.apache.james.mailbox.model.{MailboxACL, MailboxPath, MessageId}
import org.apache.james.mailbox.model.{AttachmentId, MailboxACL, MailboxPath, MessageId}
import org.apache.james.mime4j.dom.Message
import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
import org.apache.james.util.ClassLoaderUtils
Expand Down Expand Up @@ -92,6 +92,32 @@ trait DownloadContract {
.hasContent(expectedResponse)
}

@Test
def downloadMailboxAttachment(server: GuiceJamesServer): Unit = {
val path = MailboxPath.inbox(BOB)
server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path)
val attachmentId: AttachmentId = server.getProbe(classOf[MailboxProbeImpl])
.appendMessageRetrieveAppendResult(BOB.asString, path, AppendCommand.from(
ClassLoaderUtils.getSystemResourceAsSharedStream("eml/multipart_simple.eml")))
.getMessageAttachments
.get(1).getAttachmentId

val response = `given`
.basePath("")
.header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER).log().all()
.when
.get(s"/download/$accountId/${attachmentId.getId()}")
.`then`
.statusCode(SC_OK)
.contentType("application/vnd.ms-publisher; name=\"text2\"")
.extract
.body
.asString

assertThat(response)
.isEqualTo("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDHs8bT4T/8QymbsiAjlD1MwNIXJr/WET6+9MmuTSIYWWU94csDn9WVMzRhaAbpfnSqIx8TdUtrN/ZzX2JetPSar/bU9nXAWeiC/jPFQ1qKH4GeDrYXRLKu4T8782OrGH8Jyror97TlNXhPrjdRLEB4bQqmmZhb3HwcD8a9XzfZqlm7GRWLo1WQMGt/NpQLC7jMf4fA6/+kjzsTspxwdgL74GJqPfOXOiwgLHX8CZ6/5RyTqhT6pD3MktSNWaz/zIHPNEqf5BY9CBM1TFR5w+6MDHo0gmiIsXFEJTPnfhBvHDhSjB1RI0KxUClyYrJ4fBlUVeKfnawoVcu7YvCqF4F5 quynhnn@linagora\n")
}

@Test
def downloadMessageShouldFailWhenUnauthentified(server: GuiceJamesServer): Unit = {
val path = MailboxPath.inbox(BOB)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.apache.james.jmap.mail.{BlobId, MinimalEmailBodyPart}
import org.apache.james.jmap.method.{AccountNotFoundException, ZoneIdProvider}
import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
import org.apache.james.mailbox.exception.AttachmentNotFoundException
import org.apache.james.mailbox.model.ContentType.{MediaType, MimeType, SubType}
import org.apache.james.mailbox.model._
import org.apache.james.mailbox.{AttachmentIdFactory, AttachmentManager, MailboxSession, MessageIdManager}
Expand All @@ -57,10 +58,11 @@ import org.apache.james.mime4j.codec.EncoderUtil.Usage
import org.apache.james.mime4j.dom.SingleBody
import org.apache.james.mime4j.message.DefaultMessageWriter
import org.apache.james.util.ReactorUtils
import org.reactivestreams.Publisher
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
import reactor.core.publisher.Mono
import reactor.core.scala.publisher.SMono
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}

Expand All @@ -84,7 +86,7 @@ case class Applicable(blob: SMono[Blob]) extends BlobResolutionResult {
}

trait BlobResolver {
def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult
def resolve(blobId: BlobId, mailboxSession: MailboxSession): Publisher[BlobResolutionResult]
}

trait Blob {
Expand Down Expand Up @@ -143,49 +145,50 @@ case class EmailBodyPartBlob(blobId: BlobId, part: MinimalEmailBodyPart) extends

class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
val messageIdManager: MessageIdManager) extends BlobResolver {
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = {
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = {
Try(messageIdFactory.fromString(blobId.value.value)) match {
case Failure(_) => NonApplicable
case Success(messageId) => Applicable(SMono.fromPublisher(
case Failure(_) => SMono.just(NonApplicable)
case Success(messageId) => SMono.just(Applicable(SMono.fromPublisher(
messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
.map[Blob](MessageBlob(blobId, _))
.switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
.switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
}
}
}

class UploadResolver @Inject()(val uploadService: UploadService) extends BlobResolver {
private val prefix = "uploads-"

override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = {
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = {
if (!blobId.value.value.startsWith(prefix)) {
NonApplicable
SMono.just(NonApplicable)
} else {
val uploadIdAsString = blobId.value.value.substring(prefix.length)
Try(UploadId.from(uploadIdAsString)) match {
SMono.just(Try(UploadId.from(uploadIdAsString)) match {
case Failure(_) => NonApplicable
case Success(uploadId) => Applicable(
SMono(uploadService.retrieve(uploadId, mailboxSession.getUser))
.map(upload => UploadedBlob(blobId, upload))
.onErrorResume {
case _: UploadNotFoundException => SMono.error(BlobNotFoundException(blobId))
})
}
})
}
}
}

class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager, val attachmentIdFactory: AttachmentIdFactory) extends BlobResolver {
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult =
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] =
attachmentIdFactory.from(blobId.value.value) match {
case attachmentId: StringBackedAttachmentId =>
Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match {
case Success(attachmentMetadata) =>
Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession))
.map(content => AttachmentBlob(attachmentMetadata, content)))
case Failure(_) => NonApplicable
}
case _ => NonApplicable
SMono(attachmentManager.getAttachmentReactive(attachmentId, mailboxSession))
.map(attachmentMetadata => Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession))
.map(content => AttachmentBlob(attachmentMetadata, content))))
.onErrorResume {
case e: AttachmentNotFoundException => SMono.just(NonApplicable.asInstanceOf[BlobResolutionResult])
case e => SMono.error[BlobResolutionResult](e)
}
case _ => SMono.just(NonApplicable)
}
}

Expand All @@ -207,11 +210,11 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
case (acc, idPart) => acc.headOption.map(prefix => prefix + "_" + idPart).getOrElse(idPart) :: acc
}.flatMap(s => BlobId.of(s).toOption).take(parts.size).reverse

override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = {
override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = {
asMessageAndPartIds(blobId) match {
case Failure(_) => NonApplicable
case Failure(_) => SMono.just(NonApplicable)
case Success((messageId, blobIds)) =>
Applicable(SMono.fromPublisher(
SMono.just(Applicable(SMono.fromPublisher(
messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
.handle[MinimalEmailBodyPart] {
case (message, sink) => MinimalEmailBodyPart.ofMessage(None, zoneIdSupplier.get(), BlobId.of(messageId).get, message)
Expand All @@ -227,7 +230,7 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
.fold(sink.error(BlobNotFoundException(blobId)))(part => sink.next(part))
}
.map[Blob](EmailBodyPartBlob(blobId, _))
.switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
.switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
}
}
}
Expand All @@ -240,9 +243,14 @@ class BlobResolvers(blobResolvers: Set[BlobResolver]) {
}

def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] =
blobResolvers.flatMap(resolver => resolver.resolve(blobId, mailboxSession).asOption)
.headOption
.getOrElse(SMono.error(BlobNotFoundException(blobId)))
SFlux.fromIterable(blobResolvers)
.concatMap(resolver => resolver.resolve(blobId, mailboxSession))
.filter {
case NonApplicable => false
case _: Applicable => true
}
.concatMap(result => result.asOption.getOrElse(SMono.error(BlobNotFoundException(blobId))))
.next().switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))
}

class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
Expand Down

0 comments on commit 0bc5508

Please sign in to comment.