From a1ade992ffd8da820383d72ce3dca15b8242f58a Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sun, 21 Sep 2025 16:15:13 +0800 Subject: [PATCH] feat: Add Sink#count operator. (#2244) * feat: Add Sink#count operator. * Use .asInstanceOf for better performance --------- Co-authored-by: Matthew de Detrich --- .../paradox/stream/operators/Sink/count.md | 39 +++++++++++ .../main/paradox/stream/operators/index.md | 2 + .../stream/operators/SinkDocExamples.java | 9 +++ .../apache/pekko/stream/javadsl/SinkTest.java | 7 ++ .../pekko/stream/scaladsl/SinkSpec.scala | 14 ++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../pekko/stream/impl/fusing/CountSink.scala | 66 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Sink.scala | 21 ++++++ .../apache/pekko/stream/scaladsl/Sink.scala | 22 ++++++- 9 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 docs/src/main/paradox/stream/operators/Sink/count.md create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala diff --git a/docs/src/main/paradox/stream/operators/Sink/count.md b/docs/src/main/paradox/stream/operators/Sink/count.md new file mode 100644 index 0000000000..44c02300bd --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/count.md @@ -0,0 +1,39 @@ +# Sink.count + +Counts all incoming elements until upstream terminates. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.count](Sink$) { scala="#count[T]:org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Long]]" java="#count()" } + + +## Description + +Counts values emitted from the stream, the count is available through a @scala[`Future`] @java[`CompletionStage`] or +which completes when the stream completes. + +## Example + +Given a stream of numbers we can count the numbers with the `count` operator + +Scala +: @@snip [SinkSpec.scala](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala) { #count-operator-example } + +Java +: @@snip [SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #count-operator-example } + +## Reactive Streams semantics + +@@@div { .callout } + +**completes** when upstream completes + +**backpressures** never (counting is a lightweight operation) + +**cancels** never + +@@@ + + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index c1cd3e21e0..aee2d94e1e 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -56,6 +56,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].| |Sink|@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy| |Sink|@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. | +|Sink|@ref[count](Sink/count.md)|Counts all incoming elements until upstream terminates.| |Sink|@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.| |Sink|@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| |Sink|@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.| @@ -433,6 +434,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [conflate](Source-or-Flow/conflate.md) * [conflateWithSeed](Source-or-Flow/conflateWithSeed.md) * [contramap](Flow/contramap.md) +* [count](Sink/count.md) * [cycle](Source/cycle.md) * [deflate](Compression/deflate.md) * [delay](Source-or-Flow/delay.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index 30a6609b6b..bc41ab6909 100644 --- a/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -52,6 +52,15 @@ public class SinkDocExamples { // #seq-operator-example } + static void countExample() { + // #count-operator-example + Source ints = Source.range(1, 10); + CompletionStage count = ints.runWith(Sink.count(), system); + count.thenAccept(System.out::println); + // 10 + // #count-operator-example + } + static void takeLastExample() { // #takeLast-operator-example // pair of (Name, GPA) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index f5c220841d..882cf0ccbf 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -34,6 +34,7 @@ import org.apache.pekko.stream.testkit.javadsl.TestSink; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.javadsl.TestKit; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -262,4 +263,10 @@ public class SinkTest extends StreamTest { boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS); assertTrue(anyMatch); } + + @Test + public void sinkMustBeAbleToUseCount() { + CompletionStage cs = Source.range(1, 10).runWith(Sink.count(), system); + Assert.assertEquals(10, cs.toCompletableFuture().join().longValue()); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index 38c62edd73..481c2a2cee 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -324,6 +324,20 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } } + "The count sink" must { + "count the number of elements in the stream" in { + // #count-operator-example + val source = Source(1 to 10) + val result = source.runWith(Sink.count) + val count = result.futureValue + println(count) + // will print + // 10 + // #count-operator-example + assert(result.futureValue == 10) + } + } + "The foreach sink" must { "illustrate println" in { // #foreach diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index cd01348ce3..9c7c1d56e1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -154,6 +154,7 @@ import pekko.stream.Attributes._ val lastOptionSink = name("lastOptionSink") val takeLastSink = name("takeLastSink") val seqSink = name("seqSink") + val countSink = name("countSink") val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala new file mode 100644 index 0000000000..6a665061d5 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/CountSink.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import scala.concurrent.{ Future, Promise } + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.{ AbruptStageTerminationException, Attributes, Inlet, SinkShape } +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler } + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object CountSink + extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Long]] { + private val in = Inlet[Any]("seq.in") + override def shape: SinkShape[Any] = SinkShape.of(in) + override def toString: String = "CountSink" + override protected def initialAttributes: Attributes = DefaultAttributes.countSink + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = { + val promise = Promise[Long]() + object logic extends GraphStageLogic(shape) with InHandler { + private var counter: Long = 0L + override def preStart(): Unit = pull(in) + override def onPush(): Unit = { + counter += 1 + pull(in) + } + override def onUpstreamFinish(): Unit = { + promise.trySuccess(counter) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + promise.tryFailure(ex) + failStage(ex) + } + + override def postStop(): Unit = { + if (!promise.isCompleted) promise.failure(new AbruptStageTerminationException(this)) + } + + setHandler(in, this) + } + + (logic, promise.future) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 092ca7e35b..7894124d50 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -313,6 +313,27 @@ object Sink { scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava)) } + /** + * A `Sink` that counts all incoming elements until upstream terminates. + * + * Since upstream may be unbounded, consider using `Flow[T].take` or the stricter `Flow[T].limit` + * (and their variants) to ensure boundedness. The sink materializes into a `CompletionStage` of `Long` + * containing the total count of elements that passed through. + * + * '''Completes when''' upstream completes + * + * '''Backpressures when''' never (counting is a lightweight operation) + * + * '''Cancels when''' never + * + * @return a `Sink` that materializes to a `CompletionStage[Long]` with the element count + * @since 2.0.0 + * + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def count[In]: Sink[In, CompletionStage[java.lang.Long]] = new Sink( + scaladsl.Sink.count[In].mapMaterializedValue(_.asJava.asInstanceOf[CompletionStage[java.lang.Long]])) + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index a58ec833f0..f5dc532eed 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -27,7 +27,7 @@ import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.GraphStages +import pekko.stream.impl.fusing.{ CountSink, GraphStages } import pekko.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } @@ -261,6 +261,26 @@ object Sink { */ def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]]) + /** + * A `Sink` that counts all incoming elements until upstream terminates. + * + * Since upstream may be unbounded, consider using `Flow[T].take` or the stricter `Flow[T].limit` + * (and their variants) to ensure boundedness. The sink materializes into a `Future` of `Long` + * containing the total count of elements that passed through. + * + * '''Completes when''' upstream completes + * + * '''Backpressures when''' never (counting is a lightweight operation) + * + * '''Cancels when''' never + * + * @return a `Sink` that materializes to a `Future[Long]` with the element count + * @since 2.0.0 + * + * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]] + */ + def count[T]: Sink[T, Future[Long]] = Sink.fromGraph(CountSink) + /** * A `Sink` that keeps on collecting incoming elements until upstream terminates. * As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)