Added a flag on takeWhile to allow it to include the final element, #21330
This commit is contained in:
parent
7eb660b497
commit
a28d2c579f
9 changed files with 120 additions and 30 deletions
|
|
@ -3,10 +3,9 @@
|
|||
*/
|
||||
package akka.pattern
|
||||
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
import scala.concurrent.duration.{Deadline, Duration, FiniteDuration}
|
||||
import scala.concurrent.duration.{ Deadline, Duration, FiniteDuration }
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.Optional
|
||||
|
||||
|
|
@ -83,8 +82,8 @@ object BackoffSupervisor {
|
|||
|
||||
/**
|
||||
* Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]].
|
||||
*
|
||||
* @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor.
|
||||
*
|
||||
* @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor.
|
||||
*/
|
||||
def props(options: BackoffOptions): Props = options.props
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.stream.scaladsl
|
|||
import akka.NotUsed
|
||||
import akka.stream.ActorAttributes.supervisionStrategy
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.Supervision.{restartingDecider, resumingDecider}
|
||||
import akka.stream.Supervision.{ restartingDecider, resumingDecider }
|
||||
import akka.stream.impl.ReactiveStreamsCompliance
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit._
|
||||
|
|
@ -14,7 +14,7 @@ import akka.testkit.TestLatch
|
|||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{Await, Future}
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
class FlowFoldAsyncSpec extends StreamSpec {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,13 @@ class FlowTakeWhileSpec extends StreamSpec {
|
|||
.expectComplete()
|
||||
}
|
||||
|
||||
"emit the element that caused the predicate to return false and then no more with inclusive set" in assertAllStagesStopped {
|
||||
Source(1 to 10).takeWhile(_ < 3, true).runWith(TestSink.probe[Int])
|
||||
.request(4)
|
||||
.expectNext(1, 2, 3)
|
||||
.expectComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
||||
final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] {
|
||||
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
|
||||
|
||||
override def toString: String = "TakeWhile"
|
||||
|
|
@ -109,6 +109,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T
|
|||
if (p(elem)) {
|
||||
push(out, elem)
|
||||
} else {
|
||||
if (inclusive) push(out, elem)
|
||||
completeStage()
|
||||
}
|
||||
} catch {
|
||||
|
|
|
|||
|
|
@ -777,9 +777,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time. Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
* returns false for the first time, including the first failed element iff inclusive is true
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
|
|
@ -788,13 +788,34 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false or upstream completes
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test))
|
||||
def takeWhile(p: function.Predicate[Out], inclusive: Boolean = false): javadsl.Flow[In, Out, Mat] = new Flow(delegate.takeWhile(p.test, inclusive))
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time, including the first failed element iff inclusive is true
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
*
|
||||
* '''Emits when''' the predicate is true
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = takeWhile(p, false)
|
||||
|
||||
/**
|
||||
* Discard elements at the beginning of the stream while predicate is true.
|
||||
|
|
|
|||
|
|
@ -608,9 +608,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time. Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
* returns false for the first time,
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
|
|
@ -619,12 +619,31 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false or upstream completes
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out]): SubFlow[In, Out, Mat] =
|
||||
new SubFlow(delegate.takeWhile(p.test))
|
||||
def takeWhile(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = takeWhile(p, false)
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time, including the first failed element iff inclusive is true
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
*
|
||||
* '''Emits when''' the predicate is true
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SubFlow[In, Out, Mat] =
|
||||
new SubFlow(delegate.takeWhile(p.test, inclusive))
|
||||
|
||||
/**
|
||||
* Discard elements at the beginning of the stream while predicate is true.
|
||||
|
|
|
|||
|
|
@ -578,9 +578,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time. Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
* returns false for the first time,
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
|
|
@ -589,12 +589,31 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false or upstream completes
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out]): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.takeWhile(p.test))
|
||||
def takeWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = takeWhile(p, false)
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time, including the first failed element iff inclusive is true
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
*
|
||||
* '''Emits when''' the predicate is true
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*/
|
||||
def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SubSource[Out, Mat] =
|
||||
new SubSource(delegate.takeWhile(p.test, inclusive))
|
||||
|
||||
/**
|
||||
* Discard elements at the beginning of the stream while predicate is true.
|
||||
|
|
|
|||
|
|
@ -616,9 +616,9 @@ trait FlowOps[+Out, +Mat] {
|
|||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time. Due to input buffering some elements may have been
|
||||
* requested from upstream publishers that will then not be processed downstream
|
||||
* of this step.
|
||||
* returns false for the first time,
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
|
|
@ -627,13 +627,34 @@ trait FlowOps[+Out, +Mat] {
|
|||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false or upstream completes
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = via(TakeWhile(p))
|
||||
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = takeWhile(p, false)
|
||||
|
||||
/**
|
||||
* Terminate processing (and cancel the upstream publisher) after predicate
|
||||
* returns false for the first time, including the first failed element iff inclusive is true
|
||||
* Due to input buffering some elements may have been requested from upstream publishers
|
||||
* that will then not be processed downstream of this step.
|
||||
*
|
||||
* The stream will be completed without producing any elements if predicate is false for
|
||||
* the first stream element.
|
||||
*
|
||||
* '''Emits when''' the predicate is true
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
|
||||
*
|
||||
* '''Cancels when''' predicate returned false or downstream cancels
|
||||
*
|
||||
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
|
||||
*/
|
||||
def takeWhile(p: Out ⇒ Boolean, inclusive: Boolean): Repr[Out] = via(TakeWhile(p, inclusive))
|
||||
|
||||
/**
|
||||
* Discard elements at the beginning of the stream while predicate is true.
|
||||
|
|
|
|||
|
|
@ -994,7 +994,10 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.hasUid2"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.getUid2"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg"),
|
||||
|
||||
// #21330 takeWhile inclusive flag
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.takeWhile")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue