diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index bbeb40cbec..483083388c 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -4,10 +4,14 @@ package akka.stream.extra import java.util.concurrent.atomic.AtomicLong + +import akka.stream.Attributes +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.stage._ + import scala.concurrent.duration._ import scala.language.existentials -import akka.stream.scaladsl.{ Source, Flow } -import akka.stream.stage._ /** * Provides operations needed to implement the `timed` DSL @@ -24,8 +28,8 @@ private[akka] trait TimedOps { def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] ⇒ Source[O, Mat2], onComplete: FiniteDuration ⇒ Unit): Source[O, Mat2] = { val ctx = new TimedFlowContext - val startTimed = Flow[I].transform(() ⇒ new StartTimed(ctx)).named("startTimed") - val stopTimed = Flow[O].transform(() ⇒ new StopTimed(ctx, onComplete)).named("stopTimed") + val startTimed = Flow[I].via(new StartTimed(ctx)).named("startTimed") + val stopTimed = Flow[O].via(new StopTimed(ctx, onComplete)).named("stopTimed") measuredOps(source.via(startTimed)).via(stopTimed) } @@ -40,8 +44,8 @@ private[akka] trait TimedOps { // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type val ctx = new TimedFlowContext - val startTimed = Flow[O].transform(() ⇒ new StartTimed(ctx)).named("startTimed") - val stopTimed = Flow[Out].transform(() ⇒ new StopTimed(ctx, onComplete)).named("stopTimed") + val startTimed = Flow[O].via(new StartTimed(ctx)).named("startTimed") + val stopTimed = Flow[Out].via(new StopTimed(ctx, onComplete)).named("stopTimed") measuredOps(flow.via(startTimed)).via(stopTimed) } @@ -61,7 +65,7 @@ private[akka] trait TimedIntervalBetweenOps { * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Source[O, Mat] = { - val timedInterval = Flow[O].transform(() ⇒ new TimedInterval[O](matching, onInterval)).named("timedInterval") + val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval") source.via(timedInterval) } @@ -69,7 +73,7 @@ private[akka] trait TimedIntervalBetweenOps { * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Flow[I, O, Mat] = { - val timedInterval = Flow[O].transform(() ⇒ new TimedInterval[O](matching, onInterval)).named("timedInterval") + val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval") flow.via(timedInterval) } } @@ -100,58 +104,84 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } } - final class StartTimed[T](timedContext: TimedFlowContext) extends PushStage[T, T] { - private var started = false + final class StartTimed[T](timedContext: TimedFlowContext) extends SimpleLinearGraphStage[T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - if (!started) { - timedContext.start() - started = true + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + + private var started = false + + override def onPush(): Unit = { + if (!started) { + timedContext.start() + started = true + } + push(out, grab(in)) } - ctx.push(elem) + + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) } } - final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration ⇒ Unit) extends PushStage[T, T] { + final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration ⇒ Unit) extends SimpleLinearGraphStage[T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - stopTime() - ctx.fail(cause) - } - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - stopTime() - ctx.finish() - } - private def stopTime() { - val d = timedContext.stop() - _onComplete(d) - } + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + + override def onUpstreamFailure(cause: Throwable): Unit = { + stopTime() + failStage(cause) + } + + override def onUpstreamFinish(): Unit = { + stopTime() + completeStage() + } + + private def stopTime() { + val d = timedContext.stop() + _onComplete(d) + } + + setHandlers(in, out, this) + } } - final class TimedInterval[T](matching: T ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit) extends PushStage[T, T] { - private var prevNanos = 0L - private var matched = 0L + final class TimedInterval[T](matching: T ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit) extends SimpleLinearGraphStage[T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - if (matching(elem)) { - val d = updateInterval(elem) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - if (matched > 1) - onInterval(d) + private var prevNanos = 0L + private var matched = 0L + + override def onPush(): Unit = { + val elem = grab(in) + if (matching(elem)) { + val d = updateInterval(elem) + + if (matched > 1) + onInterval(d) + } + push(out, elem) } - ctx.push(elem) + + override def onPull(): Unit = pull(in) + + private def updateInterval(in: T): FiniteDuration = { + matched += 1 + val nowNanos = System.nanoTime() + val d = nowNanos - prevNanos + prevNanos = nowNanos + d.nanoseconds + } + + setHandlers(in, out, this) } - private def updateInterval(in: T): FiniteDuration = { - matched += 1 - val nowNanos = System.nanoTime() - val d = nowNanos - prevNanos - prevNanos = nowNanos - d.nanoseconds - } } } diff --git a/project/MiMa.scala b/project/MiMa.scala index 077be3133b..5da02c171f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -787,7 +787,17 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentStateChangeEventOrBuilder.hasTimeoutNanos"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.saveStateSnapshot"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout_=") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout_="), + + // #19834 + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$StartTimed"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StartTimed.onPush"), + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$TimedInterval"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#TimedInterval.onPush"), + ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$StopTimed"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onPush"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFinish"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFailure") ) ) }