+str #15218 Add mapFuture operator
This commit is contained in:
parent
68555cde17
commit
e1bd39bf7d
11 changed files with 365 additions and 7 deletions
|
|
@ -9,22 +9,20 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.stream.FlattenStrategy;
|
||||
import akka.stream.OverflowStrategy;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.reactivestreams.api.Producer;
|
||||
|
||||
import scala.Option;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.japi.Function;
|
||||
import akka.japi.Function2;
|
||||
import akka.japi.Pair;
|
||||
|
|
@ -523,4 +521,23 @@ public class FlowTest {
|
|||
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseMapFuture() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
|
||||
Flow.create(input).mapFuture(new Function<String, Future<String>>() {
|
||||
public Future<String> apply(String elem) {
|
||||
return Futures.successful(elem.toUpperCase());
|
||||
}
|
||||
}).foreach(new Procedure<String>() {
|
||||
public void apply(String elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}).consume(materializer);
|
||||
probe.expectMsgEquals("A");
|
||||
probe.expectMsgEquals("B");
|
||||
probe.expectMsgEquals("C");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue