=str #23953 cleanup ConcurrentAsyncCallback implementation
The existing implementation had lots of duplication. The Initializing state could be removed in favor of using `Pending(Nil)` instead which simplified the state handling logic further. Also, instead of using `OptionVal[Promise[Done]]` a special marker value, `NoPromise`, was introduced to mark the case when no promise was supplied. Fixes #23953.
This commit is contained in:
parent
6743d90e52
commit
a98887140c
3 changed files with 54 additions and 78 deletions
|
|
@ -194,7 +194,7 @@ import scala.util.control.NonFatal
|
|||
val log: LoggingAdapter,
|
||||
val logics: Array[GraphStageLogic], // Array of stage logics
|
||||
val connections: Array[GraphInterpreter.Connection],
|
||||
val onAsyncInput: (GraphStageLogic, Any, OptionVal[Promise[Done]], (Any) ⇒ Unit) ⇒ Unit,
|
||||
val onAsyncInput: (GraphStageLogic, Any, Promise[Done], (Any) ⇒ Unit) ⇒ Unit,
|
||||
val fuzzingMode: Boolean,
|
||||
val context: ActorRef) {
|
||||
|
||||
|
|
@ -435,7 +435,7 @@ import scala.util.control.NonFatal
|
|||
eventsRemaining
|
||||
}
|
||||
|
||||
def runAsyncInput(logic: GraphStageLogic, evt: Any, promise: OptionVal[Promise[Done]], handler: (Any) ⇒ Unit): Unit =
|
||||
def runAsyncInput(logic: GraphStageLogic, evt: Any, promise: Promise[Done], handler: (Any) ⇒ Unit): Unit =
|
||||
if (!isStageCompleted(logic)) {
|
||||
if (GraphInterpreter.Debug) println(s"$Name ASYNC $evt ($handler) [$logic]")
|
||||
val currentInterpreterHolder = _currentInterpreter.get()
|
||||
|
|
@ -445,16 +445,14 @@ import scala.util.control.NonFatal
|
|||
activeStage = logic
|
||||
try {
|
||||
handler(evt)
|
||||
if (promise.isDefined) {
|
||||
val p = promise.get
|
||||
p.success(Done)
|
||||
if (promise ne GraphStageLogic.NoPromise) {
|
||||
promise.success(Done)
|
||||
logic.onFeedbackDispatched()
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
if (promise.isDefined) {
|
||||
val p = promise.get
|
||||
promise.get.failure(ex)
|
||||
if (promise ne GraphStageLogic.NoPromise) {
|
||||
promise.failure(ex)
|
||||
logic.onFeedbackDispatched()
|
||||
}
|
||||
logic.failStage(ex)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue