=str rewrite StartTimed, StopTimed & TimedInterval as GraphStage

This commit is contained in:
Nafer Sanabria 2016-04-15 06:52:55 -05:00
parent 14320f4801
commit 17b419ce79
2 changed files with 86 additions and 46 deletions

View file

@ -4,10 +4,14 @@
package akka.stream.extra
import java.util.concurrent.atomic.AtomicLong
import akka.stream.Attributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.stage._
import scala.concurrent.duration._
import scala.language.existentials
import akka.stream.scaladsl.{ Source, Flow }
import akka.stream.stage._
/**
* Provides operations needed to implement the `timed` DSL
@ -24,8 +28,8 @@ private[akka] trait TimedOps {
def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] Source[O, Mat2], onComplete: FiniteDuration Unit): Source[O, Mat2] = {
val ctx = new TimedFlowContext
val startTimed = Flow[I].transform(() new StartTimed(ctx)).named("startTimed")
val stopTimed = Flow[O].transform(() new StopTimed(ctx, onComplete)).named("stopTimed")
val startTimed = Flow[I].via(new StartTimed(ctx)).named("startTimed")
val stopTimed = Flow[O].via(new StopTimed(ctx, onComplete)).named("stopTimed")
measuredOps(source.via(startTimed)).via(stopTimed)
}
@ -40,8 +44,8 @@ private[akka] trait TimedOps {
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
val ctx = new TimedFlowContext
val startTimed = Flow[O].transform(() new StartTimed(ctx)).named("startTimed")
val stopTimed = Flow[Out].transform(() new StopTimed(ctx, onComplete)).named("stopTimed")
val startTimed = Flow[O].via(new StartTimed(ctx)).named("startTimed")
val stopTimed = Flow[Out].via(new StopTimed(ctx, onComplete)).named("stopTimed")
measuredOps(flow.via(startTimed)).via(stopTimed)
}
@ -61,7 +65,7 @@ private[akka] trait TimedIntervalBetweenOps {
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O, Mat] = {
val timedInterval = Flow[O].transform(() new TimedInterval[O](matching, onInterval)).named("timedInterval")
val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval")
source.via(timedInterval)
}
@ -69,7 +73,7 @@ private[akka] trait TimedIntervalBetweenOps {
* Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/
def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O, Mat] = {
val timedInterval = Flow[O].transform(() new TimedInterval[O](matching, onInterval)).named("timedInterval")
val timedInterval = Flow[O].via(new TimedInterval[O](matching, onInterval)).named("timedInterval")
flow.via(timedInterval)
}
}
@ -100,58 +104,84 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
}
}
final class StartTimed[T](timedContext: TimedFlowContext) extends PushStage[T, T] {
private var started = false
final class StartTimed[T](timedContext: TimedFlowContext) extends SimpleLinearGraphStage[T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (!started) {
timedContext.start()
started = true
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var started = false
override def onPush(): Unit = {
if (!started) {
timedContext.start()
started = true
}
push(out, grab(in))
}
ctx.push(elem)
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration Unit) extends PushStage[T, T] {
final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration Unit) extends SimpleLinearGraphStage[T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
stopTime()
ctx.fail(cause)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
stopTime()
ctx.finish()
}
private def stopTime() {
val d = timedContext.stop()
_onComplete(d)
}
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(cause: Throwable): Unit = {
stopTime()
failStage(cause)
}
override def onUpstreamFinish(): Unit = {
stopTime()
completeStage()
}
private def stopTime() {
val d = timedContext.stop()
_onComplete(d)
}
setHandlers(in, out, this)
}
}
final class TimedInterval[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends PushStage[T, T] {
private var prevNanos = 0L
private var matched = 0L
final class TimedInterval[T](matching: T Boolean, onInterval: FiniteDuration Unit) extends SimpleLinearGraphStage[T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (matching(elem)) {
val d = updateInterval(elem)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
if (matched > 1)
onInterval(d)
private var prevNanos = 0L
private var matched = 0L
override def onPush(): Unit = {
val elem = grab(in)
if (matching(elem)) {
val d = updateInterval(elem)
if (matched > 1)
onInterval(d)
}
push(out, elem)
}
ctx.push(elem)
override def onPull(): Unit = pull(in)
private def updateInterval(in: T): FiniteDuration = {
matched += 1
val nowNanos = System.nanoTime()
val d = nowNanos - prevNanos
prevNanos = nowNanos
d.nanoseconds
}
setHandlers(in, out, this)
}
private def updateInterval(in: T): FiniteDuration = {
matched += 1
val nowNanos = System.nanoTime()
val d = nowNanos - prevNanos
prevNanos = nowNanos
d.nanoseconds
}
}
}

View file

@ -787,7 +787,17 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.serialization.MessageFormats#PersistentStateChangeEventOrBuilder.hasTimeoutNanos"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.saveStateSnapshot"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout_=")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.fsm.PersistentFSM.akka$persistence$fsm$PersistentFSM$$currentStateTimeout_="),
// #19834
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$StartTimed"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StartTimed.onPush"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$TimedInterval"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#TimedInterval.onPush"),
ProblemFilters.exclude[MissingTypesProblem]("akka.stream.extra.Timed$StopTimed"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onPush"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFinish"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.extra.Timed#StopTimed.onUpstreamFailure")
)
)
}