diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala index 546bdf4f9a..b62c8c3ab1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala @@ -23,6 +23,7 @@ import org.apache.pekko import pekko.annotation.InternalApi import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.Attributes.SourceLocation import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } @@ -32,7 +33,7 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } @InternalApi private[pekko] final class Unfold[S, E](s: S, f: S => Option[(S, E)]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) - override def initialAttributes: Attributes = DefaultAttributes.unfold + override def initialAttributes: Attributes = DefaultAttributes.unfold and SourceLocation.forLambda(f) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { private[this] var state = s @@ -47,6 +48,37 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } setHandler(out, this) } + + override def toString: String = "Unfold" +} + +/** + * INTERNAL API + */ +@InternalApi +private[pekko] final class UnfoldJava[S, E](s: S, f: function.Function[S, Optional[Pair[S, E]]]) + extends GraphStage[SourceShape[E]] { + private val out: Outlet[E] = Outlet("Unfold.out") + override val shape: SourceShape[E] = SourceShape(out) + override def initialAttributes: Attributes = DefaultAttributes.unfold and SourceLocation.forLambda(f) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private var state = s + + def onPull(): Unit = { + val maybeValue = f(state) + if (maybeValue.isPresent) { + val pair = maybeValue.get() + push(out, pair.second) + state = pair.first + } else { + complete(out) + } + } + + setHandler(out, this) + } + override def toString: String = "Unfold" } /** @@ -85,6 +117,8 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } setHandler(out, this) } + + override def toString: String = "UnfoldAsync" } /** @@ -140,4 +174,6 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } } setHandler(out, this) } + + override def toString: String = "UnfoldAsync" } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 99a54bbf0a..e15afb7525 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -34,7 +34,7 @@ import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, JavaPartialFunction, Pair, Util } import pekko.japi.function.Creator import pekko.stream._ -import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava } +import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -267,7 +267,7 @@ object Source { * a pair of the next state `S` and output elements of type `E`. */ def unfold[S, E](s: S, f: function.Function[S, Optional[Pair[S, E]]]): Source[E, NotUsed] = - new Source(scaladsl.Source.unfold(s)((s: S) => f.apply(s).toScala.map(_.toScala))) + new Source(scaladsl.Source.fromGraph(new UnfoldJava(s, f))) /** * Same as [[unfold]], but uses an async function to generate the next state-element tuple.