diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala index 6c545da23f..7c817e5faf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/TickSourceSpec.scala @@ -101,5 +101,37 @@ class TickSourceSpec extends AkkaSpec { c.expectComplete() } + "acknowledge cancellation only once" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[String]() + val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(2) + c.expectNext("tick") + cancellable.cancel() should be(true) + cancellable.cancel() should be(false) + c.expectComplete() + } + + "have isCancelled mirror the cancellation state" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[String]() + val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run() + val sub = c.expectSubscription() + sub.request(2) + c.expectNext("tick") + cancellable.isCancelled should be(false) + cancellable.cancel() should be(true) + cancellable.isCancelled should be(true) + c.expectComplete() + } + + "support being cancelled immediately after its materialization" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[String]() + val cancellable = Source.tick(1.second, 500.millis, "tick").to(Sink.fromSubscriber(c)).run() + cancellable.cancel() should be(true) + val sub = c.expectSubscription() + sub.request(2) + c.expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index c02fd1d3c6..92cb4ed69d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -4,7 +4,8 @@ package akka.stream.impl.fusing import akka.Done -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } + import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.event.Logging @@ -12,6 +13,7 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage._ + import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import akka.stream.impl.StreamLayout._ @@ -254,52 +256,42 @@ object GraphStages { def terminationWatcher[T]: GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] = TerminationWatcher.asInstanceOf[GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]]] - private object TickSource { - class TickSourceCancellable(cancelled: AtomicBoolean) extends Cancellable { - private val cancelPromise = Promise[Done]() - - def cancelFuture: Future[Done] = cancelPromise.future - - override def cancel(): Boolean = { - if (!isCancelled) cancelPromise.trySuccess(Done) - true - } - - override def isCancelled: Boolean = cancelled.get() - } - } - final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) val out = shape.out override def initialAttributes: Attributes = DefaultAttributes.tickSource override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = { - import TickSource._ - val cancelled = new AtomicBoolean(false) - val cancellable = new TickSourceCancellable(cancelled) + val logic = new TimerGraphStageLogic(shape) with Cancellable { + val cancelled = new AtomicBoolean(false) + val cancelCallback: AtomicReference[Option[AsyncCallback[Unit]]] = new AtomicReference(None) - val logic = new TimerGraphStageLogic(shape) { override def preStart() = { - schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval) - val callback = getAsyncCallback[Unit]((_) ⇒ { + cancelCallback.set(Some(getAsyncCallback[Unit](_ ⇒ completeStage()))) + if (cancelled.get) completeStage() - cancelled.set(true) - }) - - cancellable.cancelFuture.onComplete(_ ⇒ callback.invoke(()))(interpreter.materializer.executionContext) + else + schedulePeriodicallyWithInitialDelay("TickTimer", initialDelay, interval) } setHandler(out, eagerTerminateOutput) override protected def onTimer(timerKey: Any) = - if (isAvailable(out)) push(out, tick) + if (isAvailable(out) && !isCancelled) push(out, tick) + + override def cancel() = { + val success = !cancelled.getAndSet(true) + if (success) cancelCallback.get.foreach(_.invoke(())) + success + } + + override def isCancelled = cancelled.get override def toString: String = "TickSourceLogic" } - (logic, cancellable) + (logic, logic) } override def toString: String = s"TickSource($initialDelay, $interval, $tick)" diff --git a/project/MiMa.scala b/project/MiMa.scala index 48b51b322b..2af118b654 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -750,7 +750,11 @@ object MiMa extends AutoPlugin { // #19828 ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), - ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete") + ProblemFilters.exclude[ReversedAbstractMethodProblem]("akka.persistence.Eventsourced#ProcessingState.onWriteMessageComplete"), + + // #20028 Simplify TickSource cancellation + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$TickSourceCancellable"), + ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.fusing.GraphStages$TickSource$") ) ) }