diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index a65a0b0830..6272320ceb 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -3,10 +3,9 @@ */ package akka.pattern - import java.time.Instant -import scala.concurrent.duration.{Deadline, Duration, FiniteDuration} +import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration } import java.util.concurrent.ThreadLocalRandom import java.util.Optional @@ -83,8 +82,8 @@ object BackoffSupervisor { /** * Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]]. - * - * @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor. + * + * @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor. */ def props(options: BackoffOptions): Props = options.props diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala index 545a6ec8b6..28828e599d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldAsyncSpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.ActorAttributes.supervisionStrategy import akka.stream.ActorMaterializer -import akka.stream.Supervision.{restartingDecider, resumingDecider} +import akka.stream.Supervision.{ restartingDecider, resumingDecider } import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.testkit.Utils._ import akka.stream.testkit._ @@ -14,7 +14,7 @@ import akka.testkit.TestLatch import org.scalatest.concurrent.PatienceConfiguration.Timeout import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} +import scala.concurrent.{ Await, Future } import scala.util.control.NoStackTrace class FlowFoldAsyncSpec extends StreamSpec { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala index 915b89455d..daec365ca4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWhileSpec.scala @@ -43,6 +43,13 @@ class FlowTakeWhileSpec extends StreamSpec { .expectComplete() } + "emit the element that caused the predicate to return false and then no more with inclusive set" in assertAllStagesStopped { + Source(1 to 10).takeWhile(_ < 3, true).runWith(TestSink.probe[Int]) + .request(4) + .expectNext(1, 2, 3) + .expectComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index e4cac2a4c4..381ebbc7cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -92,7 +92,7 @@ final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { /** * INTERNAL API */ -final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { +final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.takeWhile override def toString: String = "TakeWhile" @@ -109,6 +109,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T if (p(elem)) { push(out, elem) } else { + if (inclusive) push(out, elem) completeStage() } } catch { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 967dde3e3d..eb0ca6f07e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -777,9 +777,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * Terminate processing (and cancel the upstream publisher) after predicate - * returns false for the first time. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. + * returns false for the first time, including the first failed element iff inclusive is true + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. * * The stream will be completed without producing any elements if predicate is false for * the first stream element. @@ -788,13 +788,34 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' predicate returned false or upstream completes + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels * * See also [[Flow.limit]], [[Flow.limitWeighted]] */ - def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test)) + def takeWhile(p: function.Predicate[Out], inclusive: Boolean = false): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test, inclusive)) + + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time, including the first failed element iff inclusive is true + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + * + * See also [[Flow.limit]], [[Flow.limitWeighted]] + */ + def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = takeWhile(p, false) /** * Discard elements at the beginning of the stream while predicate is true. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 4bd335e4a8..78efb1c14d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -608,9 +608,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo /** * Terminate processing (and cancel the upstream publisher) after predicate - * returns false for the first time. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. + * returns false for the first time, + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. * * The stream will be completed without producing any elements if predicate is false for * the first stream element. @@ -619,12 +619,31 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' predicate returned false or upstream completes + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels */ - def takeWhile(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = - new SubFlow(delegate.takeWhile(p.test)) + def takeWhile(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = takeWhile(p, false) + + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time, including the first failed element iff inclusive is true + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + */ + def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SubFlow[In, Out, Mat] = + new SubFlow(delegate.takeWhile(p.test, inclusive)) /** * Discard elements at the beginning of the stream while predicate is true. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 93ee8a40fc..cd67bffe5d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -578,9 +578,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source /** * Terminate processing (and cancel the upstream publisher) after predicate - * returns false for the first time. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. + * returns false for the first time, + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. * * The stream will be completed without producing any elements if predicate is false for * the first stream element. @@ -589,12 +589,31 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' predicate returned false or upstream completes + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels */ - def takeWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = - new SubSource(delegate.takeWhile(p.test)) + def takeWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = takeWhile(p, false) + + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time, including the first failed element iff inclusive is true + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + */ + def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SubSource[Out, Mat] = + new SubSource(delegate.takeWhile(p.test, inclusive)) /** * Discard elements at the beginning of the stream while predicate is true. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8bca92a0b2..6d16734c75 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -616,9 +616,9 @@ trait FlowOps[+Out, +Mat] { /** * Terminate processing (and cancel the upstream publisher) after predicate - * returns false for the first time. Due to input buffering some elements may have been - * requested from upstream publishers that will then not be processed downstream - * of this step. + * returns false for the first time, + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. * * The stream will be completed without producing any elements if predicate is false for * the first stream element. @@ -627,13 +627,34 @@ trait FlowOps[+Out, +Mat] { * * '''Backpressures when''' downstream backpressures * - * '''Completes when''' predicate returned false or upstream completes + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes * * '''Cancels when''' predicate returned false or downstream cancels * * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] */ - def takeWhile(p: Out ⇒ Boolean): Repr[Out] = via(TakeWhile(p)) + def takeWhile(p: Out ⇒ Boolean): Repr[Out] = takeWhile(p, false) + + /** + * Terminate processing (and cancel the upstream publisher) after predicate + * returns false for the first time, including the first failed element iff inclusive is true + * Due to input buffering some elements may have been requested from upstream publishers + * that will then not be processed downstream of this step. + * + * The stream will be completed without producing any elements if predicate is false for + * the first stream element. + * + * '''Emits when''' the predicate is true + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes + * + * '''Cancels when''' predicate returned false or downstream cancels + * + * See also [[FlowOps.limit]], [[FlowOps.limitWeighted]] + */ + def takeWhile(p: Out ⇒ Boolean, inclusive: Boolean): Repr[Out] = via(TakeWhile(p, inclusive)) /** * Discard elements at the beginning of the stream while predicate is true. diff --git a/project/MiMa.scala b/project/MiMa.scala index cf19166d07..347fbc507b 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -994,7 +994,10 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.hasUid2"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.getUid2"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg"), + + // #21330 takeWhile inclusive flag + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.takeWhile") ) ) }