feat: Add Sink#count operator. (#2244)
* feat: Add Sink#count operator. * Use .asInstanceOf for better performance --------- Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com>
This commit is contained in:
parent
57812486e1
commit
a1ade992ff
9 changed files with 180 additions and 1 deletions
39
docs/src/main/paradox/stream/operators/Sink/count.md
Normal file
39
docs/src/main/paradox/stream/operators/Sink/count.md
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
# Sink.count
|
||||
|
||||
Counts all incoming elements until upstream terminates.
|
||||
|
||||
@ref[Sink operators](../index.md#sink-operators)
|
||||
|
||||
## Signature
|
||||
|
||||
@apidoc[Sink.count](Sink$) { scala="#count[T]:org.apache.pekko.stream.scaladsl.Sink[T,scala.concurrent.Future[Long]]" java="#count()" }
|
||||
|
||||
|
||||
## Description
|
||||
|
||||
Counts values emitted from the stream, the count is available through a @scala[`Future`] @java[`CompletionStage`] or
|
||||
which completes when the stream completes.
|
||||
|
||||
## Example
|
||||
|
||||
Given a stream of numbers we can count the numbers with the `count` operator
|
||||
|
||||
Scala
|
||||
: @@snip [SinkSpec.scala](/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala) { #count-operator-example }
|
||||
|
||||
Java
|
||||
: @@snip [SinkDocExamples.java](/docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #count-operator-example }
|
||||
|
||||
## Reactive Streams semantics
|
||||
|
||||
@@@div { .callout }
|
||||
|
||||
**completes** when upstream completes
|
||||
|
||||
**backpressures** never (counting is a lightweight operation)
|
||||
|
||||
**cancels** never
|
||||
|
||||
@@@
|
||||
|
||||
|
||||
|
|
@ -56,6 +56,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|
|||
|Sink|<a name="collection"></a>@ref[collection](Sink/collection.md)|@scala[Collect all values emitted from the stream into a collection.]@java[Operator only available in the Scala API. The closest operator in the Java API is @ref[`Sink.seq`](Sink/seq.md)].|
|
||||
|Sink|<a name="combine"></a>@ref[combine](Sink/combine.md)|Combine several sinks into one using a user specified strategy|
|
||||
|Sink|<a name="completionstagesink"></a>@ref[completionStageSink](Sink/completionStageSink.md)|Streams the elements to the given future sink once it successfully completes. |
|
||||
|Sink|<a name="count"></a>@ref[count](Sink/count.md)|Counts all incoming elements until upstream terminates.|
|
||||
|Sink|<a name="exists"></a>@ref[exists](Sink/exists.md)|A `Sink` that will test the given predicate `p` for every received element and completes with the result.|
|
||||
|Sink|<a name="fold"></a>@ref[fold](Sink/fold.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|
||||
|Sink|<a name="foldwhile"></a>@ref[foldWhile](Sink/foldWhile.md)|Fold over emitted elements with a function, where each invocation will get the new element and the result from the previous fold invocation.|
|
||||
|
|
@ -433,6 +434,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
|||
* [conflate](Source-or-Flow/conflate.md)
|
||||
* [conflateWithSeed](Source-or-Flow/conflateWithSeed.md)
|
||||
* [contramap](Flow/contramap.md)
|
||||
* [count](Sink/count.md)
|
||||
* [cycle](Source/cycle.md)
|
||||
* [deflate](Compression/deflate.md)
|
||||
* [delay](Source-or-Flow/delay.md)
|
||||
|
|
|
|||
|
|
@ -52,6 +52,15 @@ public class SinkDocExamples {
|
|||
// #seq-operator-example
|
||||
}
|
||||
|
||||
static void countExample() {
|
||||
// #count-operator-example
|
||||
Source<Integer, NotUsed> ints = Source.range(1, 10);
|
||||
CompletionStage<Long> count = ints.runWith(Sink.count(), system);
|
||||
count.thenAccept(System.out::println);
|
||||
// 10
|
||||
// #count-operator-example
|
||||
}
|
||||
|
||||
static void takeLastExample() {
|
||||
// #takeLast-operator-example
|
||||
// pair of (Name, GPA)
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.apache.pekko.stream.testkit.javadsl.TestSink;
|
|||
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
|
||||
import org.apache.pekko.testkit.PekkoSpec;
|
||||
import org.apache.pekko.testkit.javadsl.TestKit;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
|
@ -262,4 +263,10 @@ public class SinkTest extends StreamTest {
|
|||
boolean anyMatch = cs.toCompletableFuture().get(100, TimeUnit.MILLISECONDS);
|
||||
assertTrue(anyMatch);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sinkMustBeAbleToUseCount() {
|
||||
CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), system);
|
||||
Assert.assertEquals(10, cs.toCompletableFuture().join().longValue());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -324,6 +324,20 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
|
|||
}
|
||||
}
|
||||
|
||||
"The count sink" must {
|
||||
"count the number of elements in the stream" in {
|
||||
// #count-operator-example
|
||||
val source = Source(1 to 10)
|
||||
val result = source.runWith(Sink.count)
|
||||
val count = result.futureValue
|
||||
println(count)
|
||||
// will print
|
||||
// 10
|
||||
// #count-operator-example
|
||||
assert(result.futureValue == 10)
|
||||
}
|
||||
}
|
||||
|
||||
"The foreach sink" must {
|
||||
"illustrate println" in {
|
||||
// #foreach
|
||||
|
|
|
|||
|
|
@ -154,6 +154,7 @@ import pekko.stream.Attributes._
|
|||
val lastOptionSink = name("lastOptionSink")
|
||||
val takeLastSink = name("takeLastSink")
|
||||
val seqSink = name("seqSink")
|
||||
val countSink = name("countSink")
|
||||
val publisherSink = name("publisherSink")
|
||||
val fanoutPublisherSink = name("fanoutPublisherSink")
|
||||
val ignoreSink = name("ignoreSink")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.stream.impl.fusing
|
||||
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.annotation.InternalApi
|
||||
import pekko.stream.{ AbruptStageTerminationException, Attributes, Inlet, SinkShape }
|
||||
import pekko.stream.impl.Stages.DefaultAttributes
|
||||
import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[pekko] object CountSink
|
||||
extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Long]] {
|
||||
private val in = Inlet[Any]("seq.in")
|
||||
override def shape: SinkShape[Any] = SinkShape.of(in)
|
||||
override def toString: String = "CountSink"
|
||||
override protected def initialAttributes: Attributes = DefaultAttributes.countSink
|
||||
|
||||
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Long]) = {
|
||||
val promise = Promise[Long]()
|
||||
object logic extends GraphStageLogic(shape) with InHandler {
|
||||
private var counter: Long = 0L
|
||||
override def preStart(): Unit = pull(in)
|
||||
override def onPush(): Unit = {
|
||||
counter += 1
|
||||
pull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
promise.trySuccess(counter)
|
||||
completeStage()
|
||||
}
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
promise.tryFailure(ex)
|
||||
failStage(ex)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
if (!promise.isCompleted) promise.failure(new AbruptStageTerminationException(this))
|
||||
}
|
||||
|
||||
setHandler(in, this)
|
||||
}
|
||||
|
||||
(logic, promise.future)
|
||||
}
|
||||
}
|
||||
|
|
@ -313,6 +313,27 @@ object Sink {
|
|||
scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava))
|
||||
}
|
||||
|
||||
/**
|
||||
* A `Sink` that counts all incoming elements until upstream terminates.
|
||||
*
|
||||
* Since upstream may be unbounded, consider using `Flow[T].take` or the stricter `Flow[T].limit`
|
||||
* (and their variants) to ensure boundedness. The sink materializes into a `CompletionStage` of `Long`
|
||||
* containing the total count of elements that passed through.
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Backpressures when''' never (counting is a lightweight operation)
|
||||
*
|
||||
* '''Cancels when''' never
|
||||
*
|
||||
* @return a `Sink` that materializes to a `CompletionStage[Long]` with the element count
|
||||
* @since 2.0.0
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def count[In]: Sink[In, CompletionStage[java.lang.Long]] = new Sink(
|
||||
scaladsl.Sink.count[In].mapMaterializedValue(_.asJava.asInstanceOf[CompletionStage[java.lang.Long]]))
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import pekko.annotation.InternalApi
|
|||
import pekko.stream._
|
||||
import pekko.stream.impl._
|
||||
import pekko.stream.impl.Stages.DefaultAttributes
|
||||
import pekko.stream.impl.fusing.GraphStages
|
||||
import pekko.stream.impl.fusing.{ CountSink, GraphStages }
|
||||
import pekko.stream.stage._
|
||||
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
|
@ -261,6 +261,26 @@ object Sink {
|
|||
*/
|
||||
def seq[T]: Sink[T, Future[immutable.Seq[T]]] = Sink.fromGraph(new SeqStage[T, Vector[T]])
|
||||
|
||||
/**
|
||||
* A `Sink` that counts all incoming elements until upstream terminates.
|
||||
*
|
||||
* Since upstream may be unbounded, consider using `Flow[T].take` or the stricter `Flow[T].limit`
|
||||
* (and their variants) to ensure boundedness. The sink materializes into a `Future` of `Long`
|
||||
* containing the total count of elements that passed through.
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Backpressures when''' never (counting is a lightweight operation)
|
||||
*
|
||||
* '''Cancels when''' never
|
||||
*
|
||||
* @return a `Sink` that materializes to a `Future[Long]` with the element count
|
||||
* @since 2.0.0
|
||||
*
|
||||
* See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
|
||||
*/
|
||||
def count[T]: Sink[T, Future[Long]] = Sink.fromGraph(CountSink)
|
||||
|
||||
/**
|
||||
* A `Sink` that keeps on collecting incoming elements until upstream terminates.
|
||||
* As upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit` (and their variants)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue