diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 1fec18ef35..4d8ab7837b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -4,11 +4,9 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import akka.stream.{ MaterializerSettings, Stop } +import akka.stream.{ MaterializerSettings } import org.reactivestreams.{ Publisher, Subscriber } - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.Duration diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index 9e3e80378f..4ef9b572d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -5,7 +5,6 @@ package akka.stream.impl import akka.actor.Props import akka.stream.MaterializerSettings -import akka.stream.Stop /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/Support.scala b/akka-stream/src/main/scala/akka/stream/impl/Stop.scala similarity index 59% rename from akka-stream/src/main/scala/akka/stream/Support.scala rename to akka-stream/src/main/scala/akka/stream/impl/Stop.scala index 8255da8580..55831f62fb 100644 --- a/akka-stream/src/main/scala/akka/stream/Support.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stop.scala @@ -1,18 +1,15 @@ /** * Copyright (C) 2014 Typesafe Inc. */ -package akka.stream +package akka.stream.impl -import scala.util.control.NoStackTrace +import scala.util.control.ControlThrowable /** + * INTERNAL API + * * This exception must be thrown from a callback-based stream publisher to * signal the end of stream (if the produced stream is not infinite). This is used for example in * [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure). */ -case object Stop extends RuntimeException("Stop this flow") with NoStackTrace { - /** - * Java API: get the singleton instance - */ - def getInstance = this -} +private[akka] case object Stop extends ControlThrowable diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5d3cdc8b66..88b1298596 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -10,7 +10,12 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success import org.reactivestreams.{ Publisher, Subscriber } -import akka.japi._ +import akka.japi.Creator +import akka.japi.Function +import akka.japi.Function2 +import akka.japi.Pair +import akka.japi.Predicate +import akka.japi.Procedure import akka.japi.Util.immutableSeq import akka.stream._ import akka.stream.scaladsl.{ Flow ⇒ SFlow } @@ -55,11 +60,10 @@ object Flow { /** * Define the sequence of elements to be produced by the given Callable. - * The stream ends normally when evaluation of the Callable results in - * a [[akka.stream.Stop]] exception being thrown; it ends exceptionally - * when any other exception is thrown. + * The stream ends normally when evaluation of the `Callable` returns a `null`. + * The stream ends exceptionally when an exception is thrown from the `Callable`. */ - def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() ⇒ block.call())) + def create[T](block: Callable[T]): Flow[T] = new FlowAdapter(SFlow.apply(() ⇒ Option(block.call()))) /** * Elements are produced from the tick `Callable` periodically with the specified interval. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1596123cac..13c5b57bed 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -14,6 +14,7 @@ import akka.stream.impl.FlowImpl import akka.stream.impl.Ast.TickPublisherNode import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration +import akka.stream.impl.Stop /** * Scala API @@ -46,11 +47,11 @@ object Flow { /** * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure results in - * a [[akka.stream.Stop]] exception being thrown; it ends exceptionally - * when any other exception is thrown. + * The stream ends normally when evaluation of the `Callable` returns a `None`. + * The stream ends exceptionally when an exception is thrown from the `Callable`. */ - def apply[T](f: () ⇒ T): Flow[T] = FlowImpl(ThunkPublisherNode(f), Nil) + def apply[T](f: () ⇒ Option[T]): Flow[T] = + FlowImpl(ThunkPublisherNode(() ⇒ f().getOrElse(throw Stop)), Nil) /** * Start a new `Flow` from the given `Future`. The stream will consist of diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 98777d4cfa..6bde628812 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -333,7 +333,7 @@ public class FlowTest { @Override public Integer call() { if (countdown == 0) - throw akka.stream.Stop.getInstance(); + return null; else { countdown -= 1; return countdown; @@ -358,7 +358,7 @@ public class FlowTest { Flow.create(input).onComplete(new OnCompleteCallback() { @Override public void onComplete(Throwable e) { - probe.getRef().tell( (e == null) ? "done" : e, ActorRef.noSender()); + probe.getRef().tell((e == null) ? "done" : e, ActorRef.noSender()); } }, materializer); @@ -490,11 +490,12 @@ public class FlowTest { return "tick-" + (count++); } }; - Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick).foreach(new Procedure() { - public void apply(String elem) { + Flow.create(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), tick) + .foreach(new Procedure() { + public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); - } - }, materializer); + } + }, materializer); probe.expectNoMsg(FiniteDuration.create(600, TimeUnit.MILLISECONDS)); probe.expectMsgEquals("tick-1"); probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS)); diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index f5698fcd88..db16784bf0 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -320,11 +320,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending "be covariant" in { - val f1: Flow[Fruit] = Flow(() ⇒ new Apple) - val p1: Publisher[Fruit] = Flow(() ⇒ new Apple).toPublisher() - val f2: Flow[Publisher[Fruit]] = Flow(() ⇒ new Apple).splitWhen(_ ⇒ true) - val f3: Flow[(Boolean, Publisher[Fruit])] = Flow(() ⇒ new Apple).groupBy(_ ⇒ true) - val f4: Flow[(immutable.Seq[Apple], Publisher[Fruit])] = Flow(() ⇒ new Apple).prefixAndTail(1) + val f1: Flow[Fruit] = Flow(() ⇒ Some(new Apple)) + val p1: Publisher[Fruit] = Flow(() ⇒ Some(new Apple)).toPublisher() + val f2: Flow[Publisher[Fruit]] = Flow(() ⇒ Some(new Apple)).splitWhen(_ ⇒ true) + val f3: Flow[(Boolean, Publisher[Fruit])] = Flow(() ⇒ Some(new Apple)).groupBy(_ ⇒ true) + val f4: Flow[(immutable.Seq[Apple], Publisher[Fruit])] = Flow(() ⇒ Some(new Apple)).prefixAndTail(1) } }