diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala index 0f69f868aa..0ed829f002 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala @@ -36,9 +36,19 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { probe.expectNext(1) probe.expectComplete() - } } + "A FilterNot" must { + "filter based on inverted predicate" in { + def script = Script(TestConfig.RandomTestRange map + { _ ⇒ + val x = random.nextInt() + Seq(x) -> (if ((x & 1) == 1) Seq(x) else Seq()) + }: _*) + TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.filterNot(_ % 2 == 0))) + } + } + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 51eb310596..50dbf02e50 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -310,6 +310,20 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def filter(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.filter(p.test)) + /** + * Only pass on those elements that NOT satisfy the given predicate. + * + * '''Emits when''' the given predicate returns false for the element + * + * '''Backpressures when''' the given predicate returns false for the element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def filterNot(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.filterNot(p.test)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e7e036e4fc..ff6889c0cd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -506,6 +506,12 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def filter(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.filter(p.test)) + /** + * Only pass on those elements that NOT satisfy the given predicate. + */ + def filterNot(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = + new Source(delegate.filterNot(p.test)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 04eef540ac..7ba5ee4736 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -491,6 +491,20 @@ trait FlowOps[+Out, +Mat] { */ def filter(p: Out ⇒ Boolean): Repr[Out, Mat] = andThen(Filter(p.asInstanceOf[Any ⇒ Boolean])) + /** + * Only pass on those elements that NOT satisfy the given predicate. + * + * '''Emits when''' the given predicate returns false for the element + * + * '''Backpressures when''' the given predicate returns false for the element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def filterNot(p: Out ⇒ Boolean): Repr[Out, Mat] = + via(Flow[Out].filter(!p(_)).withAttributes(name("filterNot"))) + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time. Due to input buffering some elements may have been