diff --git a/Cargo.lock b/Cargo.lock index 0cadc95..610280c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -990,7 +990,7 @@ dependencies = [ [[package]] name = "paladin-core" -version = "0.1.0" +version = "0.1.2" dependencies = [ "anyhow", "async-trait", diff --git a/paladin-core/Cargo.toml b/paladin-core/Cargo.toml index ce71dae..b780a71 100644 --- a/paladin-core/Cargo.toml +++ b/paladin-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "paladin-core" -version = "0.1.1" +version = "0.1.2" description = "A Rust distributed algorithm toolkit. Write distributed algorithms without the complexities of distributed systems programming." license.workspace = true edition.workspace = true diff --git a/paladin-core/src/directive/literal/functor.rs b/paladin-core/src/directive/literal/functor.rs new file mode 100644 index 0000000..c02e8e7 --- /dev/null +++ b/paladin-core/src/directive/literal/functor.rs @@ -0,0 +1,43 @@ +use anyhow::Result; +use async_trait::async_trait; +use futures::{SinkExt, StreamExt}; +use tracing::error; + +use super::Literal; +use crate::{directive::Functor, operation::Operation, runtime::Runtime, task::Task}; + +#[async_trait] +impl Functor for Literal { + async fn f_map>( + self, + op: Op, + runtime: &Runtime, + ) -> Result { + let (channel_identifier, mut sender, mut receiver) = + runtime.lease_coordinated_task_channel::().await?; + + let task = Task { + routing_key: channel_identifier.clone(), + metadata: (), + op: op.clone(), + input: self.0, + }; + + sender.send(task).await?; + sender.close().await?; + + if let Some((result, acker)) = receiver.next().await { + match acker.ack().await { + Ok(()) => return Ok(Literal(result.output)), + // We don't error out on a failed ack, but filter out the message, as we assume + // the message will be redelivered and we'll have another + // opportunity to ack. + Err(e) => { + error!("Failed to ack result: {}", e); + } + } + } + + anyhow::bail!("No results received") + } +} diff --git a/paladin-core/src/directive/literal/mod.rs b/paladin-core/src/directive/literal/mod.rs new file mode 100644 index 0000000..3f73dbf --- /dev/null +++ b/paladin-core/src/directive/literal/mod.rs @@ -0,0 +1,48 @@ +/// A literal value. +/// +/// Can be used to execute operations over arbitrary values. +/// +/// ## Example +/// ``` +/// # use paladin::{ +/// # operation::Operation, +/// # directive::{Directive, Literal}, +/// # opkind_derive::OpKind, +/// # runtime::Runtime, +/// # }; +/// # use serde::{Deserialize, Serialize}; +/// # use anyhow::Result; +/// # +/// # #[derive(Clone, Copy, Debug, Deserialize, Serialize)] +/// struct MultiplyBy(i32); +/// impl Operation for MultiplyBy { +/// type Input = i32; +/// type Output = i32; +/// type Kind = MyOps; +/// +/// fn execute(&self, input: i32) -> Result { +/// Ok(self.0 * input) +/// } +/// } +/// # +/// # #[derive(OpKind, Copy, Clone, Debug, Deserialize, Serialize)] +/// # enum MyOps { +/// # MultiplyBy(MultiplyBy), +/// # } +/// +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// # let runtime = Runtime::in_memory().await?; +/// let computation = Literal(5).map(MultiplyBy(2)); +/// let result = computation.run(&runtime).await?; +/// assert_eq!(result, Literal(10)); +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug, Clone, PartialEq)] +pub struct Literal(pub T); + +impl_hkt!(Literal); +impl_lit!(Literal); + +mod functor; diff --git a/paladin-core/src/directive/mod.rs b/paladin-core/src/directive/mod.rs index 6377349..4c7e04d 100644 --- a/paladin-core/src/directive/mod.rs +++ b/paladin-core/src/directive/mod.rs @@ -363,3 +363,5 @@ impl + Send, D: Directive