diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index cf5b834624..dcc859f462 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -32,12 +32,12 @@ class HttpServerExampleSpec val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) bindingFuture foreach { case Http.ServerBinding(localAddress, connectionStream) ⇒ - Flow(connectionStream).foreach { + Flow(connectionStream).foreach({ case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ println("Accepted new connection from " + remoteAddress) // handle connection here - }.consume(materializer) + }, materializer) } //#bind-example } @@ -74,12 +74,12 @@ class HttpServerExampleSpec // ... bindingFuture foreach { case Http.ServerBinding(localAddress, connectionStream) ⇒ - Flow(connectionStream).foreach { + Flow(connectionStream).foreach({ case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) ⇒ println("Accepted new connection from " + remoteAddress) Flow(requestProducer).map(requestHandler).produceTo(responseConsumer, materializer) - }.consume(materializer) + }, materializer) } //#full-server-example } diff --git a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java index 912f544c14..408172bbfd 100644 --- a/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java +++ b/akka-http-core/src/test/java/akka/http/model/japi/JavaTestServer.java @@ -21,42 +21,40 @@ import java.io.IOException; import java.io.InputStreamReader; public abstract class JavaTestServer { - public static void main(String[] args) throws IOException, InterruptedException { - ActorSystem system = ActorSystem.create(); + public static void main(String[] args) throws IOException, InterruptedException { + ActorSystem system = ActorSystem.create(); - MaterializerSettings settings = MaterializerSettings.create(); - final FlowMaterializer materializer = FlowMaterializer.create(settings, system); + MaterializerSettings settings = MaterializerSettings.create(); + final FlowMaterializer materializer = FlowMaterializer.create(settings, system); - ActorRef httpManager = Http.get(system).manager(); - Future binding = ask(httpManager, Http.bind("localhost", 8080), 1000); - binding.foreach(new Foreach() { - @Override - public void each(Object result) throws Throwable { - ServerBinding binding = (ServerBinding) result; - System.out.println("Bound to " + binding.localAddress()); + ActorRef httpManager = Http.get(system).manager(); + Future binding = ask(httpManager, Http.bind("localhost", 8080), 1000); + binding.foreach(new Foreach() { + @Override + public void each(Object result) throws Throwable { + ServerBinding binding = (ServerBinding) result; + System.out.println("Bound to " + binding.localAddress()); - Flow.create(binding.getConnectionStream()).foreach(new Procedure() { - @Override - public void apply(IncomingConnection conn) throws Exception { - System.out.println("New incoming connection from " + conn.remoteAddress()); + Flow.create(binding.getConnectionStream()).foreach(new Procedure() { + @Override + public void apply(IncomingConnection conn) throws Exception { + System.out.println("New incoming connection from " + conn.remoteAddress()); - Flow.create(conn.getRequestPublisher()) - .map(new Function() { - @Override - public HttpResponse apply(HttpRequest request) throws Exception { - System.out.println("Handling request to " + request.getUri()); - return JavaApiTestCases.handleRequest(request); - } - }) - .produceTo(conn.getResponseSubscriber(), materializer); - } - }).consume(materializer); - } - }, system.dispatcher()); + Flow.create(conn.getRequestPublisher()).map(new Function() { + @Override + public HttpResponse apply(HttpRequest request) throws Exception { + System.out.println("Handling request to " + request.getUri()); + return JavaApiTestCases.handleRequest(request); + } + }).produceTo(conn.getResponseSubscriber(), materializer); + } + }, materializer); + } + }, system.dispatcher()); - System.out.println("Press ENTER to stop."); - new BufferedReader(new InputStreamReader(System.in)).readLine(); + System.out.println("Press ENTER to stop."); + new BufferedReader(new InputStreamReader(System.in)).readLine(); - system.shutdown(); - } + system.shutdown(); + } } diff --git a/akka-http-core/src/test/scala/akka/http/TestServer.scala b/akka-http-core/src/test/scala/akka/http/TestServer.scala index c050ddbb4c..9e16fc292f 100644 --- a/akka-http-core/src/test/scala/akka/http/TestServer.scala +++ b/akka-http-core/src/test/scala/akka/http/TestServer.scala @@ -36,11 +36,11 @@ object TestServer extends App { val bindingFuture = IO(Http) ? Http.Bind(interface = "localhost", port = 8080) bindingFuture foreach { case Http.ServerBinding(localAddress, connectionStream) ⇒ - Flow(connectionStream).foreach { + Flow(connectionStream).foreach({ case Http.IncomingConnection(remoteAddress, requestPublisher, responseSubscriber) ⇒ println("Accepted new connection from " + remoteAddress) Flow(requestPublisher).map(requestHandler).produceTo(responseSubscriber, materializer) - }.consume(materializer) + }, materializer) } println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 8fa1f54dbc..e68613430b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -22,7 +22,6 @@ import akka.util.Collections.EmptyImmutableSeq * INTERNAL API */ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]) extends Flow[O] with Builder[O] { - import FlowImpl._ import Ast._ type Thing[T] = Flow[T] @@ -68,6 +67,9 @@ private[akka] case class FlowImpl[I, O](publisherNode: Ast.PublisherNode[I], ops override def produceTo(subscriber: Subscriber[_ >: O], materializer: FlowMaterializer): Unit = toPublisher(materializer).subscribe(subscriber.asInstanceOf[Subscriber[O]]) + + override def foreach(c: O ⇒ Unit, materializer: FlowMaterializer): Future[Unit] = + foreachTransform(c).toFuture(materializer) } /** @@ -108,6 +110,15 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ override def build(materializer: FlowMaterializer): (Subscriber[In], Publisher[Out]) = materializer.ductBuild(ops) + override def foreach(c: Out ⇒ Unit, materializer: FlowMaterializer): (Subscriber[In], Future[Unit]) = { + val p = Promise[Unit]() + val s = foreachTransform(c).onComplete({ + case Success(_) ⇒ p.success(()) + case Failure(e) ⇒ p.failure(e) + }, materializer) + (s, p.future) + } + } /** @@ -116,7 +127,6 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ private[akka] object Builder { val SuccessUnit = Success[Unit](()) private val ListOfUnit = List(()) - private case object TakeWithinTimerKey private case object DropWithinTimerKey private case object GroupedWithinTimerKey @@ -164,7 +174,7 @@ private[akka] trait Builder[Out] { override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil }) - def foreach(c: Out ⇒ Unit): Thing[Unit] = + def foreachTransform(c: Out ⇒ Unit): Thing[Unit] = transform(new Transformer[Out, Unit] { override def onNext(in: Out) = { c(in); Nil } override def onTermination(e: Option[Throwable]) = ListOfUnit diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index ca7590b4f4..191374f5e2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -19,6 +19,7 @@ import akka.stream.scaladsl.{ Duct ⇒ SDuct } import akka.stream.impl.Ast import scala.concurrent.duration.FiniteDuration import scala.concurrent.Future +import akka.dispatch.ExecutionContexts /** * Java API @@ -74,15 +75,6 @@ abstract class Duct[In, Out] { */ def collect[U](pf: PartialFunction[Out, U]): Duct[In, U] - /** - * Invoke the given procedure for each received element and produce a Unit value - * upon reaching the normal end of the stream. Please note that also in this case - * the `Duct` needs to be materialized (e.g. using [[#consume]] and attaching the - * the `Subscriber` representing the input side of the `Duct` to an upstream - * `Publisher`) to initiate its execution. - */ - def foreach(c: Procedure[Out]): Duct[In, Void] - /** * Invoke the given function for every received element, giving it its previous * output (or the given “zero” value) and the element as input. The returned stream @@ -325,6 +317,24 @@ abstract class Duct[In, Out] { */ def build(materializer: FlowMaterializer): Pair[Subscriber[In], Publisher[Out]] + /** + * Invoke the given procedure for each received element. + * Returns a pair of a `Subscriber` and a `Future`. + * + * The returned `Subscriber` represents the input side of the `Duct` and can + * later be connected to an upstream `Publisher`. + * + * The returned [[scala.concurrent.Future]] will be completed with `Success` when + * reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def foreach(c: Procedure[Out], materializer: FlowMaterializer): Pair[Subscriber[In], Future[Void]] + /** * INTERNAL API * Used by `Flow.append(duct)`. @@ -345,9 +355,6 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def collect[U](pf: PartialFunction[T, U]): Duct[In, U] = new DuctAdapter(delegate.collect(pf)) - override def foreach(c: Procedure[T]): Duct[In, Void] = - new DuctAdapter(delegate.foreach(c.apply).map(_ ⇒ null)) // FIXME optimize to one step - override def fold[U](zero: U, f: Function2[U, T, U]): Duct[In, U] = new DuctAdapter(delegate.fold(zero) { case (a, b) ⇒ f.apply(a, b) }) @@ -432,6 +439,13 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, Pair(in, out) } + override def foreach(c: Procedure[T], materializer: FlowMaterializer): Pair[Subscriber[In], Future[Void]] = { + val (in, fut) = delegate.foreach(elem ⇒ c.apply(elem), materializer) + implicit val ec = ExecutionContexts.sameThreadExecutionContext + val voidFut = fut.map(_ ⇒ null).mapTo[Void] + Pair(in, voidFut) + } + override private[akka] def ops: immutable.Seq[Ast.AstNode] = delegate.ops } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 62a99231ab..ba733d0e6f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -19,6 +19,7 @@ import akka.japi.Util.immutableSeq import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Flow ⇒ SFlow } import scala.concurrent.duration.FiniteDuration +import akka.dispatch.ExecutionContexts /** * Java API @@ -136,14 +137,6 @@ abstract class Flow[T] { */ def collect[U](pf: PartialFunction[T, U]): Flow[U] - /** - * Invoke the given procedure for each received element and produce a Unit value - * upon reaching the normal end of the stream. Please note that also in this case - * the flow needs to be materialized (e.g. using [[#consume]]) to initiate its - * execution. - */ - def foreach(c: Procedure[T]): Flow[Void] - /** * Invoke the given function for every received element, giving it its previous * output (or the given “zero” value) and the element as input. The returned stream @@ -390,6 +383,18 @@ abstract class Flow[T] { */ def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit + /** + * Invoke the given procedure for each received element. Returns a [[scala.concurrent.Future]] + * that will be completed with `Success` when reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def foreach(c: Procedure[T], materializer: FlowMaterializer): Future[Void] + } /** @@ -416,9 +421,6 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def collect[U](pf: PartialFunction[T, U]): Flow[U] = new FlowAdapter(delegate.collect(pf)) - override def foreach(c: Procedure[T]): Flow[Void] = - new FlowAdapter(delegate.foreach(c.apply).map(_ ⇒ null)) // FIXME optimize to one step - override def fold[U](zero: U, f: Function2[U, T, U]): Flow[U] = new FlowAdapter(delegate.fold(zero) { case (a, b) ⇒ f.apply(a, b) }) @@ -499,4 +501,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def produceTo(subsriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit = delegate.produceTo(subsriber, materializer) + override def foreach(c: Procedure[T], materializer: FlowMaterializer): Future[Void] = { + implicit val ec = ExecutionContexts.sameThreadExecutionContext + delegate.foreach(elem ⇒ c.apply(elem), materializer).map(_ ⇒ null).mapTo[Void] + } + } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 04a493155a..7eb4e8f1b1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -63,15 +63,6 @@ trait Duct[In, +Out] { */ def collect[U](pf: PartialFunction[Out, U]): Duct[In, U] - /** - * Invoke the given procedure for each received element and produce a Unit value - * upon reaching the normal end of the stream. Please note that also in this case - * the `Duct` needs to be materialized (e.g. using [[#consume]] and attaching the - * the `Subscriber` representing the input side of the `Duct` to an upstream - * `Publisher`) to initiate its execution. - */ - def foreach(c: Out ⇒ Unit): Duct[In, Unit] - /** * Invoke the given function for every received element, giving it its previous * output (or the given “zero” value) and the element as input. The returned stream @@ -319,6 +310,24 @@ trait Duct[In, +Out] { */ def build(materializer: FlowMaterializer): (Subscriber[In], Publisher[Out] @uncheckedVariance) + /** + * Invoke the given procedure for each received element. + * Returns a tuple of a `Subscriber` and a `Future`. + * + * The returned `Subscriber` represents the input side of the `Duct` and can + * later be connected to an upstream `Publisher`. + * + * The returned [[scala.concurrent.Future]] will be completed with `Success` when + * reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def foreach(c: Out ⇒ Unit, materializer: FlowMaterializer): (Subscriber[In], Future[Unit]) + /** * INTERNAL API * Used by `Flow.append(duct)`. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index eb880fa132..4c98fb0248 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -130,14 +130,6 @@ trait Flow[+T] { */ def collect[U](pf: PartialFunction[T, U]): Flow[U] - /** - * Invoke the given procedure for each received element and produce a Unit value - * upon reaching the normal end of the stream. Please note that also in this case - * the `Flow` needs to be materialized (e.g. using [[#consume]]) to initiate its - * execution. - */ - def foreach(c: T ⇒ Unit): Flow[Unit] - /** * Invoke the given function for every received element, giving it its previous * output (or the given “zero” value) and the element as input. The returned stream @@ -392,5 +384,17 @@ trait Flow[+T] { */ def produceTo(subscriber: Subscriber[_ >: T], materializer: FlowMaterializer): Unit + /** + * Invoke the given procedure for each received element. Returns a [[scala.concurrent.Future]] + * that will be completed with `Success` when reaching the normal end of the stream, or completed + * with `Failure` if there is an error is signaled in the stream. + * + * *This will materialize the flow and initiate its execution.* + * + * The given FlowMaterializer decides how the flow’s logical structure is + * broken down into individual processing steps. + */ + def foreach(c: T ⇒ Unit, materializer: FlowMaterializer): Future[Unit] + } diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index 6896e9d22d..7536c75016 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -37,28 +37,30 @@ public class DuctTest { final JavaTestKit probe = new JavaTestKit(system); final String[] lookup = { "a", "b", "c", "d", "e", "f" }; - Subscriber inputSubscriber = Duct.create(Integer.class).drop(2).take(3).map(new Function() { - public String apply(Integer elem) { - return lookup[elem]; - } - }).filter(new Predicate() { - public boolean test(String elem) { - return !elem.equals("c"); - } - }).grouped(2).mapConcat(new Function, java.util.List>() { - public java.util.List apply(java.util.List elem) { - return elem; - } - }).fold("", new Function2() { - public String apply(String acc, String elem) { - return acc + elem; - } - }).foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } - }).consume(materializer); + Pair, Future> foreachPair = Duct.create(Integer.class).drop(2).take(3) + .map(new Function() { + public String apply(Integer elem) { + return lookup[elem]; + } + }).filter(new Predicate() { + public boolean test(String elem) { + return !elem.equals("c"); + } + }).grouped(2).mapConcat(new Function, java.util.List>() { + public java.util.List apply(java.util.List elem) { + return elem; + } + }).fold("", new Function2() { + public String apply(String acc, String elem) { + return acc + elem; + } + }).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + Subscriber inputSubscriber = foreachPair.first(); final java.util.Iterator input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator(); Publisher publisher = Flow.create(input).toPublisher(materializer); @@ -76,7 +78,7 @@ public class DuctTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); Publisher publisher = Flow.create(Arrays.asList("a", "b", "c")).toPublisher(materializer); @@ -94,7 +96,7 @@ public class DuctTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer).first(); Subscriber inSubscriber = Duct.create(String.class).produceTo(subscriber, materializer); @@ -111,14 +113,10 @@ public class DuctTest { public void mustBeAppendableToFlow() { final JavaTestKit probe = new JavaTestKit(system); - Duct duct = Duct.create(String.class).map(new Function() { + Duct duct = Duct.create(String.class).map(new Function() { public String apply(String elem) { return elem.toLowerCase(); } - }).foreach(new Procedure() { - public void apply(String elem) { - probe.getRef().tell(elem, ActorRef.noSender()); - } }); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); @@ -129,7 +127,11 @@ public class DuctTest { } }); - flow.append(duct).consume(materializer); + flow.append(duct).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); probe.expectMsgEquals("a"); probe.expectMsgEquals("b"); @@ -158,7 +160,7 @@ public class DuctTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer).first(); Flow.create(Arrays.asList(1, 2, 3)).produceTo(ductInSubscriber, materializer); @@ -201,7 +203,7 @@ public class DuctTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer).first(); final java.lang.Iterable input = Arrays.asList("a", "b", "c"); Flow.create(input).produceTo(c, materializer); diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index fee3e6c279..b67b54428c 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -23,6 +23,7 @@ import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.dispatch.Futures; +import akka.dispatch.Mapper; import akka.japi.Function; import akka.japi.Function2; import akka.japi.Pair; @@ -77,7 +78,7 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectMsgEquals("de"); @@ -87,16 +88,18 @@ public class FlowTest { public void mustBeAbleToUseVoidTypeInForeach() { final JavaTestKit probe = new JavaTestKit(system); final java.util.Iterator input = Arrays.asList("a", "b", "c").iterator(); - Flow.create(input).foreach(new Procedure() { + Future fut = Flow.create(input).foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).map(new Function() { + }, materializer); + + fut.map(new Mapper() { public String apply(Void elem) { probe.getRef().tell(String.valueOf(elem), ActorRef.noSender()); return String.valueOf(elem); } - }).consume(materializer); + }, system.dispatcher()); probe.expectMsgEquals("a"); probe.expectMsgEquals("b"); @@ -139,7 +142,7 @@ public class FlowTest { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectMsgEquals(0); probe.expectMsgEquals(0); @@ -196,7 +199,7 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectMsgEquals("0"); probe.expectMsgEquals("2"); @@ -219,9 +222,9 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(new Pair(pair.first(), elem), ActorRef.noSender()); } - }).consume(materializer); + }, materializer); } - }).consume(materializer); + }, materializer); Map> grouped = new HashMap>(); for (Object o : probe.receiveN(5)) { @@ -256,9 +259,9 @@ public class FlowTest { public void apply(List chunk) { probe.getRef().tell(chunk, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); } - }).consume(materializer); + }, materializer); for (Object o : probe.receiveN(3)) { @SuppressWarnings("unchecked") @@ -284,7 +287,7 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); Set output = new HashSet(Arrays.asList(probe.receiveN(6))); assertEquals(new HashSet(Arrays.asList("A", "B", "C", "D", "E", "F")), output); @@ -300,7 +303,7 @@ public class FlowTest { public void apply(Pair elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); List output = Arrays.asList(probe.receiveN(3)); @SuppressWarnings("unchecked") @@ -318,7 +321,7 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); List output = Arrays.asList(probe.receiveN(6)); assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); @@ -344,7 +347,7 @@ public class FlowTest { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); List output = Arrays.asList(probe.receiveN(5)); assertEquals(Arrays.asList(4, 3, 2, 1, 0), output); @@ -400,102 +403,85 @@ public class FlowTest { final JavaTestKit probe = new JavaTestKit(system); final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); Future, Publisher>> future = Flow.create(input).prefixAndTail(3).toFuture(materializer); - Pair, Publisher> result = - Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + Pair, Publisher> result = Await.result(future, + probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(Arrays.asList(1, 2, 3), result.first()); Future> tailFuture = Flow.create(result.second()).grouped(4).toFuture(materializer); - List tailResult = - Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + List tailResult = Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(Arrays.asList(4, 5, 6), tailResult); } @Test public void mustBeAbleToUseConcatAll() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input1 = Arrays.asList(1, 2, 3); - final java.lang.Iterable input2 = Arrays.asList(4, 5); + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input1 = Arrays.asList(1, 2, 3); + final java.lang.Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = Arrays.asList( - Flow.create(input1).toPublisher(materializer), - Flow.create(input2).toPublisher(materializer) - ); + final List> mainInputs = Arrays.asList(Flow.create(input1).toPublisher(materializer), Flow + .create(input2).toPublisher(materializer)); - Future> future = - Flow.create(mainInputs).flatten(FlattenStrategy.concat()).grouped(6).toFuture(materializer); + Future> future = Flow.create(mainInputs). flatten(FlattenStrategy. concat()) + .grouped(6).toFuture(materializer); - List result = - Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); - assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); } @Test public void mustBeAbleToUseBuffer() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - Future> future = Flow - .create(input) - .buffer(2, OverflowStrategy.backpressure()) - .grouped(4) - .toFuture(materializer); + Future> future = Flow.create(input).buffer(2, OverflowStrategy.backpressure()).grouped(4) + .toFuture(materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals(input, result); } @Test public void mustBeAbleToUseConflate() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); - final List input = Arrays.asList("A", "B", "C"); - Future future = Flow - .create(input) - .conflate(new Function() { - @Override - public String apply(String s) throws Exception { - return s; - } - }, - new Function2() { - @Override - public String apply(String in, String aggr) throws Exception { - return in; - } - } - ) - .fold("", new Function2() { - @Override - public String apply(String aggr, String in) throws Exception { - return in; - } - }) - .toFuture(materializer); - String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); - assertEquals("C", result); + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Flow.create(input).conflate(new Function() { + @Override + public String apply(String s) throws Exception { + return s; + } + }, new Function2() { + @Override + public String apply(String in, String aggr) throws Exception { + return in; + } + }).fold("", new Function2() { + @Override + public String apply(String aggr, String in) throws Exception { + return in; + } + }).toFuture(materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("C", result); } @Test public void mustBeAbleToUseExpand() throws Exception { - final JavaTestKit probe = new JavaTestKit(system); - final List input = Arrays.asList("A", "B", "C"); - Future future = Flow - .create(input) - .expand(new Function() { - @Override - public String apply(String in) throws Exception { - return in; - } - }, - new Function>() { - @Override - public Pair apply(String in) throws Exception { - return new Pair(in, in); - } - } - ) - .toFuture(materializer); - String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); - assertEquals("A", result); + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Flow.create(input).expand(new Function() { + @Override + public String apply(String in) throws Exception { + return in; + } + }, new Function>() { + @Override + public Pair apply(String in) throws Exception { + return new Pair(in, in); + } + }).toFuture(materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); } + @Test public void mustProduceTicks() throws Exception { final JavaTestKit probe = new JavaTestKit(system); @@ -511,14 +497,14 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectMsgEquals("tick-1"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); probe.expectMsgEquals("tick-2"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); } - + @Test public void mustBeAbleToUseMapFuture() throws Exception { final JavaTestKit probe = new JavaTestKit(system); @@ -531,7 +517,7 @@ public class FlowTest { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } - }).consume(materializer); + }, materializer); probe.expectMsgEquals("A"); probe.expectMsgEquals("B"); probe.expectMsgEquals("C"); diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala index 04cd21dc00..d6e00efd1f 100644 --- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala @@ -69,9 +69,9 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "pull existing messages from a processor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { + PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach({ case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) 1 to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") @@ -80,9 +80,9 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "pull existing messages and new from a processor's journal" in { val streamProbe = TestProbe() - PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach { + PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach({ case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) 1 to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") @@ -98,9 +98,9 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val streamProbe = TestProbe() val fromSequenceNr = 5L - PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach { + PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach({ case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) fromSequenceNr to numMessages foreach { i ⇒ streamProbe.expectMsg(s"a-${i}") @@ -115,9 +115,9 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val publisher = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toPublisher(materializer) - Flow(publisher).foreach { + Flow(publisher).foreach({ case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) // let subscriber consume all existing messages 1 to numMessages foreach { i ⇒ @@ -125,9 +125,9 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", } // subscribe another subscriber - Flow(publisher).foreach { + Flow(publisher).foreach({ case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) // produce new messages and let both subscribers handle them 1 to 2 foreach { i ⇒ @@ -149,10 +149,10 @@ class PersistentPublisherSpec extends AkkaSpec(PersistenceSpec.config("leveldb", val publisher1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toPublisher(materializer) val publisher2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toPublisher(materializer) - Flow(publisher1).merge(publisher2).foreach { + Flow(publisher1).merge(publisher2).foreach({ case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}" case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}" - }.consume(materializer) + }, materializer) 1 to numMessages foreach { i ⇒ if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}") diff --git a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala index 2aeb413d49..15166c983d 100644 --- a/akka-stream/src/test/scala/akka/stream/DuctSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/DuctSpec.scala @@ -103,11 +103,10 @@ class DuctSpec extends AkkaSpec { } "perform multiple transformation operations" in { - val duct = Duct[Int].map(_.toString).map("elem-" + _).foreach(testActor ! _) - val c = duct.consume(materializer) + val (in, fut) = Duct[Int].map(_.toString).map("elem-" + _).foreach(testActor ! _, materializer) val source = Flow(List(1, 2, 3)).toPublisher(materializer) - source.subscribe(c) + source.subscribe(in) expectMsg("elem-1") expectMsg("elem-2") diff --git a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala index 146f5349c6..c868e7e2c2 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachSpec.scala @@ -6,28 +6,44 @@ package akka.stream import akka.stream.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } +import akka.stream.scaladsl.Flow +import akka.stream.testkit.StreamTestKit +import scala.util.control.NoStackTrace -class FlowForeachSpec extends AkkaSpec with ScriptedTest { +class FlowForeachSpec extends AkkaSpec { - val settings = MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 16, - initialFanOutBufferSize = 1, - maxFanOutBufferSize = 16, - dispatcher = "akka.test.stream-dispatcher") + val mat = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher")) + import system.dispatcher "A Foreach" must { - "foreach" in { - var count = 0 - def script = { - count = 0 - Script((1 to 50).toSeq -> Seq(())) + "call the procedure for each element" in { + Flow(1 to 3).foreach(testActor ! _, mat).onSuccess { + case _ ⇒ testActor ! "done" } - (1 to 50) foreach { _ ⇒ - runScript(script, settings)(_.foreach(x ⇒ count += x)) - count should be(25 * 51) + expectMsg(1) + expectMsg(2) + expectMsg(3) + expectMsg("done") + } + + "complete the future for an empty stream" in { + Flow(Nil).foreach(testActor ! _, mat).onSuccess { + case _ ⇒ testActor ! "done" } + expectMsg("done") + } + + "yield the first error" in { + val p = StreamTestKit.PublisherProbe[Int]() + Flow(p).foreach(testActor ! _, mat).onFailure { + case ex ⇒ testActor ! ex + } + val proc = p.expectSubscription + proc.expectRequest() + val ex = new RuntimeException("ex") with NoStackTrace + proc.sendError(ex) + expectMsg(ex) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index e18a3f6c88..3d945a4006 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -66,12 +66,13 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback after transform and foreach steps " in { val onCompleteProbe = TestProbe() val p = StreamTestKit.PublisherProbe[Int]() + import system.dispatcher // for the Future.onComplete Flow(p).map { x ⇒ onCompleteProbe.ref ! ("map-" + x) x - }.foreach { + }.foreach({ x ⇒ onCompleteProbe.ref ! ("foreach-" + x) - }.onComplete({ onCompleteProbe.ref ! _ }, materializer) + }, materializer).onComplete { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequest() proc.sendNext(42) diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index 3b6c9e74bf..c8d233f388 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -27,8 +27,8 @@ class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\na Flow(List(1)).map(x ⇒ { testActor ! self; x }).toPublisher(materializer) }).take(3).foreach(x ⇒ { testActor ! self - Flow(x).foreach(_ ⇒ testActor ! self).consume(materializer) - }).toFuture(materializer) + Flow(x).foreach(_ ⇒ testActor ! self, materializer) + }, materializer) Await.result(f, 3.seconds) val refs = receiveWhile(idle = 250.millis) { case r: ActorRef ⇒ r diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 613bc674bd..a8e161d992 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -202,9 +202,9 @@ class TcpFlowSpec extends AkkaSpec { } def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): Future[Unit] = - Flow(bind(serverAddress).connectionStream).foreach { conn ⇒ + Flow(bind(serverAddress).connectionStream).foreach({ conn ⇒ conn.inputStream.subscribe(conn.outputStream) - }.toFuture(materializer) + }, materializer) "Outgoing TCP stream" must {