diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index cf6186dcc0..020c86a1cf 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -110,6 +110,11 @@ A full cluster restart is required to change to Artery. The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in `Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern. +### Timing operator removed + +`akka.stream.extra.Timing` has been removed. If you need it you can now find it in `akka.stream.contrib.Timed` from + [Akka Stream Contrib](https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala). + ## Cluster Sharding ### Passivate idle entity diff --git a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala deleted file mode 100644 index 19e6f7caf3..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/extra/FlowTimedSpec.scala +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.stream.extra - -import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } -import akka.stream.scaladsl.{ Flow, Source } -import akka.stream.scaladsl.Sink -import akka.stream.testkit._ -import akka.stream.testkit.scaladsl.StreamTestKit._ -import akka.testkit.TestProbe -import org.reactivestreams.{ Publisher, Subscriber } - -class FlowTimedSpec extends StreamSpec with ScriptedTest { - - import scala.concurrent.duration._ - - val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 2, maxSize = 16) - - implicit val materializer = ActorMaterializer(settings) - - "Timed Source" must { - - import akka.stream.extra.Implicits.TimedFlowDsl - - "measure time it between elements matching a predicate" in { - val testActor = TestProbe() - - val measureBetweenEvery = 5 - val printInfo = (interval: Duration) => { - testActor.ref ! interval - info(s"Measured interval between $measureBetweenEvery elements was: $interval") - } - - val n = 20 - val testRuns = 1 to 2 - - def script = - Script((1 to n).map { x => - Seq(x) -> Seq(x) - }: _*) - testRuns.foreach(_ => - runScript(script, settings) { flow => - flow.map(identity).timedIntervalBetween(_ % measureBetweenEvery == 0, onInterval = printInfo) - }) - - val expectedNrOfOnIntervalCalls = testRuns.size * ((n / measureBetweenEvery) - 1) // first time has no value to compare to, so skips calling onInterval - (1 to expectedNrOfOnIntervalCalls).foreach { _ => - testActor.expectMsgType[Duration] - } - } - - "measure time it takes from start to complete, by wrapping operations" in { - val testActor = TestProbe() - - val n = 50 - val printInfo = (d: FiniteDuration) => { - testActor.ref ! d - info(s"Processing $n elements took $d") - } - - val testRuns = 1 to 3 - - def script = - Script((1 to n).map { x => - Seq(x) -> Seq(x) - }: _*) - testRuns.foreach(_ => - runScript(script, settings) { flow => - flow.timed(_.map(identity), onComplete = printInfo) - }) - - testRuns.foreach { _ => - testActor.expectMsgType[Duration] - } - testActor.expectNoMsg(1.second) - } - - "have a Java API" in pending - } - - "Timed Flow" must { - import akka.stream.extra.Implicits.TimedFlowDsl - - "measure time it between elements matching a predicate" in assertAllStagesStopped { - val probe = TestProbe() - - val flow: Flow[Int, Long, _] = Flow[Int].map(_.toLong).timedIntervalBetween(in => in % 2 == 1, d => probe.ref ! d) - - val c1 = TestSubscriber.manualProbe[Long]() - Source(List(1, 2, 3)).via(flow).runWith(Sink.fromSubscriber(c1)) - - val s = c1.expectSubscription() - s.request(100) - c1.expectNext(1L) - c1.expectNext(2L) - c1.expectNext(3L) - c1.expectComplete() - - val duration = probe.expectMsgType[Duration] - info(s"Got duration (first): $duration") - } - - "measure time from start to complete, by wrapping operations" in assertAllStagesStopped { - val probe = TestProbe() - - // making sure the types come out as expected - val flow: Flow[Int, String, _] = - Flow[Int].timed(_.map(_.toDouble).map(_.toInt).map(_.toString), duration => probe.ref ! duration).map { - s: String => - s + "!" - } - - val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = - flow.runWith(Source.asSubscriber[Int], Sink.asPublisher[String](false)) - - val c1 = TestSubscriber.manualProbe[String]() - val c2 = flowOut.subscribe(c1) - - val p = Source(0 to 100).runWith(Sink.asPublisher(false)) - p.subscribe(flowIn) - - val s = c1.expectSubscription() - s.request(200) - (0 to 100).foreach { i => - c1.expectNext(i.toString + "!") - } - c1.expectComplete() - - val duration = probe.expectMsgType[Duration] - info(s"Took: $duration") - } - } - -} diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index c8214c2b69..2a3f7df4ca 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -62,5 +62,23 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith") +## 2.6 + # #24372 No Future/CompletionStage in StreamRefs # FIXME why was change not detected? + +# 26188 remove Timed +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedFlowContext") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StartTimed") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.TimedOps") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedFlowDsl") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedSourceDsl$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.TimedIntervalBetweenOps") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedSourceDsl") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedFlowDsl$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedInterval") +ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StopTimed") diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala deleted file mode 100644 index 53e60da05b..0000000000 --- a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.stream.extra - -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Source -import com.github.ghik.silencer.silent - -import scala.concurrent.duration.FiniteDuration - -/** - * Additional [[akka.stream.scaladsl.Flow]] and [[akka.stream.scaladsl.Flow]] operators. - */ -object Implicits { - - /** - * Provides time measurement utilities on Stream elements. - * - * See [[Timed]] - */ - @deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated - implicit class TimedSourceDsl[I, Mat](val source: Source[I, Mat]) extends AnyVal { - - /** - * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. - */ - @silent - def timed[O, Mat2]( - measuredOps: Source[I, Mat] => Source[O, Mat2], - onComplete: FiniteDuration => Unit): Source[O, Mat2] = - Timed.timed[I, O, Mat, Mat2](source, measuredOps, onComplete) - - /** - * Measures rolling interval between immediately subsequent `matching(o: O)` elements. - */ - @silent - def timedIntervalBetween(matching: I => Boolean, onInterval: FiniteDuration => Unit): Source[I, Mat] = - Timed.timedIntervalBetween[I, Mat](source, matching, onInterval) - } - - /** - * Provides time measurement utilities on Stream elements. - * - * See [[Timed]] - */ - @deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated - implicit class TimedFlowDsl[I, O, Mat](val flow: Flow[I, O, Mat]) extends AnyVal { - - /** - * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. - */ - @silent - def timed[Out, Mat2]( - measuredOps: Flow[I, O, Mat] => Flow[I, Out, Mat2], - onComplete: FiniteDuration => Unit): Flow[I, Out, Mat2] = - Timed.timed[I, O, Out, Mat, Mat2](flow, measuredOps, onComplete) - - /** - * Measures rolling interval between immediately subsequent `matching(o: O)` elements. - */ - @silent - def timedIntervalBetween(matching: O => Boolean, onInterval: FiniteDuration => Unit): Flow[I, O, Mat] = - Timed.timedIntervalBetween[I, O, Mat](flow, matching, onInterval) - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala deleted file mode 100644 index 1f860eb254..0000000000 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.stream.extra - -import java.util.concurrent.atomic.AtomicLong - -import akka.stream.Attributes -import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.scaladsl.{ Flow, Source } -import akka.stream.stage._ - -import scala.concurrent.duration._ - -/** - * Provides operations needed to implement the `timed` DSL - */ -private[akka] trait TimedOps { - - import Timed._ - - /** - * INTERNAL API - * - * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. - */ - @deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5") - def timed[I, O, Mat, Mat2]( - source: Source[I, Mat], - measuredOps: Source[I, Mat] => Source[O, Mat2], - onComplete: FiniteDuration => Unit): Source[O, Mat2] = { - val ctx = new TimedFlowContext - - val startTimed = Flow[I].via(new StartTimed(ctx)).named("startTimed") - val stopTimed = Flow[O].via(new StopTimed(ctx, onComplete)).named("stopTimed") - - measuredOps(source.via(startTimed)).via(stopTimed) - } - - /** - * INTERNAL API - * - * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. - */ - @deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5") - def timed[I, O, Out, Mat, Mat2]( - flow: Flow[I, O, Mat], - measuredOps: Flow[I, O, Mat] => Flow[I, Out, Mat2], - onComplete: FiniteDuration => Unit): Flow[I, Out, Mat2] = { - // todo is there any other way to provide this for Flow, without duplicating impl? - // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type - val ctx = new TimedFlowContext - - val startTimed = Flow[O].via(new StartTimed(ctx)).named("startTimed") - val stopTimed = Flow[Out].via(new StopTimed(ctx, onComplete)).named("stopTimed") - - measuredOps(flow.via(startTimed)).via(stopTimed) - } - -} - -/** - * INTERNAL API - * - * Provides operations needed to implement the `timedIntervalBetween` DSL - */ -private[akka] trait TimedIntervalBetweenOps { - - import Timed._ - - /** - * Measures rolling interval between immediately subsequent `matching(o: O)` elements. - */ - @deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5") - def timedIntervalBetween[O, Mat]( - source: Source[O, Mat], - matching: O => Boolean, - onInterval: FiniteDuration => Unit): Source[O, Mat] = { - val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval") - source.via(timedInterval) - } - - /** - * Measures rolling interval between immediately subsequent `matching(o: O)` elements. - */ - @deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5") - def timedIntervalBetween[I, O, Mat]( - flow: Flow[I, O, Mat], - matching: O => Boolean, - onInterval: FiniteDuration => Unit): Flow[I, O, Mat] = { - val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval") - flow.via(timedInterval) - } -} - -@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5") -object Timed extends TimedOps with TimedIntervalBetweenOps { - - // todo needs java DSL - - final class TimedFlowContext { - import scala.concurrent.duration._ - - private val _start = new AtomicLong - private val _stop = new AtomicLong - - def start(): Unit = { - _start.compareAndSet(0, System.nanoTime()) - } - - def stop(): FiniteDuration = { - _stop.compareAndSet(0, System.nanoTime()) - compareStartAndStop() - } - - private def compareStartAndStop(): FiniteDuration = { - val stp = _stop.get - if (stp <= 0) Duration.Zero - else (stp - _start.get).nanos - } - } - - final class StartTimed[T](timedContext: TimedFlowContext) extends SimpleLinearGraphStage[T] { - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { - - private var started = false - - override def onPush(): Unit = { - if (!started) { - timedContext.start() - started = true - } - push(out, grab(in)) - } - - override def onPull(): Unit = pull(in) - - setHandlers(in, out, this) - } - } - - final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration => Unit) - extends SimpleLinearGraphStage[T] { - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { - - override def onPush(): Unit = push(out, grab(in)) - - override def onPull(): Unit = pull(in) - - override def onUpstreamFailure(cause: Throwable): Unit = { - stopTime() - failStage(cause) - } - - override def onUpstreamFinish(): Unit = { - stopTime() - completeStage() - } - - private def stopTime(): Unit = { - val d = timedContext.stop() - _onComplete(d) - } - - setHandlers(in, out, this) - } - } - - final class TimedInterval[T](matching: T => Boolean, onInterval: FiniteDuration => Unit) - extends SimpleLinearGraphStage[T] { - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler { - - private var prevNanos = 0L - private var matched = 0L - - override def onPush(): Unit = { - val elem = grab(in) - if (matching(elem)) { - val d = updateInterval() - - if (matched > 1) - onInterval(d) - } - push(out, elem) - } - - override def onPull(): Unit = pull(in) - - private def updateInterval(): FiniteDuration = { - matched += 1 - val nowNanos = System.nanoTime() - val d = nowNanos - prevNanos - prevNanos = nowNanos - d.nanoseconds - } - - setHandlers(in, out, this) - } - - } - -}