parent
629383545f
commit
267d96ecf1
5 changed files with 23 additions and 413 deletions
|
|
@ -110,6 +110,11 @@ A full cluster restart is required to change to Artery.
|
|||
The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no longer wrapped in
|
||||
`Future`/`CompletionStage`. It can be sent as reply to `sender()` immediately without using the `pipe` pattern.
|
||||
|
||||
### Timing operator removed
|
||||
|
||||
`akka.stream.extra.Timing` has been removed. If you need it you can now find it in `akka.stream.contrib.Timed` from
|
||||
[Akka Stream Contrib](https://github.com/akka/akka-stream-contrib/blob/master/contrib/src/main/scala/akka/stream/contrib/Timed.scala).
|
||||
|
||||
## Cluster Sharding
|
||||
|
||||
### Passivate idle entity
|
||||
|
|
|
|||
|
|
@ -1,136 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.extra
|
||||
|
||||
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.scaladsl.StreamTestKit._
|
||||
import akka.testkit.TestProbe
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
class FlowTimedSpec extends StreamSpec with ScriptedTest {
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
"Timed Source" must {
|
||||
|
||||
import akka.stream.extra.Implicits.TimedFlowDsl
|
||||
|
||||
"measure time it between elements matching a predicate" in {
|
||||
val testActor = TestProbe()
|
||||
|
||||
val measureBetweenEvery = 5
|
||||
val printInfo = (interval: Duration) => {
|
||||
testActor.ref ! interval
|
||||
info(s"Measured interval between $measureBetweenEvery elements was: $interval")
|
||||
}
|
||||
|
||||
val n = 20
|
||||
val testRuns = 1 to 2
|
||||
|
||||
def script =
|
||||
Script((1 to n).map { x =>
|
||||
Seq(x) -> Seq(x)
|
||||
}: _*)
|
||||
testRuns.foreach(_ =>
|
||||
runScript(script, settings) { flow =>
|
||||
flow.map(identity).timedIntervalBetween(_ % measureBetweenEvery == 0, onInterval = printInfo)
|
||||
})
|
||||
|
||||
val expectedNrOfOnIntervalCalls = testRuns.size * ((n / measureBetweenEvery) - 1) // first time has no value to compare to, so skips calling onInterval
|
||||
(1 to expectedNrOfOnIntervalCalls).foreach { _ =>
|
||||
testActor.expectMsgType[Duration]
|
||||
}
|
||||
}
|
||||
|
||||
"measure time it takes from start to complete, by wrapping operations" in {
|
||||
val testActor = TestProbe()
|
||||
|
||||
val n = 50
|
||||
val printInfo = (d: FiniteDuration) => {
|
||||
testActor.ref ! d
|
||||
info(s"Processing $n elements took $d")
|
||||
}
|
||||
|
||||
val testRuns = 1 to 3
|
||||
|
||||
def script =
|
||||
Script((1 to n).map { x =>
|
||||
Seq(x) -> Seq(x)
|
||||
}: _*)
|
||||
testRuns.foreach(_ =>
|
||||
runScript(script, settings) { flow =>
|
||||
flow.timed(_.map(identity), onComplete = printInfo)
|
||||
})
|
||||
|
||||
testRuns.foreach { _ =>
|
||||
testActor.expectMsgType[Duration]
|
||||
}
|
||||
testActor.expectNoMsg(1.second)
|
||||
}
|
||||
|
||||
"have a Java API" in pending
|
||||
}
|
||||
|
||||
"Timed Flow" must {
|
||||
import akka.stream.extra.Implicits.TimedFlowDsl
|
||||
|
||||
"measure time it between elements matching a predicate" in assertAllStagesStopped {
|
||||
val probe = TestProbe()
|
||||
|
||||
val flow: Flow[Int, Long, _] = Flow[Int].map(_.toLong).timedIntervalBetween(in => in % 2 == 1, d => probe.ref ! d)
|
||||
|
||||
val c1 = TestSubscriber.manualProbe[Long]()
|
||||
Source(List(1, 2, 3)).via(flow).runWith(Sink.fromSubscriber(c1))
|
||||
|
||||
val s = c1.expectSubscription()
|
||||
s.request(100)
|
||||
c1.expectNext(1L)
|
||||
c1.expectNext(2L)
|
||||
c1.expectNext(3L)
|
||||
c1.expectComplete()
|
||||
|
||||
val duration = probe.expectMsgType[Duration]
|
||||
info(s"Got duration (first): $duration")
|
||||
}
|
||||
|
||||
"measure time from start to complete, by wrapping operations" in assertAllStagesStopped {
|
||||
val probe = TestProbe()
|
||||
|
||||
// making sure the types come out as expected
|
||||
val flow: Flow[Int, String, _] =
|
||||
Flow[Int].timed(_.map(_.toDouble).map(_.toInt).map(_.toString), duration => probe.ref ! duration).map {
|
||||
s: String =>
|
||||
s + "!"
|
||||
}
|
||||
|
||||
val (flowIn: Subscriber[Int], flowOut: Publisher[String]) =
|
||||
flow.runWith(Source.asSubscriber[Int], Sink.asPublisher[String](false))
|
||||
|
||||
val c1 = TestSubscriber.manualProbe[String]()
|
||||
val c2 = flowOut.subscribe(c1)
|
||||
|
||||
val p = Source(0 to 100).runWith(Sink.asPublisher(false))
|
||||
p.subscribe(flowIn)
|
||||
|
||||
val s = c1.expectSubscription()
|
||||
s.request(200)
|
||||
(0 to 100).foreach { i =>
|
||||
c1.expectNext(i.toString + "!")
|
||||
}
|
||||
c1.expectComplete()
|
||||
|
||||
val duration = probe.expectMsgType[Duration]
|
||||
info(s"Took: $duration")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -62,5 +62,23 @@ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowO
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWithGraph")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipLatestWith")
|
||||
|
||||
## 2.6
|
||||
|
||||
# #24372 No Future/CompletionStage in StreamRefs
|
||||
# FIXME why was change not detected?
|
||||
|
||||
# 26188 remove Timed
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedFlowContext")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StartTimed")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.TimedOps")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedFlowDsl")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedSourceDsl$")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.TimedIntervalBetweenOps")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedSourceDsl")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$TimedFlowDsl$")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Implicits$")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$TimedInterval")
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.extra.Timed$StopTimed")
|
||||
|
|
|
|||
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.extra
|
||||
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.scaladsl.Source
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
/**
|
||||
* Additional [[akka.stream.scaladsl.Flow]] and [[akka.stream.scaladsl.Flow]] operators.
|
||||
*/
|
||||
object Implicits {
|
||||
|
||||
/**
|
||||
* Provides time measurement utilities on Stream elements.
|
||||
*
|
||||
* See [[Timed]]
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated
|
||||
implicit class TimedSourceDsl[I, Mat](val source: Source[I, Mat]) extends AnyVal {
|
||||
|
||||
/**
|
||||
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
|
||||
*/
|
||||
@silent
|
||||
def timed[O, Mat2](
|
||||
measuredOps: Source[I, Mat] => Source[O, Mat2],
|
||||
onComplete: FiniteDuration => Unit): Source[O, Mat2] =
|
||||
Timed.timed[I, O, Mat, Mat2](source, measuredOps, onComplete)
|
||||
|
||||
/**
|
||||
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
|
||||
*/
|
||||
@silent
|
||||
def timedIntervalBetween(matching: I => Boolean, onInterval: FiniteDuration => Unit): Source[I, Mat] =
|
||||
Timed.timedIntervalBetween[I, Mat](source, matching, onInterval)
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides time measurement utilities on Stream elements.
|
||||
*
|
||||
* See [[Timed]]
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", "2.4.5") // overlooked this on the implicits when Timed was deprecated
|
||||
implicit class TimedFlowDsl[I, O, Mat](val flow: Flow[I, O, Mat]) extends AnyVal {
|
||||
|
||||
/**
|
||||
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
|
||||
*/
|
||||
@silent
|
||||
def timed[Out, Mat2](
|
||||
measuredOps: Flow[I, O, Mat] => Flow[I, Out, Mat2],
|
||||
onComplete: FiniteDuration => Unit): Flow[I, Out, Mat2] =
|
||||
Timed.timed[I, O, Out, Mat, Mat2](flow, measuredOps, onComplete)
|
||||
|
||||
/**
|
||||
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
|
||||
*/
|
||||
@silent
|
||||
def timedIntervalBetween(matching: O => Boolean, onInterval: FiniteDuration => Unit): Flow[I, O, Mat] =
|
||||
Timed.timedIntervalBetween[I, O, Mat](flow, matching, onInterval)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,209 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.extra
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.stream.stage._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Provides operations needed to implement the `timed` DSL
|
||||
*/
|
||||
private[akka] trait TimedOps {
|
||||
|
||||
import Timed._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5")
|
||||
def timed[I, O, Mat, Mat2](
|
||||
source: Source[I, Mat],
|
||||
measuredOps: Source[I, Mat] => Source[O, Mat2],
|
||||
onComplete: FiniteDuration => Unit): Source[O, Mat2] = {
|
||||
val ctx = new TimedFlowContext
|
||||
|
||||
val startTimed = Flow[I].via(new StartTimed(ctx)).named("startTimed")
|
||||
val stopTimed = Flow[O].via(new StopTimed(ctx, onComplete)).named("stopTimed")
|
||||
|
||||
measuredOps(source.via(startTimed)).via(stopTimed)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`.
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5")
|
||||
def timed[I, O, Out, Mat, Mat2](
|
||||
flow: Flow[I, O, Mat],
|
||||
measuredOps: Flow[I, O, Mat] => Flow[I, Out, Mat2],
|
||||
onComplete: FiniteDuration => Unit): Flow[I, Out, Mat2] = {
|
||||
// todo is there any other way to provide this for Flow, without duplicating impl?
|
||||
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
|
||||
val ctx = new TimedFlowContext
|
||||
|
||||
val startTimed = Flow[O].via(new StartTimed(ctx)).named("startTimed")
|
||||
val stopTimed = Flow[Out].via(new StopTimed(ctx, onComplete)).named("stopTimed")
|
||||
|
||||
measuredOps(flow.via(startTimed)).via(stopTimed)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Provides operations needed to implement the `timedIntervalBetween` DSL
|
||||
*/
|
||||
private[akka] trait TimedIntervalBetweenOps {
|
||||
|
||||
import Timed._
|
||||
|
||||
/**
|
||||
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5")
|
||||
def timedIntervalBetween[O, Mat](
|
||||
source: Source[O, Mat],
|
||||
matching: O => Boolean,
|
||||
onInterval: FiniteDuration => Unit): Source[O, Mat] = {
|
||||
val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval")
|
||||
source.via(timedInterval)
|
||||
}
|
||||
|
||||
/**
|
||||
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
|
||||
*/
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5")
|
||||
def timedIntervalBetween[I, O, Mat](
|
||||
flow: Flow[I, O, Mat],
|
||||
matching: O => Boolean,
|
||||
onInterval: FiniteDuration => Unit): Flow[I, O, Mat] = {
|
||||
val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval")
|
||||
flow.via(timedInterval)
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("Moved to the akka/akka-stream-contrib project", since = "2.4.5")
|
||||
object Timed extends TimedOps with TimedIntervalBetweenOps {
|
||||
|
||||
// todo needs java DSL
|
||||
|
||||
final class TimedFlowContext {
|
||||
import scala.concurrent.duration._
|
||||
|
||||
private val _start = new AtomicLong
|
||||
private val _stop = new AtomicLong
|
||||
|
||||
def start(): Unit = {
|
||||
_start.compareAndSet(0, System.nanoTime())
|
||||
}
|
||||
|
||||
def stop(): FiniteDuration = {
|
||||
_stop.compareAndSet(0, System.nanoTime())
|
||||
compareStartAndStop()
|
||||
}
|
||||
|
||||
private def compareStartAndStop(): FiniteDuration = {
|
||||
val stp = _stop.get
|
||||
if (stp <= 0) Duration.Zero
|
||||
else (stp - _start.get).nanos
|
||||
}
|
||||
}
|
||||
|
||||
final class StartTimed[T](timedContext: TimedFlowContext) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
|
||||
private var started = false
|
||||
|
||||
override def onPush(): Unit = {
|
||||
if (!started) {
|
||||
timedContext.start()
|
||||
started = true
|
||||
}
|
||||
push(out, grab(in))
|
||||
}
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration => Unit)
|
||||
extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
override def onUpstreamFailure(cause: Throwable): Unit = {
|
||||
stopTime()
|
||||
failStage(cause)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
stopTime()
|
||||
completeStage()
|
||||
}
|
||||
|
||||
private def stopTime(): Unit = {
|
||||
val d = timedContext.stop()
|
||||
_onComplete(d)
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
}
|
||||
|
||||
final class TimedInterval[T](matching: T => Boolean, onInterval: FiniteDuration => Unit)
|
||||
extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
|
||||
private var prevNanos = 0L
|
||||
private var matched = 0L
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
if (matching(elem)) {
|
||||
val d = updateInterval()
|
||||
|
||||
if (matched > 1)
|
||||
onInterval(d)
|
||||
}
|
||||
push(out, elem)
|
||||
}
|
||||
|
||||
override def onPull(): Unit = pull(in)
|
||||
|
||||
private def updateInterval(): FiniteDuration = {
|
||||
matched += 1
|
||||
val nowNanos = System.nanoTime()
|
||||
val d = nowNanos - prevNanos
|
||||
prevNanos = nowNanos
|
||||
d.nanoseconds
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue