!str Logging error instead of failing the keepAlive operator.
This commit is contained in:
parent
5b97885623
commit
cb867edb4d
1 changed files with 18 additions and 22 deletions
|
|
@ -238,7 +238,7 @@ import pekko.stream.stage._
|
||||||
override val shape: FlowShape[I, O] = FlowShape(in, out)
|
override val shape: FlowShape[I, O] = FlowShape(in, out)
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
new TimerGraphStageLogic(shape) with StageLogging with InHandler with OutHandler {
|
||||||
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
|
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
|
||||||
private val contextPropagation = ContextPropagation()
|
private val contextPropagation = ContextPropagation()
|
||||||
|
|
||||||
|
|
@ -268,33 +268,29 @@ import pekko.stream.stage._
|
||||||
push(out, grab(in))
|
push(out, grab(in))
|
||||||
if (isClosed(in)) completeStage()
|
if (isClosed(in)) completeStage()
|
||||||
else pull(in)
|
else pull(in)
|
||||||
|
} else emitInjectedElementOrReschedule(onTimer = false)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def emitInjectedElementOrReschedule(onTimer: Boolean): Unit = {
|
||||||
|
val now = System.nanoTime()
|
||||||
|
val diff = now - nextDeadline
|
||||||
|
if (diff < 0) {
|
||||||
|
if (onTimer) {
|
||||||
|
// Clock may be non-monotonic, see https://stackoverflow.com/questions/51344787/in-what-cases-clock-monotonic-might-not-be-available
|
||||||
|
log.info(
|
||||||
|
s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff $diff. (time running backwards?) Reschedule instead of emitting.")
|
||||||
|
}
|
||||||
|
scheduleOnce(GraphStageLogicTimer, FiniteDuration(-diff, TimeUnit.NANOSECONDS))
|
||||||
} else {
|
} else {
|
||||||
val now = System.nanoTime()
|
push(out, inject())
|
||||||
// Idle timeout triggered a while ago and we were just waiting for pull.
|
nextDeadline = now + timeout.toNanos
|
||||||
// In the case of now == deadline, the deadline has not passed strictly, but scheduling another thunk
|
|
||||||
// for that seems wasteful.
|
|
||||||
if (now - nextDeadline >= 0) {
|
|
||||||
nextDeadline = now + timeout.toNanos
|
|
||||||
push(out, inject())
|
|
||||||
} else
|
|
||||||
scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - now, TimeUnit.NANOSECONDS))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override protected def onTimer(timerKey: Any): Unit = {
|
override protected def onTimer(timerKey: Any): Unit = emitInjectedElementOrReschedule(onTimer = true)
|
||||||
val now = System.nanoTime()
|
|
||||||
// Timer is reliably cancelled if a regular element arrives first. Scheduler rather schedules too late
|
|
||||||
// than too early so the deadline must have passed at this time.
|
|
||||||
assert(
|
|
||||||
now - nextDeadline >= 0,
|
|
||||||
s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff ${now - nextDeadline}.")
|
|
||||||
push(out, inject())
|
|
||||||
nextDeadline = now + timeout.toNanos
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def toString = "IdleTimer"
|
override def toString = "IdleInject"
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case object GraphStageLogicTimer
|
case object GraphStageLogicTimer
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue