diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 7d18a04dff..8d8396741b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -3,7 +3,9 @@ */ package akka.stream.scaladsl +import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.{ Success, Failure } import scala.util.control.NoStackTrace import akka.stream.ActorFlowMaterializer @@ -71,6 +73,62 @@ class SourceSpec extends AkkaSpec { } } + "Lazy Empty Source" must { + "complete materialized future when stream cancels" in { + val neverSource = Source.lazyEmpty() + val pubSink = Sink.publisher + + val mat = neverSource.to(pubSink).run() + + val f = mat.get(neverSource) + val neverPub = mat.get(pubSink) + + val c = StreamTestKit.SubscriberProbe() + neverPub.subscribe(c) + val subs = c.expectSubscription() + + subs.request(1000) + c.expectNoMsg(300.millis) + + subs.cancel() + Await.result(f.future, 300.millis) + } + + "allow external triggering of completion" in { + val neverSource = Source.lazyEmpty[Int]() + val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } + + val mat = neverSource.to(counterSink).run() + + val neverPromise = mat.get(neverSource) + val counterFuture = mat.get(counterSink) + + // external cancellation + neverPromise.success(()) + + val ready = Await.ready(counterFuture, 200.millis) + val Success(0) = ready.value.get + } + + "allow external triggering of onError" in { + val neverSource = Source.lazyEmpty() + val counterSink = Sink.fold[Int, Int](0) { (acc, _) ⇒ acc + 1 } + + val mat = neverSource.to(counterSink).run() + + val neverPromise = mat.get(neverSource) + val counterFuture = mat.get(counterSink) + + // external cancellation + neverPromise.failure(new Exception("Boom") with NoStackTrace) + + val ready = Await.ready(counterFuture, 200.millis) + val Failure(ex) = ready.value.get + ex.getMessage should include("Boom") + } + + } + "Source with additional keys" must { "materialize keys properly" in { val ks = Source.subscriber[Int] diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4176956e64..bc5c043798 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -7,7 +7,6 @@ import java.util.concurrent.Callable import akka.actor.{ Cancellable, ActorRef, Props } import akka.japi.Util import akka.stream._ -import akka.stream.scaladsl.PropsSource import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.annotation.unchecked.uncheckedVariance @@ -34,6 +33,18 @@ object Source { def empty[O](): Source[O] = new Source(scaladsl.Source.empty()) + /** + * Create a `Source` with no elements, which does not complete its downstream, + * until externally triggered to do so. + * + * It materializes a [[scala.concurrent.Promise]] which will be completed + * when the downstream stage of this source cancels. This promise can also + * be used to externally trigger completion, which the source then signalls + * to its downstream. + */ + def lazyEmpty[T]() = + new Source(scaladsl.Source.lazyEmpty()) + /** * Helper to create [[Source]] from `Publisher`. * @@ -188,7 +199,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * @tparam S materialized type of the given Sink */ def runWith[S](sink: KeyedSink[Out, S], materializer: FlowMaterializer): S = - asScala.runWith(sink.asScala)(materializer).asInstanceOf[S] + asScala.runWith(sink.asScala)(materializer) /** * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 33902aba92..345076372d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -10,10 +10,12 @@ import akka.stream.impl._ import akka.stream.impl.Ast.AstNode import org.reactivestreams.Publisher import org.reactivestreams.Subscriber +import org.reactivestreams.{ Subscription, Publisher, Subscriber } + import scala.annotation.unchecked.uncheckedVariance import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ Promise, ExecutionContext, Future } import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.{ Success, Failure } @@ -149,6 +151,37 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS } } +final case class LazyEmptySource[Out]() extends KeyedActorFlowSource[Out, Promise[Unit]] { + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorFlowMaterializer, flowName: String) = { + val created = create(materializer, flowName) + created._1.subscribe(flowSubscriber) + created._2 + } + override def isActive: Boolean = true + override def create(materializer: ActorFlowMaterializer, flowName: String) = { + val p = Promise[Unit]() + + // Not TCK verified as RC1 does not allow "empty publishers", + // reactive-streams on master now contains support for empty publishers. + // so we can enable it then, though it will require external completing of the promise + val pub = new Publisher[Unit] { + override def subscribe(s: Subscriber[_ >: Unit]) = { + s.onSubscribe(new Subscription { + override def request(n: Long): Unit = () + + override def cancel(): Unit = p.success(()) + }) + p.future.onComplete { + case Success(_) ⇒ s.onComplete() + case Failure(ex) ⇒ s.onError(ex) // due to external signal + }(materializer.asInstanceOf[ActorFlowMaterializerImpl].executionContext) // TODO: Should it use this EC or something else? + } + } + + pub.asInstanceOf[Publisher[Out]] → p + } +} + /** * Elements are emitted periodically with the specified interval. * The tick element will be delivered to downstream consumers that has requested any elements. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 99112e3518..eef654f780 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -177,6 +177,17 @@ object Source { def empty[T](): Source[T] = _empty private[this] val _empty: Source[Nothing] = apply(EmptyPublisher) + /** + * Create a `Source` with no elements, which does not complete its downstream, + * until externally triggered to do so. + * + * It materializes a [[scala.concurrent.Promise]] which will be completed + * when the downstream stage of this source cancels. This promise can also + * be used to externally trigger completion, which the source then signalls + * to its downstream. + */ + def lazyEmpty[T]() = LazyEmptySource[T]() + /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */