Skip to content

Commit

Permalink
Add Literal directive
Browse files Browse the repository at this point in the history
  • Loading branch information
cpubot committed Oct 24, 2023
1 parent 1cef451 commit a897b99
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion paladin-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "paladin-core"
version = "0.1.1"
version = "0.1.2"
description = "A Rust distributed algorithm toolkit. Write distributed algorithms without the complexities of distributed systems programming."
license.workspace = true
edition.workspace = true
Expand Down
43 changes: 43 additions & 0 deletions paladin-core/src/directive/literal/functor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use anyhow::Result;
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use tracing::error;

use super::Literal;
use crate::{directive::Functor, operation::Operation, runtime::Runtime, task::Task};

#[async_trait]
impl<A: Send, B: Send + 'static> Functor<B> for Literal<A> {
async fn f_map<Op: Operation<Input = A, Output = B>>(
self,
op: Op,
runtime: &Runtime,
) -> Result<Self::Target> {
let (channel_identifier, mut sender, mut receiver) =
runtime.lease_coordinated_task_channel::<Op, ()>().await?;

let task = Task {
routing_key: channel_identifier.clone(),
metadata: (),
op: op.clone(),
input: self.0,
};

sender.send(task).await?;
sender.close().await?;

if let Some((result, acker)) = receiver.next().await {
match acker.ack().await {
Ok(()) => return Ok(Literal(result.output)),
// We don't error out on a failed ack, but filter out the message, as we assume
// the message will be redelivered and we'll have another
// opportunity to ack.
Err(e) => {
error!("Failed to ack result: {}", e);
}
}
}

anyhow::bail!("No results received")
}
}
48 changes: 48 additions & 0 deletions paladin-core/src/directive/literal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/// A literal value.
///
/// Can be used to execute operations over arbitrary values.
///
/// ## Example
/// ```
/// # use paladin::{
/// # operation::Operation,
/// # directive::{Directive, Literal},
/// # opkind_derive::OpKind,
/// # runtime::Runtime,
/// # };
/// # use serde::{Deserialize, Serialize};
/// # use anyhow::Result;
/// #
/// # #[derive(Clone, Copy, Debug, Deserialize, Serialize)]
/// struct MultiplyBy(i32);
/// impl Operation for MultiplyBy {
/// type Input = i32;
/// type Output = i32;
/// type Kind = MyOps;
///
/// fn execute(&self, input: i32) -> Result<i32> {
/// Ok(self.0 * input)
/// }
/// }
/// #
/// # #[derive(OpKind, Copy, Clone, Debug, Deserialize, Serialize)]
/// # enum MyOps {
/// # MultiplyBy(MultiplyBy),
/// # }
///
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// # let runtime = Runtime::in_memory().await?;
/// let computation = Literal(5).map(MultiplyBy(2));
/// let result = computation.run(&runtime).await?;
/// assert_eq!(result, Literal(10));
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct Literal<T>(pub T);

impl_hkt!(Literal);
impl_lit!(Literal<T>);

mod functor;
2 changes: 2 additions & 0 deletions paladin-core/src/directive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,5 @@ impl<M: Monoid, F: Foldable<M::Elem, A = M::Elem> + Send, D: Directive<Output =

pub mod indexed_stream;
pub use indexed_stream::IndexedStream;
pub mod literal;
pub use literal::Literal;

0 comments on commit a897b99

Please sign in to comment.