From a579679445e9d47eafff7cfde62a0c2bc7268ee2 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Tue, 23 Sep 2025 17:10:26 +0800 Subject: [PATCH] feat: Add Sink#source (#2250) * feat: Add Sink#source --- .../paradox/stream/operators/Sink/source.md | 29 +++++ .../main/paradox/stream/operators/index.md | 2 + ...blePublisherViaJavaFlowPublisherTest.scala | 1 + .../apache/pekko/stream/javadsl/SinkTest.java | 11 ++ .../stream/scaladsl/SourceSinkSpec.scala | 87 +++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../pekko/stream/impl/fusing/SourceSink.scala | 121 ++++++++++++++++++ .../apache/pekko/stream/javadsl/Sink.scala | 14 ++ .../apache/pekko/stream/scaladsl/Sink.scala | 17 ++- 9 files changed, 282 insertions(+), 1 deletion(-) create mode 100644 docs/src/main/paradox/stream/operators/Sink/source.md create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala diff --git a/docs/src/main/paradox/stream/operators/Sink/source.md b/docs/src/main/paradox/stream/operators/Sink/source.md new file mode 100644 index 0000000000..6dc6f751e9 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Sink/source.md @@ -0,0 +1,29 @@ +# Sink.source + +A `Sink` that materializes this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber. + +@ref[Sink operators](../index.md#sink-operators) + +## Signature + +@apidoc[Sink.source](Sink$) { java="#source()" } +@apidoc[Sink.source](Sink$) { scala="#source()" } + + +## Description + +A `Sink` that materializes this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber. + +Use `BroadcastHub.sink` if you need a `Source` that allows multiple subscribers. + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** When the materialized `Source` is cancelled or its subscription times out. 
+ +**backpressures** When the materialized `Source` backpressures or is not ready to receive elements. + +@@@ + + diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index aee2d94e1e..da367ebad1 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -81,6 +81,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl |Sink|@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.| |Sink|@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.| |Sink|@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.| +|Sink|@ref[source](Sink/source.md)|A `Sink` that materializes this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber.| |Sink|@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.| ## Additional Sink and Source converters @@ -575,6 +576,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [single](Source/single.md) * [sink](PubSub/sink.md) * [sliding](Source-or-Flow/sliding.md) +* [source](Sink/source.md) * [source](PubSub/source.md) * [splitAfter](Source-or-Flow/splitAfter.md) * [splitWhen](Source-or-Flow/splitWhen.md) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala index 44706c2456..b81ca2c16b 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala @@ -18,6 +18,7 @@ import java.util.concurrent.{ Flow => 
JavaFlow } import org.apache.pekko import pekko.NotUsed import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source } + import org.reactivestreams._ class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerification[Int] { diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java index 882cf0ccbf..c912e734aa 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SinkTest.java @@ -269,4 +269,15 @@ public class SinkTest extends StreamTest { CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), system); Assert.assertEquals(10, cs.toCompletableFuture().join().longValue()); } + + @Test + public void mustBeAbleToUseSinkAsSource() throws Exception { + final List<Integer> r = + Source.range(1, 10) + .runWith(Sink.source(), system) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(1, TimeUnit.SECONDS); + assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), r); + } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala new file mode 100644 index 0000000000..73f728bbdb --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceSinkSpec.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode } +import pekko.stream.ActorAttributes.StreamSubscriptionTimeout +import pekko.stream.testkit.StreamSpec +import pekko.stream.testkit.scaladsl.{ TestSink, TestSource } + +class SourceSinkSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) { + + "Sink.source" must { + "Can be used as a Source with run twice" in { + val s = Source(1 to 6).runWith(Sink.source) + s.runWith(Sink.seq).futureValue should be(1 to 6) + } + + "Can complete when upstream completes without elements" in { + val s = Source.empty.runWith(Sink.source) + s.runWith(Sink.seq).futureValue should be(Nil) + } + + "Can cancel when down stream cancel" in { + val (pub, source) = TestSource.probe[Int] + .toMat(Sink.source)(Keep.both) + .run() + val sub = source.runWith(TestSink.probe[Int]) + pub.ensureSubscription() + sub.ensureSubscription() + sub.cancel() + pub.expectCancellation() + } + + "Can timeout when no subscription" in { + import scala.concurrent.duration._ + val (pub, source) = TestSource.probe[Int] + .toMat(Sink.source)(Keep.both) + .addAttributes(Attributes( + StreamSubscriptionTimeout( + 2.seconds, + StreamSubscriptionTimeoutTerminationMode.cancel + ) + )) + .run() + pub.expectCancellation() + Thread.sleep(1000) // wait a bit + val sub = source.runWith(TestSink.probe) + sub.expectSubscription() + sub.expectError() + } + + "Can backpressure" in { + Source.iterate(1)(_ => true, _ + 1) + 
.runWith(Sink.source).runWith(TestSink.probe[Int]) + .request(3) + .expectNext(1, 2, 3) + .request(2) + .expectNext(4, 5) + .cancel() + } + + "Can use with mapMaterializedValue" in { + val sink = Sink.source[Int].mapMaterializedValue(_.runWith(Sink.seq)) + Source(1 to 5) + .runWith(sink) + .futureValue should be(1 to 5) + } + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 9c7c1d56e1..58ea8b5bd5 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -156,6 +156,7 @@ import pekko.stream.Attributes._ val seqSink = name("seqSink") val countSink = name("countSink") val publisherSink = name("publisherSink") + val sourceSink = name("sourceSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") val neverSink = name("neverSink") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala new file mode 100644 index 0000000000..db58cc93e6 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/SourceSink.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import org.apache.pekko.stream.impl.Stages.DefaultAttributes +import pekko.NotUsed +import pekko.annotation.InternalApi +import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode } +import pekko.stream.ActorAttributes.StreamSubscriptionTimeout +import pekko.stream.scaladsl.Source +import pekko.stream.stage.{ + GraphStageLogic, + GraphStageWithMaterializedValue, + InHandler, + OutHandler, + TimerGraphStageLogic +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object SourceSink + extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, NotUsed]] { + private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey" + private val in = Inlet[Any]("sourceSink.in") + override val shape = SinkShape(in) + + override def toString: String = "SourceSink" + override protected def initialAttributes: Attributes = DefaultAttributes.sourceSink + + override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, NotUsed]) = { + + /** + * NOTE: in the current implementation of Pekko Stream, + * we have to materialize twice to do the piping, even though we can treat the Sink as a Source. + * + * In an ideal world this stage would be purged out by a materializer optimization, + * and we could directly connect the upstream to the downstream. 
+ */ + object logic extends TimerGraphStageLogic(shape) with InHandler with OutHandler { self => + val sinkSource = new SubSourceOutlet[Any]("sinkSource") + + private def subHandler(): OutHandler = new OutHandler { + override def onPull(): Unit = { + setKeepGoing(false) + cancelTimer(SubscriptionTimerKey) + pull(in) + sinkSource.setHandler(self) + } + override def onDownstreamFinish(cause: Throwable): Unit = self.onDownstreamFinish(cause) + } + + override def preStart(): Unit = { + sinkSource.setHandler(subHandler()) + setKeepGoing(true) + val timeout = inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout + scheduleOnce(SubscriptionTimerKey, timeout) + } + + override protected def onTimer(timerKey: Any): Unit = { + val materializer = interpreter.materializer + val StreamSubscriptionTimeout(timeout, mode) = + inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout] + + mode match { + case StreamSubscriptionTimeoutTerminationMode.CancelTermination => + sinkSource.timeout(timeout) + if (sinkSource.isClosed) + completeStage() + case StreamSubscriptionTimeoutTerminationMode.NoopTermination => + // do nothing + case StreamSubscriptionTimeoutTerminationMode.WarnTermination => + materializer.logger.warning( + "Substream subscription timeout triggered after {} in SourceSink.", + timeout) + } + } + + override def onPush(): Unit = sinkSource.push(grab(in)) + override def onPull(): Unit = pull(in) + + override def onUpstreamFinish(): Unit = { + if (!sinkSource.isClosed) { + sinkSource.complete() + } + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = if (!sinkSource.isClosed) { + sinkSource.fail(ex) + completeStage() + } else failStage(ex) + + override def onDownstreamFinish(cause: Throwable): Unit = { + // cancel upstream only if the substream was cancelled + if (!isClosed(in)) cancelStage(cause) + } + + setHandler(in, this) + } + + (logic, Source.fromGraph(logic.sinkSource.source)) + } 
+} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala index 7894124d50..8cdfc87b13 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala @@ -212,6 +212,20 @@ object Sink { def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] = new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT)) + /** + * A `Sink` that materializes this `Sink` itself as a `Source`. + * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. + * + * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers. + * + * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary + * between them; performance may be improved in the future. + * + * @since 2.0.0 + */ + def source[T](): Sink[T, Source[T, NotUsed]] = new Sink(scaladsl.Sink.source[T]) + .mapMaterializedValue(src => src.asJava) + /** + * A `Sink` that will invoke the given procedure for each received element. 
The sink is materialized * into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index f5dc532eed..96d7fe622b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -27,7 +27,7 @@ import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes -import pekko.stream.impl.fusing.{ CountSink, GraphStages } +import pekko.stream.impl.fusing.{ CountSink, GraphStages, SourceSink } import pekko.stream.stage._ import org.reactivestreams.{ Publisher, Subscriber } @@ -312,6 +312,21 @@ object Sink { if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + /** + * A `Sink` that materializes this `Sink` itself as a `Source`. + * The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`. + * + * Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers. + * + * Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary + * between them; performance may be improved in the future. + * + * @since 2.0.0 + */ + def source[T]: Sink[T, Source[T, NotUsed]] = _sourceSink.asInstanceOf[Sink[T, Source[T, NotUsed]]] + + private[this] val _sourceSink = fromGraph(SourceSink) /* one shared `Any`-typed instance; `source[T]` casts it */ + /** + * A `Sink` that will consume the stream and discard the elements. */