diff --git a/src/com/matthisk/flow/AbstractFlow.java b/src/com/matthisk/flow/AbstractFlow.java index 920f30d..2382a45 100644 --- a/src/com/matthisk/flow/AbstractFlow.java +++ b/src/com/matthisk/flow/AbstractFlow.java @@ -1,6 +1,7 @@ package com.matthisk.flow; import com.matthisk.flow.operators.FlowOperator; +import com.matthisk.flow.operators.FlowTerminator; import java.util.concurrent.ExecutorService; public abstract class AbstractFlow implements Flow { @@ -19,4 +20,9 @@ public Flow pipe(FlowOperator operator) { public Flow pipe(FlowOperator op1, FlowOperator op2) { return FlowOperator.compose(op1, op2).apply(this); } + + @Override + public U terminate(FlowTerminator terminator) { + return terminator.apply(this); + } } diff --git a/src/com/matthisk/flow/ChannelFlow.java b/src/com/matthisk/flow/ChannelFlow.java index a689fbc..9c89aec 100644 --- a/src/com/matthisk/flow/ChannelFlow.java +++ b/src/com/matthisk/flow/ChannelFlow.java @@ -1,7 +1,8 @@ package com.matthisk.flow; +import static com.matthisk.flow.ChannelFlowUtils.channelToCollector; + import com.matthisk.flow.channels.Channel; -import com.matthisk.flow.channels.Envelope; import com.matthisk.flow.channels.RendezvousChannel; import java.util.concurrent.ExecutorService; @@ -28,20 +29,6 @@ public void collect(FlowCollector collector) { } }); - while (true) { - Envelope envelope = channel.receive(); - - if (envelope.getValue() != null) { - collector.emit(envelope.getValue()); - } - - if (envelope.getCause() != null) { - throw new RuntimeException("Received failure from channel", envelope.getCause()); - } - - if (envelope.isComplete()) { - break; - } - } + channelToCollector(channel, collector); } } diff --git a/src/com/matthisk/flow/ChannelFlowUtils.java b/src/com/matthisk/flow/ChannelFlowUtils.java new file mode 100644 index 0000000..2245951 --- /dev/null +++ b/src/com/matthisk/flow/ChannelFlowUtils.java @@ -0,0 +1,24 @@ +package com.matthisk.flow; + +import com.matthisk.flow.channels.Channel; +import com.matthisk.flow.channels.Envelope; + +public class ChannelFlowUtils { + static void channelToCollector(Channel channel, FlowCollector collector) { + while (true) { + Envelope envelope = channel.receive(); + + if (envelope.getValue() != null) { + collector.emit(envelope.getValue()); + } + + if (envelope.getCause() != null) { + throw new RuntimeException("Received failure from channel", envelope.getCause()); + } + + if (envelope.isComplete()) { + break; + } + } + } +} diff --git a/src/com/matthisk/flow/ChannelMergeFlow.java b/src/com/matthisk/flow/ChannelMergeFlow.java index 2fe823f..5ace0f6 100644 --- a/src/com/matthisk/flow/ChannelMergeFlow.java +++ b/src/com/matthisk/flow/ChannelMergeFlow.java @@ -1,7 +1,8 @@ package com.matthisk.flow; +import static com.matthisk.flow.ChannelFlowUtils.channelToCollector; + import com.matthisk.flow.channels.Channel; -import com.matthisk.flow.channels.Envelope; import com.matthisk.flow.channels.RendezvousChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -44,20 +45,6 @@ public void collect(FlowCollector collector) { }); }); - while (true) { - Envelope envelope = channel.receive(); - - if (envelope.getValue() != null) { - collector.emit(envelope.getValue()); - } - - if (envelope.getCause() != null) { - throw new RuntimeException("Received failure from channel", envelope.getCause()); - } - - if (envelope.isComplete()) { - break; - } - } + channelToCollector(channel, collector); } } diff --git a/src/com/matthisk/flow/Flow.java b/src/com/matthisk/flow/Flow.java index 95ed9fa..c99d5a6 100644 --- a/src/com/matthisk/flow/Flow.java +++ b/src/com/matthisk/flow/Flow.java @@ -1,6 +1,7 @@ package com.matthisk.flow; import com.matthisk.flow.operators.FlowOperator; +import com.matthisk.flow.operators.FlowTerminator; import java.util.concurrent.ExecutorService; public interface Flow { @@ -11,4 +12,6 @@ public interface Flow { Flow flowOn(ExecutorService executorService); void collect(FlowCollector collector); + + U terminate(FlowTerminator terminator); } diff --git a/src/com/matthisk/flow/Main.java b/src/com/matthisk/flow/Main.java index cc0cece..aafc319 100644 --- a/src/com/matthisk/flow/Main.java +++ b/src/com/matthisk/flow/Main.java @@ -4,7 +4,10 @@ import static com.matthisk.flow.operators.Operators.flatMap; import static com.matthisk.flow.operators.Operators.flattenMerge; import static com.matthisk.flow.operators.Operators.map; +import static com.matthisk.flow.operators.Terminators.toList; +import java.io.PrintStream; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -29,17 +32,17 @@ public static void main(String[] args) { }) .flowOn(scheduler); - scheduler.submit( - () -> { - fl.collect( - value -> - System.out.printf( - "[%s] %s\n", - Thread.currentThread().getName(), value)); - }); + List result = fl.terminate(toList()); + + System.out.println(result); } } + private static PrintStream print(Object value) { + return System.out.printf( + "[%s] %s\n", Thread.currentThread().getName(), value); + } + public static void main3(String[] args) { ThreadFactory factory = Thread.builder() @@ -80,15 +83,13 @@ public static void main3(String[] args) { .pipe( map( value -> { - System.out.printf( - "[%s] %s\n", Thread.currentThread().getName(), value); + print(value); return value.length(); })) .flowOn(executorService) .collect( size -> - System.out.printf( - "[%s] %s\n", Thread.currentThread().getName(), size)); + print(size)); } public static void main2(String[] args) throws InterruptedException { diff --git a/src/com/matthisk/flow/SyncFlow.java b/src/com/matthisk/flow/SyncFlow.java index 4dc528d..a9e337e 100644 --- a/src/com/matthisk/flow/SyncFlow.java +++ b/src/com/matthisk/flow/SyncFlow.java @@ -13,7 +13,7 @@ public void collect(FlowCollector collector) { producer.accept(collector); } - public static Flow flow(FlowProducer consumer) { - return new SyncFlow<>(consumer); + public static Flow flow(FlowProducer producer) { + return new SyncFlow<>(producer); }; } diff --git a/src/com/matthisk/flow/operators/FlowTerminator.java b/src/com/matthisk/flow/operators/FlowTerminator.java new file mode 100644 index 0000000..042deea --- /dev/null +++ b/src/com/matthisk/flow/operators/FlowTerminator.java @@ -0,0 +1,7 @@ +package com.matthisk.flow.operators; + +import com.matthisk.flow.Flow; + +public interface FlowTerminator { + U apply(Flow flow); +} diff --git a/src/com/matthisk/flow/operators/Operators.java b/src/com/matthisk/flow/operators/Operators.java index 4e192ed..896c8b6 100644 --- a/src/com/matthisk/flow/operators/Operators.java +++ b/src/com/matthisk/flow/operators/Operators.java @@ -9,7 +9,7 @@ public class Operators { public static FlowOperator map(Function fn) { - return inner -> SyncFlow.flow(collector -> inner.collect(value -> collector.emit(fn.apply(value)))); + return inner -> SyncFlow.flow(producer -> inner.collect(value -> producer.emit(fn.apply(value)))); } public static FlowOperator filter(Predicate predicate) { diff --git a/src/com/matthisk/flow/operators/Terminators.java b/src/com/matthisk/flow/operators/Terminators.java new file mode 100644 index 0000000..ee510eb --- /dev/null +++ b/src/com/matthisk/flow/operators/Terminators.java @@ -0,0 +1,14 @@ +package com.matthisk.flow.operators; + +import java.util.ArrayList; +import java.util.List; + +public class Terminators { + public static FlowTerminator> toList() { + return flow -> { + ArrayList result = new ArrayList<>(); + flow.collect(result::add); + return result; + }; + } +}