+str #15081: Implement conflate, expand and buffer

This commit is contained in:
Endre Sándor Varga 2014-05-20 16:02:09 +02:00
parent 6cd2f7d5d8
commit 50ab214d25
16 changed files with 1022 additions and 13 deletions

View file

@ -11,6 +11,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import akka.stream.FlattenStrategy;
import akka.stream.OverflowStrategy;
import org.junit.ClassRule;
import org.junit.Test;
@ -425,4 +426,70 @@ public class FlowTest {
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
}
@Test
public void mustBeAbleToUseBuffer() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<List<String>> future = Flow
.create(input)
.buffer(2, OverflowStrategy.backpressure())
.grouped(4)
.toFuture(materializer);
List<String> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(input, result);
}
@Test
public void mustBeAbleToUseConflate() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.conflate(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
},
new Function2<String, String, String>() {
@Override
public String apply(String in, String aggr) throws Exception {
return in;
}
}
)
.fold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return in;
}
})
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("C", result);
}
@Test
public void mustBeAbleToUseExpand() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.expand(new Function<String, String>() {
@Override
public String apply(String in) throws Exception {
return in;
}
},
new Function<String, Pair<String, String>>() {
@Override
public Pair<String, String> apply(String in) throws Exception {
return new Pair<String, String>(in, in);
}
}
)
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
}
}