chore: Use more lambda instread in stream module. (#1686)
This commit is contained in:
parent
477fd393c2
commit
7782cf55e8
6 changed files with 7 additions and 22 deletions
|
|
@ -74,11 +74,7 @@ import pekko.stream.stage._
|
|||
}
|
||||
})
|
||||
|
||||
subSink.setHandler(new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
push(out, subSink.grab())
|
||||
}
|
||||
})
|
||||
subSink.setHandler(() => push(out, subSink.grab()))
|
||||
|
||||
try {
|
||||
val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes)
|
||||
|
|
|
|||
|
|
@ -666,10 +666,7 @@ import org.reactivestreams.Subscription
|
|||
else {
|
||||
waitingForShutdown = true
|
||||
val subscriptionTimeout = attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
|
||||
mat.scheduleOnce(subscriptionTimeout,
|
||||
new Runnable {
|
||||
override def run(): Unit = self ! Abort(GraphInterpreterShell.this)
|
||||
})
|
||||
mat.scheduleOnce(subscriptionTimeout, () => self ! Abort(GraphInterpreterShell.this))
|
||||
}
|
||||
} else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit)
|
||||
|
||||
|
|
|
|||
|
|
@ -199,9 +199,7 @@ import pekko.util.ccompat.JavaConverters._
|
|||
setKeepGoing(false)
|
||||
cancelTimer(SubscriptionTimer)
|
||||
pull(in)
|
||||
tailSource.setHandler(new OutHandler {
|
||||
override def onPull(): Unit = pull(in)
|
||||
})
|
||||
tailSource.setHandler(() => pull(in))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,9 +45,7 @@ object DelayStrategy {
|
|||
* Fixed delay strategy, always returns constant delay for any element.
|
||||
* @param delay value of the delay
|
||||
*/
|
||||
def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = new DelayStrategy[T] {
|
||||
override def nextDelay(elem: T): java.time.Duration = delay
|
||||
}
|
||||
def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = (_: T) => delay
|
||||
|
||||
/**
|
||||
* Strategy with linear increasing delay.
|
||||
|
|
|
|||
|
|
@ -477,8 +477,7 @@ object Zip {
|
|||
def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] =
|
||||
ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]])
|
||||
|
||||
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
|
||||
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
|
||||
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -506,8 +505,7 @@ object ZipLatest {
|
|||
def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] =
|
||||
ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]])
|
||||
|
||||
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
|
||||
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
|
||||
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -36,9 +36,7 @@ object DelayStrategy {
|
|||
* Fixed delay strategy, always returns constant delay for any element.
|
||||
* @param delay value of the delay
|
||||
*/
|
||||
def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = new DelayStrategy[Any] {
|
||||
override def nextDelay(elem: Any): FiniteDuration = delay
|
||||
}
|
||||
def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = (_: Any) => delay
|
||||
|
||||
/**
|
||||
* Strategy with linear increasing delay.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue