chore: Clean up some duplicated code.
--------- Co-authored-by: Matthew de Detrich <matthew.dedetrich@aiven.io>
This commit is contained in:
parent
b1ec854b8c
commit
637d72af7a
1 changed files with 8 additions and 6 deletions
|
|
@ -193,9 +193,8 @@ import pekko.util.ccompat._
|
|||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
decider(ex) match {
|
||||
case Supervision.Stop => failStage(ex)
|
||||
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in)
|
||||
case Supervision.Restart => if (!hasBeenPulled(in)) pull(in)
|
||||
case Supervision.Stop => failStage(ex)
|
||||
case _ => pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -267,9 +266,12 @@ private[stream] object Collect {
|
|||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
decider(ex) match {
|
||||
case Supervision.Stop => failStage(ex)
|
||||
case Supervision.Resume => if (!hasBeenPulled(in)) pull(in)
|
||||
case Supervision.Restart => if (!hasBeenPulled(in)) pull(in)
|
||||
case Supervision.Stop => failStage(ex)
|
||||
case _ =>
|
||||
// The !hasBeenPulled(in) check is not required here since it
|
||||
// isn't possible to do an additional pull(in) due to the nature
|
||||
// of how collect works
|
||||
pull(in)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue