Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
matthisk committed Dec 24, 2019
0 parents commit c1c0fd5
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.idea
*.iml
out
22 changes: 22 additions & 0 deletions src/com/matthisk/flow/AbstractFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.matthisk.flow;

import com.matthisk.flow.operators.FlowOperator;
import java.util.concurrent.ExecutorService;

public abstract class AbstractFlow<T> implements Flow<T> {

@Override
public Flow<T> flowOn(ExecutorService executor) {
return new ChannelFlow<>(this, executor);
}

@Override
public <Y> Flow<Y> pipe(FlowOperator<T, Y> operator) {
return operator.apply(this);
}

@Override
public <Q, R> Flow<R> pipe(FlowOperator<T, Q> op1, FlowOperator<Q, R> op2) {
return FlowOperator.compose(op1, op2).apply(this);
}
}
47 changes: 47 additions & 0 deletions src/com/matthisk/flow/ChannelFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.matthisk.flow;

import com.matthisk.flow.channels.Channel;
import com.matthisk.flow.channels.Envelope;
import com.matthisk.flow.channels.RendezvousChannel;
import java.util.concurrent.ExecutorService;

public class ChannelFlow<T> extends AbstractFlow<T> {

private final ExecutorService executor;
private final Flow<T> inner;
private final Channel<T> channel;

ChannelFlow(Flow<T> inner, ExecutorService executor) {
this.inner = inner;
this.executor = executor;
this.channel = new RendezvousChannel<>();
}

@Override
public void collect(FlowCollector<T> collector) {
executor.submit(() -> {
try {
inner.collect(channel::send);
channel.complete();
} catch (Throwable ex) {
channel.complete(ex);
}
});

while (true) {
Envelope<T> envelope = channel.receive();

if (envelope.getValue() != null) {
collector.emit(envelope.getValue());
}

if (envelope.getCause() != null) {
throw new RuntimeException("Received failure from channel", envelope.getCause());
}

if (envelope.isComplete()) {
break;
}
}
}
}
63 changes: 63 additions & 0 deletions src/com/matthisk/flow/ChannelMergeFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.matthisk.flow;

import com.matthisk.flow.channels.Channel;
import com.matthisk.flow.channels.Envelope;
import com.matthisk.flow.channels.RendezvousChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class ChannelMergeFlow<T> extends AbstractFlow<T> {

private final int concurrency;
private final ExecutorService executor;
private final Flow<Flow<T>> inner;
private final Channel<T> channel;

public ChannelMergeFlow(Flow<Flow<T>> inner, int concurrency, ExecutorService executor) {
this.inner = inner;
this.concurrency = concurrency;
this.executor = executor;
this.channel = new RendezvousChannel<T>();
}

@Override
public void collect(FlowCollector<T> collector) {
Semaphore semaphore = new Semaphore(concurrency);

inner.collect(flow -> {
executor.submit(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to acquire lock", e);
}

try {
flow.collect(channel::send);
channel.complete();
} catch (Throwable ex) {
channel.complete(ex);
} finally {
semaphore.release();
}
});
});

while (true) {
Envelope<T> envelope = channel.receive();

if (envelope.getValue() != null) {
collector.emit(envelope.getValue());
}

if (envelope.getCause() != null) {
throw new RuntimeException("Received failure from channel", envelope.getCause());
}

if (envelope.isComplete()) {
break;
}
}
}
}
14 changes: 14 additions & 0 deletions src/com/matthisk/flow/Flow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.matthisk.flow;

import com.matthisk.flow.operators.FlowOperator;
import java.util.concurrent.ExecutorService;

public interface Flow<T> {
<U> Flow<U> pipe(FlowOperator<T, U> operator);

<U, V> Flow<V> pipe(FlowOperator<T, U> op1, FlowOperator<U, V> op2);

Flow<T> flowOn(ExecutorService executorService);

void collect(FlowCollector<T> collector);
}
5 changes: 5 additions & 0 deletions src/com/matthisk/flow/FlowCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.matthisk.flow;

public interface FlowCollector<T> {
void emit(T value);
}
5 changes: 5 additions & 0 deletions src/com/matthisk/flow/FlowProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.matthisk.flow;

public interface FlowProducer<T> {
void accept(FlowCollector<T> collector);
}
126 changes: 126 additions & 0 deletions src/com/matthisk/flow/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.matthisk.flow;

import static com.matthisk.flow.SyncFlow.flow;
import static com.matthisk.flow.operators.Operators.flatMap;
import static com.matthisk.flow.operators.Operators.flattenMerge;
import static com.matthisk.flow.operators.Operators.map;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Main {

public static void main(String[] args) {
ThreadFactory factory =
Thread.builder()
.name("worker")
.virtual()
.daemon(false)
.disallowThreadLocals()
.factory();

try (ExecutorService scheduler = Executors.newFixedThreadPool(1, factory)) {
Flow<Object> fl =
flow(collector -> {
collector.emit("Hello.");
collector.emit("World.");
})
.flowOn(scheduler);

scheduler.submit(
() -> {
fl.collect(
value ->
System.out.printf(
"[%s] %s\n",
Thread.currentThread().getName(), value));
});
}
}

public static void main3(String[] args) {
ThreadFactory factory =
Thread.builder()
.name("worker")
.virtual()
.daemon(false)
.disallowThreadLocals()
.factory();
ExecutorService executorService = Executors.newFixedThreadPool(1_000_000, factory);

AtomicInteger atomicInt = new AtomicInteger();

Flow<String> fl =
flow(
collector -> {
int index = atomicInt.incrementAndGet();
int rounds = 0;

while (rounds < 10) {
collector.emit(
String.format(
"[%d] time: %s",
index, System.currentTimeMillis()));
delay(200);
rounds++;
}
});

Flow<Integer> fl2 =
flow(
collector -> {
collector.emit(1);
collector.emit(2);
collector.emit(3);
});

fl2.pipe(map(value -> fl), flattenMerge(8, executorService))
.pipe(
map(
value -> {
System.out.printf(
"[%s] %s\n", Thread.currentThread().getName(), value);
return value.length();
}))
.flowOn(executorService)
.collect(
size ->
System.out.printf(
"[%s] %s\n", Thread.currentThread().getName(), size));
}

public static void main2(String[] args) throws InterruptedException {

Flow<String> flow =
flow(
consumer -> {
consumer.emit("joe");
delay(1000);
consumer.emit("programmer");
});

Flow<Integer> flow2 =
flow(
collector -> {
collector.emit(1);
delay(500);
collector.emit(2);
collector.emit(3);
});

flow.pipe(flatMap(value -> flow2)).collect(System.out::println);

flow.pipe(map(String::getBytes)).collect(System.out::println);
}

static void delay(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
19 changes: 19 additions & 0 deletions src/com/matthisk/flow/SyncFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.matthisk.flow;

public class SyncFlow<T> extends AbstractFlow<T> {

private final FlowProducer<T> producer;

public SyncFlow(FlowProducer<T> producer) {
this.producer = producer;
}

@Override
public void collect(FlowCollector<T> collector) {
producer.accept(collector);
}

public static <Y> Flow<Y> flow(FlowProducer<Y> consumer) {
return new SyncFlow<>(consumer);
};
}
4 changes: 4 additions & 0 deletions src/com/matthisk/flow/channels/Channel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.matthisk.flow.channels;

public interface Channel<T> extends ReceiveChannel<T>, SendChannel<T> {
}
51 changes: 51 additions & 0 deletions src/com/matthisk/flow/channels/Envelope.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.matthisk.flow.channels;

import java.util.Objects;

public class Envelope<T> {

private final Throwable cause;

private final boolean isComplete;

private final T value;

private Envelope(T value, Throwable cause, boolean isComplete) {
this.cause = cause;
this.isComplete = isComplete;
this.value = value;
}

public Throwable getCause() {
return cause;
}

public T getValue() {
return value;
}
public boolean isComplete() {
return isComplete;
}
@Override
public int hashCode() {
return Objects.hash(cause, isComplete, value);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Envelope<?> envelope = (Envelope<?>) o;
return isComplete == envelope.isComplete &&
Objects.equals(cause, envelope.cause) &&
Objects.equals(value, envelope.value);
}

public static <T> Envelope<T> complete(Throwable ex) {
return new Envelope<>(null, ex, true);
}

public static <T> Envelope<T> value(T item) {
return new Envelope<>(item, null, false);
}
}
5 changes: 5 additions & 0 deletions src/com/matthisk/flow/channels/ReceiveChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.matthisk.flow.channels;

public interface ReceiveChannel<T> {
Envelope<T> receive();
}
Loading

0 comments on commit c1c0fd5

Please sign in to comment.