diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 445a1f57f5..cc71c622a9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -75,14 +75,10 @@ class TickSourceSpec extends AkkaSpec { "signal onError when tick closure throws" in { val c = StreamTestKit.SubscriberProbe[String]() - val tickSource = Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace) - val m = tickSource.to(Sink(c)).run() - val cancellable = m.get(tickSource) + Source[String](1.second, 1.second, () ⇒ throw new RuntimeException("tick err") with NoStackTrace).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(3) c.expectError.getMessage should be("tick err") - awaitCond(cancellable.isCancelled) - c.expectNoMsg(100.millis) } "be usable with zip for a simple form of rate limiting" in { @@ -105,25 +101,5 @@ class TickSourceSpec extends AkkaSpec { sub.cancel() } - "be possible to cancel" in { - val tickGen = Iterator from 1 - val c = StreamTestKit.SubscriberProbe[String]() - val tickSource = Source(1.second, 500.millis, () ⇒ "tick-" + tickGen.next()) - val m = tickSource.to(Sink(c)).run() - val cancellable = m.get(tickSource) - val sub = c.expectSubscription() - sub.request(3) - c.expectNoMsg(600.millis) - c.expectNext("tick-1") - c.expectNoMsg(200.millis) - c.expectNext("tick-2") - c.expectNoMsg(200.millis) - c.expectNext("tick-3") - cancellable.cancel() - awaitCond(cancellable.isCancelled) - sub.request(3) - c.expectComplete() - } - } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index dc95f9b1ce..82085f8711 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -6,19 +6,17 @@ package akka.stream.impl import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } import org.reactivestreams.{ Subscriber, Subscription } + import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal -import java.util.concurrent.atomic.AtomicBoolean /** * INTERNAL API */ private[akka] object TickPublisher { - def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, - cancelled: AtomicBoolean, settings: MaterializerSettings): Props = - Props(new TickPublisher(initialDelay, interval, tick, cancelled, settings)). - withDispatcher(settings.dispatcher) + def props(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings): Props = + Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher) object TickPublisherSubscription { case class Cancel(subscriber: Subscriber[_ >: Any]) @@ -44,14 +42,12 @@ private[akka] object TickPublisher { * Each subscriber will receive the tick element if it has requested any elements, * otherwise the tick element is dropped for that subscriber. */ -private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, - cancelled: AtomicBoolean, settings: MaterializerSettings) extends Actor with SoftShutdown { +private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Any, settings: MaterializerSettings) extends Actor with SoftShutdown { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher._ var exposedPublisher: ActorPublisher[Any] = _ val demand = mutable.Map.empty[Subscriber[_ >: Any], Long] - var failed = false override val supervisorStrategy = SupervisorStrategy.stoppingStrategy @@ -86,9 +82,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite } catch { case NonFatal(e) ⇒ // tick closure throwed => onError downstream - failed = true demand foreach { case (subscriber, _) ⇒ subscriber.onError(e) } - softShutdown() } case RequestMore(elements, subscriber) ⇒ @@ -123,13 +117,8 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite override def postStop(): Unit = { tickTask.foreach(_.cancel) - cancelled.set(true) if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) - if (!failed) - demand.foreach { - case (subscriber, _) ⇒ subscriber.onComplete() - } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4e799f84bf..d3135c7c1f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import java.util.concurrent.Callable + import akka.actor.ActorRef import akka.actor.Props import akka.japi.Util @@ -11,13 +12,13 @@ import akka.stream._ import akka.stream.scaladsl.PropsSource import org.reactivestreams.Publisher import org.reactivestreams.Subscriber + import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.language.higherKinds import scala.language.implicitConversions -import akka.actor.Cancellable /** Java API */ object Source { @@ -109,13 +110,9 @@ object Source { * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. - * - * The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this - * `KeyedSource` and that can be used for stopping the tick source and thereby - * completing the stream. */ - def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.KeyedSource[O, Cancellable] = - new KeyedSource(scaladsl.Source(initialDelay, interval, () ⇒ tick.call())) + def from[O](initialDelay: FiniteDuration, interval: FiniteDuration, tick: Callable[O]): javadsl.Source[O] = + new Source(scaladsl.Source(initialDelay, interval, () ⇒ tick.call())) /** * Creates a `Source` by using a [[FlowGraphBuilder]] from this [[PartialFlowGraph]] on a block that expects @@ -148,7 +145,7 @@ object Source { /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ - def subscriber[T](): KeyedSource[T, Subscriber[T]] = + def subscriber[T](): KeyedSource[Subscriber[T], T] = new KeyedSource(scaladsl.Source.subscriber) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index e14736b8af..b7711d7644 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -10,15 +10,13 @@ import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.impl.Ast.AstNode import org.reactivestreams.Publisher import org.reactivestreams.Subscriber + import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.util.Failure import scala.util.Success -import akka.actor.Cancellable -import akka.actor.PoisonPill -import java.util.concurrent.atomic.AtomicBoolean sealed trait ActorFlowSource[+Out] extends Source[Out] { @@ -183,34 +181,14 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. - * - * The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this - * `TickSource` and that can be used for stopping the tick source and thereby - * completing the stream. */ -final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends KeyedActorFlowSource[Out] { - override type MaterializedType = Cancellable - - override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Cancellable = { - val (pub, cancellable) = create(materializer, flowName) - pub.subscribe(flowSubscriber) - cancellable - } +final case class TickSource[Out](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ Out) extends SimpleActorFlowSource[Out] { + override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = + create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true - override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = { - val cancelled = new AtomicBoolean(false) - val ref = materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, - cancelled, materializer.settings), name = s"$flowName-0-tick") - val cancellable = new Cancellable { - override def cancel(): Boolean = { - if (!isCancelled) - ref ! PoisonPill - true - } - override def isCancelled: Boolean = cancelled.get - } - (ActorPublisher[Out](ref), cancellable) - } + override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = + (ActorPublisher[Out](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings), + name = s"$flowName-0-tick")), ()) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 6429a9dca6..7d9b009c0d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -132,12 +132,8 @@ object Source { * If a consumer has not requested any elements at the point in time when the tick * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. - * - * The [[MaterializedMap]] will contain a [[akka.actor.Cancellable]] for this - * `TickSource` and that can be used for stopping the tick source and thereby - * completing the stream. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): TickSource[T] = + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = TickSource(initialDelay, interval, tick) /**