From e6662e12dec7f66c75435721da26e595bd4cc08f Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Thu, 29 May 2025 06:43:17 +0800 Subject: [PATCH] feat: Add dropRepeated stream operator. (#1868) --- .../org/apache/pekko/util/ConstantFun.scala | 8 ++ .../operators/Source-or-Flow/dropRepeated.md | 41 ++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../jdocs/stream/operators/SourceOrFlow.java | 19 ++++- .../operators/sourceorflow/Filter.scala | 14 ++++ .../pekko/stream/javadsl/SourceTest.java | 10 ++- .../scaladsl/FlowDropRepeatedSpec.scala | 79 +++++++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../stream/impl/fusing/DropRepeated.scala | 77 ++++++++++++++++++ .../apache/pekko/stream/javadsl/Flow.scala | 36 +++++++++ .../apache/pekko/stream/javadsl/Source.scala | 36 +++++++++ .../apache/pekko/stream/javadsl/SubFlow.scala | 36 +++++++++ .../pekko/stream/javadsl/SubSource.scala | 35 ++++++++ .../apache/pekko/stream/scaladsl/Flow.scala | 34 ++++++++ 14 files changed, 423 insertions(+), 5 deletions(-) create mode 100644 docs/src/main/paradox/stream/operators/Source-or-Flow/dropRepeated.md create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDropRepeatedSpec.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DropRepeated.scala diff --git a/actor/src/main/scala/org/apache/pekko/util/ConstantFun.scala b/actor/src/main/scala/org/apache/pekko/util/ConstantFun.scala index c59ead9082..cf5007f090 100644 --- a/actor/src/main/scala/org/apache/pekko/util/ConstantFun.scala +++ b/actor/src/main/scala/org/apache/pekko/util/ConstantFun.scala @@ -18,6 +18,8 @@ import pekko.annotation.InternalApi import pekko.japi.{ Pair => JPair } import pekko.japi.function.{ Function => JFun, Function2 => JFun2 } +import java.lang + /** * INTERNAL API */ @@ -49,6 +51,12 @@ import pekko.japi.function.{ Function => JFun, Function2 => JFun2 } def javaAnyToNone[A, B]: A => Option[B] = none def nullFun[T] = _nullFun.asInstanceOf[Any => T] + def scalaAnyTwoEquals[T]: (T, T) => Boolean = _ == _ + + def javaAnyTwoEquals[T]: JFun2[T, T, java.lang.Boolean] = new JFun2[T, T, java.lang.Boolean] { + override def apply(a: T, b: T): lang.Boolean = a == b + } + val zeroLong = (_: Any) => 0L val oneLong = (_: Any) => 1L diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/dropRepeated.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/dropRepeated.md new file mode 100644 index 0000000000..4bce0ea089 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/dropRepeated.md @@ -0,0 +1,41 @@ +# dropRepeated + +Only pass on those elements that are distinct from the previous element. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Source.dropRepeated](Source) { scala="#dropRepeated):FlowOps.this.Repr[Out]" java="#dropRepeated()" } +@apidoc[Flow.dropRepeated](Flow) { scala="#dropRepeated():FlowOps.this.Repr[Out]" java="#dropRepeated()" } + + +## Description + +Only pass on those elements that are distinct from the previous element. + +## Example + +For example, given a `Source` of numbers, we just want to pass distinct numbers downstream: + +Scala +: @@snip [Filter.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/Filter.scala) { #dropRepeated } + +Java +: @@snip [SourceOrFlow.java](/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java) { #dropRepeated } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the element is distinct from the previous element + +**backpressures** when the element is distinct from the previous element and downstream backpressures + +**completes** when upstream completes + +@@@ + +## API docs + +@apidoc[Flow.filter](Flow) { scala="#dropRepeated():FlowOps.this.Repr[Out]" java="#dropRepeated()" } diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index b31a22bf86..37e2d6fdcc 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -158,6 +158,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[detach](Source-or-Flow/detach.md)|Detach upstream demand from downstream demand without detaching the stream rates.| |Flow|@ref[dimap](Flow/dimap.md)|Transform this Flow by applying a function `f` to each *incoming* upstream element before it is passed to the Flow, and a function `g` to each *outgoing* downstream element.| |Source/Flow|@ref[drop](Source-or-Flow/drop.md)|Drop `n` elements and then pass any subsequent element downstream.| +|Source/Flow|@ref[dropRepeated](Source-or-Flow/dropRepeated.md)|Only pass on those elements that are distinct from the previous element.| |Source/Flow|@ref[dropWhile](Source-or-Flow/dropWhile.md)|Drop elements as long as a predicate function return true for the element| |Source/Flow|@ref[filter](Source-or-Flow/filter.md)|Filter the incoming elements using a predicate.| |Source/Flow|@ref[filterNot](Source-or-Flow/filterNot.md)|Filter the incoming elements using a predicate.| @@ -449,6 +450,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [dimap](Flow/dimap.md) * [divertTo](Source-or-Flow/divertTo.md) * [drop](Source-or-Flow/drop.md) +* [dropRepeated](Source-or-Flow/dropRepeated.md) * [dropWhile](Source-or-Flow/dropWhile.md) * [dropWithin](Source-or-Flow/dropWithin.md) * [empty](Source/empty.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java index 3bab8fb3de..4a9d058d42 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceOrFlow.java @@ -58,16 +58,12 @@ import java.util.*; // #zip // #log -import org.apache.pekko.event.LogMarker; -import org.apache.pekko.stream.Attributes; // #log import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.function.Function; -import java.util.stream.Collectors; class SourceOrFlow { private static ActorSystem system = null; @@ -608,6 +604,21 @@ class SourceOrFlow { // #filterNot } + void dropRepeatedExample() { + // #dropRepeated + Source.from(Arrays.asList(1, 2, 2, 3, 3, 1, 4)) + .dropRepeated() + .runForeach(System.out::println, system); + // prints: + // 1 + // 2 + // 3 + // 1 + // 4 + + // #dropRepeated + } + void dropExample() { // #drop Source fiveIntegers = Source.from(Arrays.asList(1, 2, 3, 4, 5)); diff --git a/docs/src/test/scala/docs/stream/operators/sourceorflow/Filter.scala b/docs/src/test/scala/docs/stream/operators/sourceorflow/Filter.scala index c368633a1a..a91c31a65e 100644 --- a/docs/src/test/scala/docs/stream/operators/sourceorflow/Filter.scala +++ b/docs/src/test/scala/docs/stream/operators/sourceorflow/Filter.scala @@ -54,4 +54,18 @@ object Filter { // incididunt // #filterNot } + + def dropRepeated(): Unit = { + // #dropRepeated + Source(List(1, 2, 2, 3, 3, 1, 4)) + .dropRepeated() + .runForeach(println) + // prints: + // 1 + // 2 + // 3 + // 1 + // 4 + // #dropRepeated + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 86462f7b80..2fd92863a2 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -817,7 +817,15 @@ public class SourceTest extends StreamTest { } @Test - public void mustBeAbleToUseStatefulMapAsDistinctUntilChanged() throws Exception { + public void mustBeAbleToUseDropRepeated() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 1, 1, 2, 3, 3, 3, 4, 1, 5, 5, 5); + final CompletionStage result = + Source.from(input).dropRepeated().runFold("", (acc, elem) -> acc + elem, system); + Assert.assertEquals("123415", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseStatefulMapAsDropRepeated() throws Exception { final java.lang.Iterable input = Arrays.asList(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 5); final CompletionStage result = Source.from(input) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDropRepeatedSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDropRepeatedSpec.scala new file mode 100644 index 0000000000..40f50329ab --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowDropRepeatedSpec.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2014-2022 Lightbend Inc. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.ActorAttributes._ +import pekko.stream.Supervision._ +import pekko.stream.testkit._ +import pekko.stream.testkit.scaladsl.TestSink + +import scala.util.control.NoStackTrace + +class FlowDropRepeatedSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) with ScriptedTest { + private val TE = new Exception("TEST") with NoStackTrace { + override def toString = "TE" + } + + "A DropRepeated" must { + + "remove duplicated elements" in { + Source(List(1, 2, 2, 3, 3, 1, 4)) + .dropRepeated() + .runWith(TestSink.probe[Int]) + .request(7) + .expectNext(1, 2, 3, 1, 4) + .expectComplete() + } + + "only distinct with the previous element" in { + Source(List(1, 2, 2, 3, 2, 3)) + .dropRepeated(_ == _) + .runWith(TestSink.probe[Int]) + .request(6) + .expectNext(1, 2, 3, 2, 3) + .expectComplete() + } + + "continue if error" in { + Source(List(1, 2, 2, 3, 3, 1, 4)) + .dropRepeated((a, b) => { + if (b == 2) throw TE + else a == b + }) + .withAttributes(supervisionStrategy(resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(7) + .expectNext(1, 3, 1, 4) + .expectComplete() + } + + "restart if error" in { + Source(List(1, 2, 2, 3, 3, 1, 4)) + .dropRepeated((a, b) => { + if (b == 2) throw TE + else a == b + }) + .withAttributes(supervisionStrategy(restartingDecider)) + .runWith(TestSink.probe[Int]) + .request(6) + .expectNext(1, 2, 3, 1, 4) + .expectComplete() + } + + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 5425570d7c..1d2c872a6e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -170,6 +170,7 @@ import pekko.stream.Attributes._ val inputBoundary = name("input-boundary") val outputBoundary = name("output-boundary") + val dropRepeated = name("dropRepeated") } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DropRepeated.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DropRepeated.scala new file mode 100644 index 0000000000..18d54cd4ad --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/DropRepeated.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet, Supervision } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import pekko.util.OptionVal + +import scala.util.control.NonFatal + +private[pekko] final class DropRepeated[T](predicate: (T, T) => Boolean) extends GraphStage[FlowShape[T, T]] { + require(predicate != null, "predicate function should not be null") + + private val in = Inlet[T]("DropRepeated.in") + private val out = Outlet[T]("DropRepeated.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def initialAttributes: Attributes = DefaultAttributes.dropRepeated + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + private var last: OptionVal[T] = OptionVal.none + final override def onPush(): Unit = { + val elem = grab(in) + last match { + case OptionVal.Some(lastElem) => + try { + if (predicate(lastElem, elem)) { + pullOrComplete() + } else { + push(out, elem) + } + last = OptionVal.Some(elem) + } catch { + case NonFatal(e) => + decider(e) match { + case Supervision.Stop => + failStage(e) + case Supervision.Resume => + pullOrComplete() + case Supervision.Restart => + last = OptionVal.none + pullOrComplete() + } + } + case OptionVal.None => + last = OptionVal.Some(elem) + push(out, last.get) + } + } + + private def pullOrComplete(): Unit = if (isClosed(in)) completeStage() else pull(in) + + final override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 4128edaf70..cba5959a4e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -1170,6 +1170,42 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def filterNot(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.filterNot(p.test)) + /** + * Only pass on those elements that are distinct from the previous element. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.dropRepeated()) + + /** + * Only pass on those elements that are distinct from the previous element according to the given predicate. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.dropRepeated(p.apply)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index dbdd12d46e..15afdc250c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2919,6 +2919,42 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def filterNot(p: function.Predicate[Out]): javadsl.Source[Out, Mat] = new Source(delegate.filterNot(p.test)) + /** + * Only pass on those elements that are distinct from the previous element. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(): javadsl.Source[Out, Mat] = + new Source(delegate.dropRepeated()) + + /** + * Only pass on those elements that are distinct from the previous element according to the given predicate. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): javadsl.Source[Out, Mat] = + new Source(delegate.dropRepeated(p.apply)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 66487580c0..04d251fb1d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -529,6 +529,42 @@ class SubFlow[In, Out, Mat]( def filterNot(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.filterNot(p.test)) + /** + * Only pass on those elements that are distinct from the previous element. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(): SubFlow[In, Out, Mat] = + new SubFlow(delegate.dropRepeated()) + + /** + * Only pass on those elements that are distinct from the previous element according to the given predicate. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): SubFlow[In, Out, Mat] = + new SubFlow(delegate.dropRepeated(p.apply)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 78f4cc9c36..bea6c8c0db 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -520,6 +520,41 @@ class SubSource[Out, Mat]( def filterNot(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.filterNot(p.test)) + /** + * Only pass on those elements that are distinct from the previous element. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(): SubSource[Out, Mat] = new SubSource(delegate.dropRepeated()) + + /** + * Only pass on those elements that are distinct from the previous element according to the given predicate. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): SubSource[Out, Mat] = + new SubSource(delegate.dropRepeated(p.apply)) + /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 2749b391ed..56b4aa334f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1554,6 +1554,40 @@ trait FlowOps[+Out, +Mat] { def filterNot(p: Out => Boolean): Repr[Out] = via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot and SourceLocation.forLambda(p))) + /** + * Only pass on those elements that are distinct from the previous element. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(): Repr[Out] = dropRepeated(ConstantFun.scalaAnyTwoEquals) + + /** + * Only pass on those elements that are distinct from the previous element according to the given predicate. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the element is distinct from the previous element + * + * '''Backpressures when''' the element is distinct from the previous element and downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @since 1.2.0 + */ + def dropRepeated(p: (Out, Out) => Boolean): Repr[Out] = via(new DropRepeated(p)) + /** * Terminate processing (and cancel the upstream publisher) after predicate * returns false for the first time,