Skip to content

Commit

Permalink
Added terminal operators.
Browse files Browse the repository at this point in the history
Extracted channel -> flow into utility.
  • Loading branch information
matthisk committed Dec 27, 2019
1 parent c1c0fd5 commit b0db065
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 47 deletions.
6 changes: 6 additions & 0 deletions src/com/matthisk/flow/AbstractFlow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.matthisk.flow;

import com.matthisk.flow.operators.FlowOperator;
import com.matthisk.flow.operators.FlowTerminator;
import java.util.concurrent.ExecutorService;

public abstract class AbstractFlow<T> implements Flow<T> {
Expand All @@ -19,4 +20,9 @@ public <Y> Flow<Y> pipe(FlowOperator<T, Y> operator) {
public <Q, R> Flow<R> pipe(FlowOperator<T, Q> op1, FlowOperator<Q, R> op2) {
return FlowOperator.compose(op1, op2).apply(this);
}

@Override
public <U> U terminate(FlowTerminator<T, U> terminator) {
return terminator.apply(this);
}
}
19 changes: 3 additions & 16 deletions src/com/matthisk/flow/ChannelFlow.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.matthisk.flow;

import static com.matthisk.flow.ChannelFlowUtils.channelToCollector;

import com.matthisk.flow.channels.Channel;
import com.matthisk.flow.channels.Envelope;
import com.matthisk.flow.channels.RendezvousChannel;
import java.util.concurrent.ExecutorService;

Expand All @@ -28,20 +29,6 @@ public void collect(FlowCollector<T> collector) {
}
});

while (true) {
Envelope<T> envelope = channel.receive();

if (envelope.getValue() != null) {
collector.emit(envelope.getValue());
}

if (envelope.getCause() != null) {
throw new RuntimeException("Received failure from channel", envelope.getCause());
}

if (envelope.isComplete()) {
break;
}
}
channelToCollector(channel, collector);
}
}
24 changes: 24 additions & 0 deletions src/com/matthisk/flow/ChannelFlowUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.matthisk.flow;

import com.matthisk.flow.channels.Channel;
import com.matthisk.flow.channels.Envelope;

public class ChannelFlowUtils {
static <T> void channelToCollector(Channel<T> channel, FlowCollector<T> collector) {
while (true) {
Envelope<T> envelope = channel.receive();

if (envelope.getValue() != null) {
collector.emit(envelope.getValue());
}

if (envelope.getCause() != null) {
throw new RuntimeException("Received failure from channel", envelope.getCause());
}

if (envelope.isComplete()) {
break;
}
}
}
}
19 changes: 3 additions & 16 deletions src/com/matthisk/flow/ChannelMergeFlow.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.matthisk.flow;

import static com.matthisk.flow.ChannelFlowUtils.channelToCollector;

import com.matthisk.flow.channels.Channel;
import com.matthisk.flow.channels.Envelope;
import com.matthisk.flow.channels.RendezvousChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -44,20 +45,6 @@ public void collect(FlowCollector<T> collector) {
});
});

while (true) {
Envelope<T> envelope = channel.receive();

if (envelope.getValue() != null) {
collector.emit(envelope.getValue());
}

if (envelope.getCause() != null) {
throw new RuntimeException("Received failure from channel", envelope.getCause());
}

if (envelope.isComplete()) {
break;
}
}
channelToCollector(channel, collector);
}
}
3 changes: 3 additions & 0 deletions src/com/matthisk/flow/Flow.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.matthisk.flow;

import com.matthisk.flow.operators.FlowOperator;
import com.matthisk.flow.operators.FlowTerminator;
import java.util.concurrent.ExecutorService;

public interface Flow<T> {
Expand All @@ -11,4 +12,6 @@ public interface Flow<T> {
Flow<T> flowOn(ExecutorService executorService);

void collect(FlowCollector<T> collector);

<U> U terminate(FlowTerminator<T, U> terminator);
}
25 changes: 13 additions & 12 deletions src/com/matthisk/flow/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import static com.matthisk.flow.operators.Operators.flatMap;
import static com.matthisk.flow.operators.Operators.flattenMerge;
import static com.matthisk.flow.operators.Operators.map;
import static com.matthisk.flow.operators.Terminators.toList;

import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -29,17 +32,17 @@ public static void main(String[] args) {
})
.flowOn(scheduler);

scheduler.submit(
() -> {
fl.collect(
value ->
System.out.printf(
"[%s] %s\n",
Thread.currentThread().getName(), value));
});
List<Object> result = fl.terminate(toList());

System.out.println(result);
}
}

private static PrintStream print(Object value) {
return System.out.printf(
"[%s] %s\n", Thread.currentThread().getName(), value);
}

public static void main3(String[] args) {
ThreadFactory factory =
Thread.builder()
Expand Down Expand Up @@ -80,15 +83,13 @@ public static void main3(String[] args) {
.pipe(
map(
value -> {
System.out.printf(
"[%s] %s\n", Thread.currentThread().getName(), value);
print(value);
return value.length();
}))
.flowOn(executorService)
.collect(
size ->
System.out.printf(
"[%s] %s\n", Thread.currentThread().getName(), size));
print(size));
}

public static void main2(String[] args) throws InterruptedException {
Expand Down
4 changes: 2 additions & 2 deletions src/com/matthisk/flow/SyncFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public void collect(FlowCollector<T> collector) {
producer.accept(collector);
}

public static <Y> Flow<Y> flow(FlowProducer<Y> consumer) {
return new SyncFlow<>(consumer);
public static <Y> Flow<Y> flow(FlowProducer<Y> producer) {
return new SyncFlow<>(producer);
};
}
7 changes: 7 additions & 0 deletions src/com/matthisk/flow/operators/FlowTerminator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.matthisk.flow.operators;

import com.matthisk.flow.Flow;

public interface FlowTerminator<T, U> {
U apply(Flow<T> flow);
}
2 changes: 1 addition & 1 deletion src/com/matthisk/flow/operators/Operators.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

public class Operators {
public static <A, B> FlowOperator<A, B> map(Function<A, B> fn) {
return inner -> SyncFlow.flow(collector -> inner.collect(value -> collector.emit(fn.apply(value))));
return inner -> SyncFlow.flow(producer -> inner.collect(value -> producer.emit(fn.apply(value))));
}

public static <A> FlowOperator<A, A> filter(Predicate<A> predicate) {
Expand Down
14 changes: 14 additions & 0 deletions src/com/matthisk/flow/operators/Terminators.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.matthisk.flow.operators;

import java.util.ArrayList;
import java.util.List;

public class Terminators {
public static <T> FlowTerminator<T, List<T>> toList() {
return flow -> {
ArrayList<T> result = new ArrayList<>();
flow.collect(result::add);
return result;
};
}
}

0 comments on commit b0db065

Please sign in to comment.