chore: Fuse In/OutHandler into GraphStageLogic. (#1208)
This commit is contained in:
parent
5a0b2c1cfa
commit
da5d8098e8
1 changed files with 120 additions and 124 deletions
|
|
@ -498,43 +498,39 @@ import pekko.util.ccompat.JavaConverters._
|
|||
|
||||
override def initialAttributes: Attributes = DefaultAttributes.split and SourceLocation.forLambda(p)
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
import Split._
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||
new TimerGraphStageLogic(shape) with InHandler with OutHandler { parent =>
|
||||
import Split._
|
||||
|
||||
private val SubscriptionTimer = "SubstreamSubscriptionTimer"
|
||||
private val SubscriptionTimer = "SubstreamSubscriptionTimer"
|
||||
|
||||
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||
|
||||
private val timeout: FiniteDuration =
|
||||
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
|
||||
private var substreamSource: SubSourceOutlet[T] = null
|
||||
private var substreamWaitingToBePushed = false
|
||||
private var substreamCancelled = false
|
||||
private val timeout: FiniteDuration =
|
||||
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
|
||||
private var substreamSource: SubSourceOutlet[T] = null
|
||||
private var substreamWaitingToBePushed = false
|
||||
private var substreamCancelled = false
|
||||
|
||||
def propagateSubstreamCancel(ex: Throwable): Boolean =
|
||||
decider(ex) match {
|
||||
case Supervision.Stop => true
|
||||
case Supervision.Resume => false
|
||||
case Supervision.Restart => false
|
||||
def propagateSubstreamCancel(ex: Throwable): Boolean =
|
||||
decider(ex) match {
|
||||
case Supervision.Stop => true
|
||||
case Supervision.Resume => false
|
||||
case Supervision.Restart => false
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
if (substreamSource eq null) {
|
||||
// can be already pulled from substream in case split after
|
||||
if (!hasBeenPulled(in)) pull(in)
|
||||
} else if (substreamWaitingToBePushed) pushSubstreamSource()
|
||||
}
|
||||
|
||||
setHandler(
|
||||
out,
|
||||
new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (substreamSource eq null) {
|
||||
// can be already pulled from substream in case split after
|
||||
if (!hasBeenPulled(in)) pull(in)
|
||||
} else if (substreamWaitingToBePushed) pushSubstreamSource()
|
||||
}
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
// If the substream is already cancelled or it has not been handed out, we can go away
|
||||
if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause)
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
// If the substream is already cancelled or it has not been handed out, we can go away
|
||||
if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) cancelStage(cause)
|
||||
}
|
||||
})
|
||||
|
||||
val initInHandler = new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val handler = new SubstreamHandler
|
||||
val elem = grab(in)
|
||||
|
|
@ -549,115 +545,115 @@ import pekko.util.ccompat.JavaConverters._
|
|||
|
||||
handOver(handler)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
}
|
||||
|
||||
// initial input handler
|
||||
setHandler(in, initInHandler)
|
||||
// initial input handler
|
||||
setHandlers(in, out, this)
|
||||
|
||||
private def handOver(handler: SubstreamHandler): Unit = {
|
||||
if (isClosed(out)) completeStage()
|
||||
else {
|
||||
substreamSource = new SubSourceOutlet[T]("SplitSource")
|
||||
substreamSource.setHandler(handler)
|
||||
substreamCancelled = false
|
||||
setHandler(in, handler)
|
||||
setKeepGoing(enabled = handler.hasInitialElement)
|
||||
private def handOver(handler: SubstreamHandler): Unit = {
|
||||
if (isClosed(out)) completeStage()
|
||||
else {
|
||||
substreamSource = new SubSourceOutlet[T]("SplitSource")
|
||||
substreamSource.setHandler(handler)
|
||||
substreamCancelled = false
|
||||
setHandler(in, handler)
|
||||
setKeepGoing(enabled = handler.hasInitialElement)
|
||||
|
||||
if (isAvailable(out)) {
|
||||
if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in)
|
||||
} else substreamWaitingToBePushed = true
|
||||
}
|
||||
}
|
||||
|
||||
private def pushSubstreamSource(): Unit = {
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(SubscriptionTimer, timeout)
|
||||
substreamWaitingToBePushed = false
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout)
|
||||
|
||||
private class SubstreamHandler extends InHandler with OutHandler {
|
||||
|
||||
var firstElem: T = null.asInstanceOf[T]
|
||||
|
||||
def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null
|
||||
private var willCompleteAfterInitialElement = false
|
||||
|
||||
// Substreams are always assumed to be pushable position when we enter this method
|
||||
private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = {
|
||||
decision match {
|
||||
case SplitAfter =>
|
||||
if (!substreamCancelled) {
|
||||
substreamSource.push(currentElem)
|
||||
substreamSource.complete()
|
||||
}
|
||||
case SplitBefore =>
|
||||
handler.firstElem = currentElem
|
||||
if (!substreamCancelled) substreamSource.complete()
|
||||
if (isAvailable(out)) {
|
||||
if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in)
|
||||
} else substreamWaitingToBePushed = true
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
cancelTimer(SubscriptionTimer)
|
||||
if (hasInitialElement) {
|
||||
substreamSource.push(firstElem)
|
||||
firstElem = null.asInstanceOf[T]
|
||||
setKeepGoing(false)
|
||||
if (willCompleteAfterInitialElement) {
|
||||
private def pushSubstreamSource(): Unit = {
|
||||
push(out, Source.fromGraph(substreamSource.source))
|
||||
scheduleOnce(SubscriptionTimer, timeout)
|
||||
substreamWaitingToBePushed = false
|
||||
}
|
||||
|
||||
override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout)
|
||||
|
||||
private class SubstreamHandler extends InHandler with OutHandler {
|
||||
|
||||
var firstElem: T = null.asInstanceOf[T]
|
||||
|
||||
def hasInitialElement: Boolean = firstElem.asInstanceOf[AnyRef] ne null
|
||||
private var willCompleteAfterInitialElement = false
|
||||
|
||||
// Substreams are always assumed to be pushable position when we enter this method
|
||||
private def closeThis(handler: SubstreamHandler, currentElem: T): Unit = {
|
||||
decision match {
|
||||
case SplitAfter =>
|
||||
if (!substreamCancelled) {
|
||||
substreamSource.push(currentElem)
|
||||
substreamSource.complete()
|
||||
}
|
||||
case SplitBefore =>
|
||||
handler.firstElem = currentElem
|
||||
if (!substreamCancelled) substreamSource.complete()
|
||||
}
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
cancelTimer(SubscriptionTimer)
|
||||
if (hasInitialElement) {
|
||||
substreamSource.push(firstElem)
|
||||
firstElem = null.asInstanceOf[T]
|
||||
setKeepGoing(false)
|
||||
if (willCompleteAfterInitialElement) {
|
||||
substreamSource.complete()
|
||||
completeStage()
|
||||
}
|
||||
} else pull(in)
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
substreamCancelled = true
|
||||
if (isClosed(in) || propagateSubstreamCancel(cause)) {
|
||||
cancelStage(cause)
|
||||
} else {
|
||||
// Start draining
|
||||
if (!hasBeenPulled(in)) pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
try {
|
||||
if (p(elem)) {
|
||||
val handler = new SubstreamHandler
|
||||
closeThis(handler, elem)
|
||||
if (decision == SplitBefore) handOver(handler)
|
||||
else {
|
||||
substreamSource = null
|
||||
setHandler(in, parent)
|
||||
pull(in)
|
||||
}
|
||||
} else {
|
||||
// Drain into the void
|
||||
if (substreamCancelled) pull(in)
|
||||
else substreamSource.push(elem)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) => onUpstreamFailure(ex)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit =
|
||||
if (hasInitialElement) willCompleteAfterInitialElement = true
|
||||
else {
|
||||
substreamSource.complete()
|
||||
completeStage()
|
||||
}
|
||||
} else pull(in)
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
substreamCancelled = true
|
||||
if (isClosed(in) || propagateSubstreamCancel(cause)) {
|
||||
cancelStage(cause)
|
||||
} else {
|
||||
// Start draining
|
||||
if (!hasBeenPulled(in)) pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val elem = grab(in)
|
||||
try {
|
||||
if (p(elem)) {
|
||||
val handler = new SubstreamHandler
|
||||
closeThis(handler, elem)
|
||||
if (decision == SplitBefore) handOver(handler)
|
||||
else {
|
||||
substreamSource = null
|
||||
setHandler(in, initInHandler)
|
||||
pull(in)
|
||||
}
|
||||
} else {
|
||||
// Drain into the void
|
||||
if (substreamCancelled) pull(in)
|
||||
else substreamSource.push(elem)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) => onUpstreamFailure(ex)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit =
|
||||
if (hasInitialElement) willCompleteAfterInitialElement = true
|
||||
else {
|
||||
substreamSource.complete()
|
||||
completeStage()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
substreamSource.fail(ex)
|
||||
failStage(ex)
|
||||
}
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
substreamSource.fail(ex)
|
||||
failStage(ex)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
override def toString: String = "Split"
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue