chore: Reduce convertions in javadsl's unfold operator. (#1236)

This commit is contained in:
He-Pin(kerr) 2024-03-28 15:55:51 +08:00 committed by GitHub
parent ebf5c89a83
commit ecdea09145
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 39 additions and 3 deletions

View file

@ -23,6 +23,7 @@ import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.japi.{ function, Pair }
import pekko.stream._
import pekko.stream.Attributes.SourceLocation
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
@ -32,7 +33,7 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
@InternalApi private[pekko] final class Unfold[S, E](s: S, f: S => Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfold
override def initialAttributes: Attributes = DefaultAttributes.unfold and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private[this] var state = s
@ -47,6 +48,37 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
setHandler(out, this)
}
override def toString: String = "Unfold"
}
/**
* INTERNAL API
*/
@InternalApi
private[pekko] final class UnfoldJava[S, E](s: S, f: function.Function[S, Optional[Pair[S, E]]])
extends GraphStage[SourceShape[E]] {
private val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfold and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private var state = s
def onPull(): Unit = {
val maybeValue = f(state)
if (maybeValue.isPresent) {
val pair = maybeValue.get()
push(out, pair.second)
state = pair.first
} else {
complete(out)
}
}
setHandler(out, this)
}
override def toString: String = "Unfold"
}
/**
@ -85,6 +117,8 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
setHandler(out, this)
}
override def toString: String = "UnfoldAsync"
}
/**
@ -140,4 +174,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
}
setHandler(out, this)
}
override def toString: String = "UnfoldAsync"
}

View file

@ -34,7 +34,7 @@ import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, JavaPartialFunction, Pair, Util }
import pekko.japi.function.Creator
import pekko.stream._
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava }
import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava }
import pekko.util.{ unused, _ }
import pekko.util.FutureConverters._
import pekko.util.JavaDurationConverters._
@ -267,7 +267,7 @@ object Source {
* a pair of the next state `S` and output elements of type `E`.
*/
def unfold[S, E](s: S, f: function.Function[S, Optional[Pair[S, E]]]): Source[E, NotUsed] =
new Source(scaladsl.Source.unfold(s)((s: S) => f.apply(s).toScala.map(_.toScala)))
new Source(scaladsl.Source.fromGraph(new UnfoldJava(s, f)))
/**
* Same as [[unfold]], but uses an async function to generate the next state-element tuple.