diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala index 2fe1333546..dacb2441cc 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala @@ -238,7 +238,7 @@ import pekko.stream.stage._ override val shape: FlowShape[I, O] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler { + new TimerGraphStageLogic(shape) with StageLogging with InHandler with OutHandler { private var nextDeadline: Long = System.nanoTime + timeout.toNanos private val contextPropagation = ContextPropagation() @@ -268,33 +268,29 @@ import pekko.stream.stage._ push(out, grab(in)) if (isClosed(in)) completeStage() else pull(in) + } else emitInjectedElementOrReschedule(onTimer = false) + } + + private def emitInjectedElementOrReschedule(onTimer: Boolean): Unit = { + val now = System.nanoTime() + val diff = now - nextDeadline + if (diff < 0) { + if (onTimer) { + // Clock may be non-monotonic, see https://stackoverflow.com/questions/51344787/in-what-cases-clock-monotonic-might-not-be-available + log.info( + s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff $diff. (time running backwards?) Reschedule instead of emitting.") + } + scheduleOnce(GraphStageLogicTimer, FiniteDuration(-diff, TimeUnit.NANOSECONDS)) } else { - val now = System.nanoTime() - // Idle timeout triggered a while ago and we were just waiting for pull. - // In the case of now == deadline, the deadline has not passed strictly, but scheduling another thunk - // for that seems wasteful. - if (now - nextDeadline >= 0) { - nextDeadline = now + timeout.toNanos - push(out, inject()) - } else - scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - now, TimeUnit.NANOSECONDS)) + push(out, inject()) + nextDeadline = now + timeout.toNanos } } - override protected def onTimer(timerKey: Any): Unit = { - val now = System.nanoTime() - // Timer is reliably cancelled if a regular element arrives first. Scheduler rather schedules too late - // than too early so the deadline must have passed at this time. - assert( - now - nextDeadline >= 0, - s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff ${now - nextDeadline}.") - push(out, inject()) - nextDeadline = now + timeout.toNanos - } + override protected def onTimer(timerKey: Any): Unit = emitInjectedElementOrReschedule(onTimer = true) } - override def toString = "IdleTimer" - + override def toString = "IdleInject" } case object GraphStageLogicTimer