diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f63a24e --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +*.iml +out diff --git a/src/com/matthisk/flow/AbstractFlow.java b/src/com/matthisk/flow/AbstractFlow.java new file mode 100644 index 0000000..920f30d --- /dev/null +++ b/src/com/matthisk/flow/AbstractFlow.java @@ -0,0 +1,22 @@ +package com.matthisk.flow; + +import com.matthisk.flow.operators.FlowOperator; +import java.util.concurrent.ExecutorService; + +public abstract class AbstractFlow implements Flow { + + @Override + public Flow flowOn(ExecutorService executor) { + return new ChannelFlow<>(this, executor); + } + + @Override + public Flow pipe(FlowOperator operator) { + return operator.apply(this); + } + + @Override + public Flow pipe(FlowOperator op1, FlowOperator op2) { + return FlowOperator.compose(op1, op2).apply(this); + } +} diff --git a/src/com/matthisk/flow/ChannelFlow.java b/src/com/matthisk/flow/ChannelFlow.java new file mode 100644 index 0000000..a689fbc --- /dev/null +++ b/src/com/matthisk/flow/ChannelFlow.java @@ -0,0 +1,47 @@ +package com.matthisk.flow; + +import com.matthisk.flow.channels.Channel; +import com.matthisk.flow.channels.Envelope; +import com.matthisk.flow.channels.RendezvousChannel; +import java.util.concurrent.ExecutorService; + +public class ChannelFlow extends AbstractFlow { + + private final ExecutorService executor; + private final Flow inner; + private final Channel channel; + + ChannelFlow(Flow inner, ExecutorService executor) { + this.inner = inner; + this.executor = executor; + this.channel = new RendezvousChannel<>(); + } + + @Override + public void collect(FlowCollector collector) { + executor.submit(() -> { + try { + inner.collect(channel::send); + channel.complete(); + } catch (Throwable ex) { + channel.complete(ex); + } + }); + + while (true) { + Envelope envelope = channel.receive(); + + if (envelope.getValue() != null) { + collector.emit(envelope.getValue()); + } + + if (envelope.getCause() != null) { + throw new RuntimeException("Received failure from channel", envelope.getCause()); + } + + if (envelope.isComplete()) { + break; + } + } + } +} diff --git a/src/com/matthisk/flow/ChannelMergeFlow.java b/src/com/matthisk/flow/ChannelMergeFlow.java new file mode 100644 index 0000000..2fe823f --- /dev/null +++ b/src/com/matthisk/flow/ChannelMergeFlow.java @@ -0,0 +1,63 @@ +package com.matthisk.flow; + +import com.matthisk.flow.channels.Channel; +import com.matthisk.flow.channels.Envelope; +import com.matthisk.flow.channels.RendezvousChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; + +public class ChannelMergeFlow extends AbstractFlow { + + private final int concurrency; + private final ExecutorService executor; + private final Flow> inner; + private final Channel channel; + + public ChannelMergeFlow(Flow> inner, int concurrency, ExecutorService executor) { + this.inner = inner; + this.concurrency = concurrency; + this.executor = executor; + this.channel = new RendezvousChannel(); + } + + @Override + public void collect(FlowCollector collector) { + Semaphore semaphore = new Semaphore(concurrency); + + inner.collect(flow -> { + executor.submit(() -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to acquire lock", e); + } + + try { + flow.collect(channel::send); + channel.complete(); + } catch (Throwable ex) { + channel.complete(ex); + } finally { + semaphore.release(); + } + }); + }); + + while (true) { + Envelope envelope = channel.receive(); + + if (envelope.getValue() != null) { + collector.emit(envelope.getValue()); + } + + if (envelope.getCause() != null) { + throw new RuntimeException("Received failure from channel", envelope.getCause()); + } + + if (envelope.isComplete()) { + break; + } + } + } +} diff --git a/src/com/matthisk/flow/Flow.java b/src/com/matthisk/flow/Flow.java new file mode 100644 index 0000000..95ed9fa --- /dev/null +++ b/src/com/matthisk/flow/Flow.java @@ -0,0 +1,14 @@ +package com.matthisk.flow; + +import com.matthisk.flow.operators.FlowOperator; +import java.util.concurrent.ExecutorService; + +public interface Flow { + Flow pipe(FlowOperator operator); + + Flow pipe(FlowOperator op1, FlowOperator op2); + + Flow flowOn(ExecutorService executorService); + + void collect(FlowCollector collector); +} diff --git a/src/com/matthisk/flow/FlowCollector.java b/src/com/matthisk/flow/FlowCollector.java new file mode 100644 index 0000000..9b5ef53 --- /dev/null +++ b/src/com/matthisk/flow/FlowCollector.java @@ -0,0 +1,5 @@ +package com.matthisk.flow; + +public interface FlowCollector { + void emit(T value); +} diff --git a/src/com/matthisk/flow/FlowProducer.java b/src/com/matthisk/flow/FlowProducer.java new file mode 100644 index 0000000..622b09a --- /dev/null +++ b/src/com/matthisk/flow/FlowProducer.java @@ -0,0 +1,5 @@ +package com.matthisk.flow; + +public interface FlowProducer { + void accept(FlowCollector collector); +} diff --git a/src/com/matthisk/flow/Main.java b/src/com/matthisk/flow/Main.java new file mode 100644 index 0000000..cc0cece --- /dev/null +++ b/src/com/matthisk/flow/Main.java @@ -0,0 +1,126 @@ +package com.matthisk.flow; + +import static com.matthisk.flow.SyncFlow.flow; +import static com.matthisk.flow.operators.Operators.flatMap; +import static com.matthisk.flow.operators.Operators.flattenMerge; +import static com.matthisk.flow.operators.Operators.map; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class Main { + + public static void main(String[] args) { + ThreadFactory factory = + Thread.builder() + .name("worker") + .virtual() + .daemon(false) + .disallowThreadLocals() + .factory(); + + try (ExecutorService scheduler = Executors.newFixedThreadPool(1, factory)) { + Flow fl = + flow(collector -> { + collector.emit("Hello."); + collector.emit("World."); + }) + .flowOn(scheduler); + + scheduler.submit( + () -> { + fl.collect( + value -> + System.out.printf( + "[%s] %s\n", + Thread.currentThread().getName(), value)); + }); + } + } + + public static void main3(String[] args) { + ThreadFactory factory = + Thread.builder() + .name("worker") + .virtual() + .daemon(false) + .disallowThreadLocals() + .factory(); + ExecutorService executorService = Executors.newFixedThreadPool(1_000_000, factory); + + AtomicInteger atomicInt = new AtomicInteger(); + + Flow fl = + flow( + collector -> { + int index = atomicInt.incrementAndGet(); + int rounds = 0; + + while (rounds < 10) { + collector.emit( + String.format( + "[%d] time: %s", + index, System.currentTimeMillis())); + delay(200); + rounds++; + } + }); + + Flow fl2 = + flow( + collector -> { + collector.emit(1); + collector.emit(2); + collector.emit(3); + }); + + fl2.pipe(map(value -> fl), flattenMerge(8, executorService)) + .pipe( + map( + value -> { + System.out.printf( + "[%s] %s\n", Thread.currentThread().getName(), value); + return value.length(); + })) + .flowOn(executorService) + .collect( + size -> + System.out.printf( + "[%s] %s\n", Thread.currentThread().getName(), size)); + } + + public static void main2(String[] args) throws InterruptedException { + + Flow flow = + flow( + consumer -> { + consumer.emit("joe"); + delay(1000); + consumer.emit("programmer"); + }); + + Flow flow2 = + flow( + collector -> { + collector.emit(1); + delay(500); + collector.emit(2); + collector.emit(3); + }); + + flow.pipe(flatMap(value -> flow2)).collect(System.out::println); + + flow.pipe(map(String::getBytes)).collect(System.out::println); + } + + static void delay(int ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } +} diff --git a/src/com/matthisk/flow/SyncFlow.java b/src/com/matthisk/flow/SyncFlow.java new file mode 100644 index 0000000..4dc528d --- /dev/null +++ b/src/com/matthisk/flow/SyncFlow.java @@ -0,0 +1,19 @@ +package com.matthisk.flow; + +public class SyncFlow extends AbstractFlow { + + private final FlowProducer producer; + + public SyncFlow(FlowProducer producer) { + this.producer = producer; + } + + @Override + public void collect(FlowCollector collector) { + producer.accept(collector); + } + + public static Flow flow(FlowProducer consumer) { + return new SyncFlow<>(consumer); + }; +} diff --git a/src/com/matthisk/flow/channels/Channel.java b/src/com/matthisk/flow/channels/Channel.java new file mode 100644 index 0000000..6335dee --- /dev/null +++ b/src/com/matthisk/flow/channels/Channel.java @@ -0,0 +1,4 @@ +package com.matthisk.flow.channels; + +public interface Channel extends ReceiveChannel, SendChannel { +} diff --git a/src/com/matthisk/flow/channels/Envelope.java b/src/com/matthisk/flow/channels/Envelope.java new file mode 100644 index 0000000..ad31752 --- /dev/null +++ b/src/com/matthisk/flow/channels/Envelope.java @@ -0,0 +1,51 @@ +package com.matthisk.flow.channels; + +import java.util.Objects; + +public class Envelope { + + private final Throwable cause; + + private final boolean isComplete; + + private final T value; + + private Envelope(T value, Throwable cause, boolean isComplete) { + this.cause = cause; + this.isComplete = isComplete; + this.value = value; + } + + public Throwable getCause() { + return cause; + } + + public T getValue() { + return value; + } + public boolean isComplete() { + return isComplete; + } + @Override + public int hashCode() { + return Objects.hash(cause, isComplete, value); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Envelope envelope = (Envelope) o; + return isComplete == envelope.isComplete && + Objects.equals(cause, envelope.cause) && + Objects.equals(value, envelope.value); + } + + public static Envelope complete(Throwable ex) { + return new Envelope<>(null, ex, true); + } + + public static Envelope value(T item) { + return new Envelope<>(item, null, false); + } +} diff --git a/src/com/matthisk/flow/channels/ReceiveChannel.java b/src/com/matthisk/flow/channels/ReceiveChannel.java new file mode 100644 index 0000000..91cf152 --- /dev/null +++ b/src/com/matthisk/flow/channels/ReceiveChannel.java @@ -0,0 +1,5 @@ +package com.matthisk.flow.channels; + +public interface ReceiveChannel { + Envelope receive(); +} diff --git a/src/com/matthisk/flow/channels/RendezvousChannel.java b/src/com/matthisk/flow/channels/RendezvousChannel.java new file mode 100644 index 0000000..ffd925a --- /dev/null +++ b/src/com/matthisk/flow/channels/RendezvousChannel.java @@ -0,0 +1,53 @@ +package com.matthisk.flow.channels; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +public class RendezvousChannel implements Channel { + + private final BlockingQueue> queue; + + public RendezvousChannel() { + this.queue = new ArrayBlockingQueue<>(1); + } + + @Override + public Envelope receive() { + try { + return queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while receiving from channel", e); + } + } + + @Override + public void send(T item) { + try { + queue.put(Envelope.value(item)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while sending on channel", e); + } + } + + @Override + public void complete(Throwable ex) { + try { + queue.put(Envelope.complete(ex)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while closing channel", e); + } + } + + @Override + public void complete() { + try { + queue.put(Envelope.complete(null)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while closing channel", e); + } + } +} diff --git a/src/com/matthisk/flow/channels/SendChannel.java b/src/com/matthisk/flow/channels/SendChannel.java new file mode 100644 index 0000000..6c6be7b --- /dev/null +++ b/src/com/matthisk/flow/channels/SendChannel.java @@ -0,0 +1,9 @@ +package com.matthisk.flow.channels; + +public interface SendChannel { + void send(T item); + + void complete(Throwable ex); + + void complete(); +} diff --git a/src/com/matthisk/flow/operators/FlowOperator.java b/src/com/matthisk/flow/operators/FlowOperator.java new file mode 100644 index 0000000..b4a21b3 --- /dev/null +++ b/src/com/matthisk/flow/operators/FlowOperator.java @@ -0,0 +1,11 @@ +package com.matthisk.flow.operators; + +import com.matthisk.flow.Flow; + +public interface FlowOperator { + Flow apply(Flow inner); + + static FlowOperator compose(FlowOperator fn, FlowOperator gn) { + return inner -> gn.apply(fn.apply(inner)); + } +} diff --git a/src/com/matthisk/flow/operators/Operators.java b/src/com/matthisk/flow/operators/Operators.java new file mode 100644 index 0000000..4e192ed --- /dev/null +++ b/src/com/matthisk/flow/operators/Operators.java @@ -0,0 +1,37 @@ +package com.matthisk.flow.operators; + +import com.matthisk.flow.ChannelMergeFlow; +import com.matthisk.flow.Flow; +import com.matthisk.flow.SyncFlow; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.Predicate; + +public class Operators { + public static FlowOperator map(Function fn) { + return inner -> SyncFlow.flow(collector -> inner.collect(value -> collector.emit(fn.apply(value)))); + } + + public static FlowOperator filter(Predicate predicate) { + return inner -> + SyncFlow.flow( + collector -> + inner.collect( + value -> { + if (predicate.test(value)) collector.emit(value); + })); + } + + public static FlowOperator flatMap(Function> fn) { + return FlowOperator.compose(map(fn), flattenConcat()); + } + + public static FlowOperator, A> flattenConcat() { + return inner -> SyncFlow.flow(collector -> inner.collect(value -> value.collect(collector))); + } + + public static FlowOperator, A> flattenMerge( + int concurrency, ExecutorService executorService) { + return inner -> new ChannelMergeFlow(inner, concurrency, executorService); + } +}