diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index cc71c622a9..7012bbc0d6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -49,7 +49,7 @@ class TickSourceSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "produce ticks with multiple subscribers" in { + "reject multiple subscribers, but keep the first" in { val tickGen = Iterator from 1 val p = Source(1.second, 1.second, () ⇒ "tick-" + tickGen.next()).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[String]() @@ -57,28 +57,24 @@ class TickSourceSpec extends AkkaSpec { p.subscribe(c1) p.subscribe(c2) val sub1 = c1.expectSubscription() - val sub2 = c2.expectSubscription() + c2.expectError() sub1.request(1) - sub2.request(2) c1.expectNext("tick-1") - c2.expectNext("tick-1") - c2.expectNoMsg(200.millis) - c2.expectNext("tick-2") c1.expectNoMsg(200.millis) sub1.request(2) - sub2.request(2) - c1.expectNext("tick-3") - c2.expectNext("tick-3") + c1.expectNext("tick-2") sub1.cancel() - sub2.cancel() } "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run() + val tickSource = Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace) + val m = tickSource.to(Sink(c)).run() + val cancellable = m.get(tickSource) val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") + awaitCond(cancellable.isCancelled) } "be usable with zip for a simple form of rate limiting" in { @@ -101,5 +97,25 @@ class TickSourceSpec extends AkkaSpec { sub.cancel() } + "be possible to cancel" in { + val tickGen = Iterator from 1 + val c = StreamTestKit.SubscriberProbe[String]() + val tickSource = Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()) + val m = tickSource.to(Sink(c)).run() + val cancellable = m.get(tickSource) + val sub = c.expectSubscription() + sub.request(3) + c.expectNoMsg(600.millis) + c.expectNext("tick-1") + c.expectNoMsg(200.millis) + c.expectNext("tick-2") + c.expectNoMsg(200.millis) + c.expectNext("tick-3") + cancellable.cancel() + awaitCond(cancellable.isCancelled) + sub.request(3) + c.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 3a40be54fb..a6f21015d2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicBoolean + import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } import akka.stream.MaterializerSettings import org.reactivestreams.{ Subscriber, Subscription } @@ -15,20 +17,21 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] object TickPublisher { - def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings): Props = - Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher) + def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, + settings: MaterializerSettings, cancelled: AtomicBoolean): Props = + Props(new TickPublisher(initialDelay, interval, tick, settings, cancelled)).withDispatcher(settings.dispatcher) object TickPublisherSubscription { - case class Cancel(subscriber: Subscriber[_ >: Any]) - case class RequestMore(elements: Long, subscriber: Subscriber[_ >: Any]) + case object Cancel + case class RequestMore(elements: Long) } - class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[_ >: Any]) extends Subscription { + class TickPublisherSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ - def cancel(): Unit = ref ! Cancel(subscriber) + def cancel(): Unit = ref ! Cancel def request(elements: Long): Unit = if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsCompliance.NumberOfElementsInRequestMustBePositiveMsg) - else ref ! RequestMore(elements, subscriber) + else ref ! RequestMore(elements) override def toString = "TickPublisherSubscription" } @@ -38,16 +41,18 @@ private[akka] object TickPublisher { /** * INTERNAL API * - * Elements are produced from the tick closure periodically with the specified interval. - * Each subscriber will receive the tick element if it has requested any elements, - * otherwise the tick element is dropped for that subscriber. + * Elements are produced from the tick closure periodically with the specified interval. Supports only one subscriber. + * The subscriber will receive the tick element if it has requested any elements, + * otherwise the tick element is dropped. */ -private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings) extends Actor with SoftShutdown { +private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, + settings: MaterializerSettings, cancelled: AtomicBoolean) extends Actor with SoftShutdown { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ var exposedPublisher: ActorPublisher[Any] = _ - val demand = mutable.Map.empty[Subscriber[_ >: Any], Long] + private var subscriber: Subscriber[_ >: Any] = null + private var demand: Long = 0 override val supervisorStrategy = SupervisorStrategy.stoppingStrategy @@ -72,51 +77,51 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite case Tick ⇒ try { val tickElement = tick() - demand foreach { - case (subscriber, d) ⇒ - if (d > 0) { - demand(subscriber) = d - 1 - subscriber.onNext(tickElement) - } + if (demand > 0) { + demand -= 1 + subscriber.onNext(tickElement) } } catch { case NonFatal(e) ⇒ - // tick closure throwed => onError downstream - demand foreach { case (subscriber, _) ⇒ subscriber.onError(e) } + if (subscriber ne null) { + subscriber.onError(e) + subscriber = null + } + exposedPublisher.shutdown(Some(e)) + context.stop(self) } - case RequestMore(elements, subscriber) ⇒ - demand.get(subscriber) match { - case Some(d) ⇒ demand(subscriber) = d + elements - case None ⇒ // canceled + case RequestMore(elements) ⇒ + demand += elements + if (demand < 0) { + // Long has overflown, reactive-streams specification rule 3.17 + exposedPublisher.shutdown(Some( + new IllegalStateException(ReactiveStreamsCompliance.TotalPendingDemandMustNotExceedLongMaxValue))) + context.stop(self) } - case Cancel(subscriber) ⇒ unregisterSubscriber(subscriber) + + case Cancel ⇒ + subscriber = null + context.stop(self) case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach registerSubscriber } - def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { - if (demand.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + def registerSubscriber(s: Subscriber[_ >: Any]): Unit = { + if (subscriber ne null) s.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsCompliance.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { - val subscription = new TickPublisherSubscription(self, subscriber) - demand(subscriber) = 0 + val subscription = new TickPublisherSubscription(self) + subscriber = s subscriber.onSubscribe(subscription) } } - private def unregisterSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { - demand -= subscriber - if (demand.isEmpty) { - exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) - softShutdown() - } - } - override def postStop(): Unit = { tickTask.foreach(_.cancel) + cancelled.set(true) + if (subscriber ne null) subscriber.onComplete() if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index cac0278108..8a5a33151d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -3,7 +3,9 @@ */ package akka.stream.scaladsl -import akka.actor.{ Props, ActorRef } +import java.util.concurrent.atomic.AtomicBoolean + +import akka.actor.{ PoisonPill, Cancellable, Props, ActorRef } import akka.stream.impl._ import akka.stream.impl.Ast.AstNode import org.reactivestreams.Publisher @@ -155,33 +157,29 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ -final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? - override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = - create(materializer, flowName)._1.subscribe(flowSubscriber) - override def isActive: Boolean = true - override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = - (ActorPublisher[Out](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings), - name = s"$flowName-0-tick")), ()) -} - -/** - * This Source takes two Sources and concatenates them together by draining the elements coming from the first Source - * completely, then draining the elements arriving from the second Source. If the first Source is infinite then the - * second Source will be never drained. - */ -final case class ConcatSource[Out](source1: Source[Out], source2: Source[Out]) extends SimpleActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? +final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends KeyedActorFlowSource[Out] { // FIXME Why does this have anything to do with Actors? + override type MaterializedType = Cancellable override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = { - val concatter = Concat[Out] - val concatGraph = FlowGraph { builder ⇒ - builder - .addEdge(source1, Pipe.empty[Out], concatter.first) - .addEdge(source2, Pipe.empty[Out], concatter.second) - .addEdge(concatter.out, Sink(flowSubscriber)) - }.run()(materializer) + val (pub, cancellable) = create(materializer, flowName) + pub.subscribe(flowSubscriber) + cancellable } - override def isActive: Boolean = false + override def isActive: Boolean = true + override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { + val cancelled = new AtomicBoolean(false) + val ref = + materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings, cancelled), + name = s"$flowName-0-tick") + (ActorPublisher[Out](ref), new Cancellable { + override def cancel(): Boolean = { + if (!isCancelled) ref ! PoisonPill + true + } + override def isCancelled: Boolean = cancelled.get() + }) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index f0f8680d15..6318497b38 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -130,7 +130,7 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = // FIXME why is tick () => T and not T? + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): TickSource[T] = TickSource(initialDelay, interval, tick) /** @@ -181,7 +181,16 @@ object Source { * emitted by the second source is emitted after the last element of the first * source. */ - def concat[T](source1: Source[T], source2: Source[T]): Source[T] = ConcatSource(source1, source2) + def concat[T](source1: Source[T], source2: Source[T]): Source[T] = { + val output = UndefinedSink[T] + val concat = Concat[T] + Source() { b ⇒ + b.addEdge(source1, Pipe.empty[T], concat.first) + .addEdge(source2, Pipe.empty[T], concat.second) + .addEdge(concat.out, Pipe.empty[T], output) + output + } + } /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]