!str #15076 Make foreach a terminal operation, returning Future[Unit]

This commit is contained in:
Patrik Nordwall 2014-08-15 15:37:09 +02:00
parent f4bc1704d3
commit fdc5532483
16 changed files with 278 additions and 232 deletions

View file

@ -23,6 +23,7 @@ import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.japi.Function;
import akka.japi.Function2;
import akka.japi.Pair;
@ -77,7 +78,7 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
probe.expectMsgEquals("de");
@ -87,16 +88,18 @@ public class FlowTest {
public void mustBeAbleToUseVoidTypeInForeach() {
final JavaTestKit probe = new JavaTestKit(system);
final java.util.Iterator<String> input = Arrays.asList("a", "b", "c").iterator();
Flow.create(input).foreach(new Procedure<String>() {
Future<Void> fut = Flow.create(input).foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).map(new Function<Void, String>() {
}, materializer);
fut.map(new Mapper<Void, String>() {
public String apply(Void elem) {
probe.getRef().tell(String.valueOf(elem), ActorRef.noSender());
return String.valueOf(elem);
}
}).consume(materializer);
}, system.dispatcher());
probe.expectMsgEquals("a");
probe.expectMsgEquals("b");
@ -139,7 +142,7 @@ public class FlowTest {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
probe.expectMsgEquals(0);
probe.expectMsgEquals(0);
@ -196,7 +199,7 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
probe.expectMsgEquals("0");
probe.expectMsgEquals("2");
@ -219,9 +222,9 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(new Pair<String, String>(pair.first(), elem), ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
}
}).consume(materializer);
}, materializer);
Map<String, List<String>> grouped = new HashMap<String, List<String>>();
for (Object o : probe.receiveN(5)) {
@ -256,9 +259,9 @@ public class FlowTest {
public void apply(List<String> chunk) {
probe.getRef().tell(chunk, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
}
}).consume(materializer);
}, materializer);
for (Object o : probe.receiveN(3)) {
@SuppressWarnings("unchecked")
@ -284,7 +287,7 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
Set<Object> output = new HashSet<Object>(Arrays.asList(probe.receiveN(6)));
assertEquals(new HashSet<Object>(Arrays.asList("A", "B", "C", "D", "E", "F")), output);
@ -300,7 +303,7 @@ public class FlowTest {
public void apply(Pair<String, Integer> elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
List<Object> output = Arrays.asList(probe.receiveN(3));
@SuppressWarnings("unchecked")
@ -318,7 +321,7 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
List<Object> output = Arrays.asList(probe.receiveN(6));
assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output);
@ -344,7 +347,7 @@ public class FlowTest {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
List<Object> output = Arrays.asList(probe.receiveN(5));
assertEquals(Arrays.asList(4, 3, 2, 1, 0), output);
@ -400,102 +403,85 @@ public class FlowTest {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
Future<Pair<List<Integer>, Publisher<Integer>>> future = Flow.create(input).prefixAndTail(3).toFuture(materializer);
Pair<List<Integer>, Publisher<Integer>> result =
Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
Pair<List<Integer>, Publisher<Integer>> result = Await.result(future,
probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(1, 2, 3), result.first());
Future<List<Integer>> tailFuture = Flow.create(result.second()).grouped(4).toFuture(materializer);
List<Integer> tailResult =
Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
List<Integer> tailResult = Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(4, 5, 6), tailResult);
}
@Test
public void mustBeAbleToUseConcatAll() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final java.lang.Iterable<Integer> input2 = Arrays.asList(4, 5);
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final java.lang.Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Publisher<Integer>> mainInputs = Arrays.asList(
Flow.create(input1).toPublisher(materializer),
Flow.create(input2).toPublisher(materializer)
);
final List<Publisher<Integer>> mainInputs = Arrays.asList(Flow.create(input1).toPublisher(materializer), Flow
.create(input2).toPublisher(materializer));
Future<List<Integer>> future =
Flow.create(mainInputs).<Integer>flatten(FlattenStrategy.<Integer>concat()).grouped(6).toFuture(materializer);
Future<List<Integer>> future = Flow.create(mainInputs).<Integer> flatten(FlattenStrategy.<Integer> concat())
.grouped(6).toFuture(materializer);
List<Integer> result =
Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
}
@Test
public void mustBeAbleToUseBuffer() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<List<String>> future = Flow
.create(input)
.buffer(2, OverflowStrategy.backpressure())
.grouped(4)
.toFuture(materializer);
Future<List<String>> future = Flow.create(input).buffer(2, OverflowStrategy.backpressure()).grouped(4)
.toFuture(materializer);
List<String> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(input, result);
}
@Test
public void mustBeAbleToUseConflate() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.conflate(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
},
new Function2<String, String, String>() {
@Override
public String apply(String in, String aggr) throws Exception {
return in;
}
}
)
.fold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return in;
}
})
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("C", result);
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow.create(input).conflate(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
}, new Function2<String, String, String>() {
@Override
public String apply(String in, String aggr) throws Exception {
return in;
}
}).fold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return in;
}
}).toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("C", result);
}
@Test
public void mustBeAbleToUseExpand() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.expand(new Function<String, String>() {
@Override
public String apply(String in) throws Exception {
return in;
}
},
new Function<String, Pair<String, String>>() {
@Override
public Pair<String, String> apply(String in) throws Exception {
return new Pair<String, String>(in, in);
}
}
)
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow.create(input).expand(new Function<String, String>() {
@Override
public String apply(String in) throws Exception {
return in;
}
}, new Function<String, Pair<String, String>>() {
@Override
public Pair<String, String> apply(String in) throws Exception {
return new Pair<String, String>(in, in);
}
}).toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
}
@Test
public void mustProduceTicks() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
@ -511,14 +497,14 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
probe.expectMsgEquals("tick-1");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
probe.expectMsgEquals("tick-2");
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToUseMapFuture() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
@ -531,7 +517,7 @@ public class FlowTest {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
}, materializer);
probe.expectMsgEquals("A");
probe.expectMsgEquals("B");
probe.expectMsgEquals("C");