diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 8223673b88..746abb3da8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -198,16 +198,16 @@ object Source { * Create a `Source` that will unfold a value of type `S` into * a pair of the next state `S` and output elements of type `E`. */ - def unfold[S, E](s: S, f: function.Function[S, Optional[(S, E)]]): Source[E, NotUsed] = - new Source(scaladsl.Source.unfold(s)((s: S) ⇒ f.apply(s).asScala)) + def unfold[S, E](s: S, f: function.Function[S, Optional[Pair[S, E]]]): Source[E, NotUsed] = + new Source(scaladsl.Source.unfold(s)((s: S) ⇒ f.apply(s).asScala.map(_.toScala))) /** * Same as [[unfold]], but uses an async function to generate the next state-element tuple. */ - def unfoldAsync[S, E](s: S, f: function.Function[S, CompletionStage[Optional[(S, E)]]]): Source[E, NotUsed] = + def unfoldAsync[S, E](s: S, f: function.Function[S, CompletionStage[Optional[Pair[S, E]]]]): Source[E, NotUsed] = new Source( scaladsl.Source.unfoldAsync(s)( - (s: S) ⇒ f.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext))) + (s: S) ⇒ f.apply(s).toScala.map(_.asScala.map(_.toScala))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext))) /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.