diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index a42e0da27b..d4e0efe69f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1899,6 +1899,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' after predicate returned true or downstream cancels * * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.takeWhile]] + * @since 1.2.0 */ def takeUntil(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeUntil(p.test)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index f55988a1d3..71b73a8005 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3636,6 +3636,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' after predicate returned true or downstream cancels * * See also [[Source.limit]], [[Source.limitWeighted]], [[Source.takeWhile]] + * @since 1.2.0 */ def takeUntil(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.takeUntil(p.test)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index c7dc65536b..7dd8eb323f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1248,6 +1248,7 @@ class SubFlow[In, Out, Mat]( * '''Cancels when''' after predicate returned true or downstream cancels * * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]], [[FlowOps.takeWhile]] + * @since 1.2.0 */ def takeUntil(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.takeUntil(p.test)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 19a0b86ddb..e38be6d3f2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1137,6 +1137,7 @@ class SubSource[Out, Mat]( * '''Cancels when''' after predicate returned true or downstream cancels * * See also [[SubSource.limit]], [[SubSource.limitWeighted]], [[SubSource.takeWhile]] + * @since 1.2.0 */ def takeUntil(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.takeUntil(p.test)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index bfd6010763..abd19e4561 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1590,6 +1590,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' after predicate returned true or downstream cancels * * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]], [[FlowOps.takeWhile]] + * @since 1.2.0 */ def takeUntil(p: Out => Boolean): Repr[Out] = takeWhile(!p(_), inclusive = true) diff --git a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala index b84cef8eff..eb1fb6e001 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala @@ -987,6 +987,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * This action replaces the [[OutHandler]] for the given outlet if suspension * is needed and reinstalls the current handler upon receiving an `onPull()` * signal (before invoking the `andThen` function). + * + * @since 1.2.0 */ final protected def emitMultiple[T](out: Outlet[T], elems: Spliterator[T], andThen: () => Unit): Unit = { val iter = new EmittingSpliterator[T](out, elems, getNonEmittingHandler(out), andThen)