parent
391b71a3d0
commit
a579679445
9 changed files with 282 additions and 1 deletions
29
docs/src/main/paradox/stream/operators/Sink/source.md
Normal file
29
docs/src/main/paradox/stream/operators/Sink/source.md
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
# Sink.source
|
||||||
|
|
||||||
|
A `Sink` that materializes this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber.
|
||||||
|
|
||||||
|
@ref[Sink operators](../index.md#sink-operators)
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@apidoc[Sink.source](Sink$) { java="#source()" }
|
||||||
|
@apidoc[Sink.source](Sink$) { scala="#source()" }
|
||||||
|
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
A `Sink` that materialize this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber.
|
||||||
|
|
||||||
|
Use `BroadcastHub.sink` if you need a `Source` that allows multiple subscribers.
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**cancels** When the materialized `Source` is cancelled or timeout with subscription.
|
||||||
|
|
||||||
|
**backpressures** When the materialized `Source` backpressures or not ready to receive elements.
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -81,6 +81,7 @@ These built-in sinks are available from @scala[`org.apache.pekko.stream.scaladsl
|
||||||
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
|
|Sink|<a name="queue"></a>@ref[queue](Sink/queue.md)|Materialize a `SinkQueue` that can be pulled to trigger demand through the sink.|
|
||||||
|Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
|
|Sink|<a name="reduce"></a>@ref[reduce](Sink/reduce.md)|Apply a reduction function on the incoming elements and pass the result to the next invocation.|
|
||||||
|Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
|
|Sink|<a name="seq"></a>@ref[seq](Sink/seq.md)|Collect values emitted from the stream into a collection.|
|
||||||
|
|Sink|<a name="source"></a>@ref[source](Sink/source.md)|A `Sink` that materializes this `Sink` itself as a `Source`, the returning `Source` can only have one subscriber.|
|
||||||
|Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.|
|
|Sink|<a name="takelast"></a>@ref[takeLast](Sink/takeLast.md)|Collect the last `n` values emitted from the stream into a collection.|
|
||||||
|
|
||||||
## Additional Sink and Source converters
|
## Additional Sink and Source converters
|
||||||
|
|
@ -575,6 +576,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
||||||
* [single](Source/single.md)
|
* [single](Source/single.md)
|
||||||
* [sink](PubSub/sink.md)
|
* [sink](PubSub/sink.md)
|
||||||
* [sliding](Source-or-Flow/sliding.md)
|
* [sliding](Source-or-Flow/sliding.md)
|
||||||
|
* [source](Sink/source.md)
|
||||||
* [source](PubSub/source.md)
|
* [source](PubSub/source.md)
|
||||||
* [splitAfter](Source-or-Flow/splitAfter.md)
|
* [splitAfter](Source-or-Flow/splitAfter.md)
|
||||||
* [splitWhen](Source-or-Flow/splitWhen.md)
|
* [splitWhen](Source-or-Flow/splitWhen.md)
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ import java.util.concurrent.{ Flow => JavaFlow }
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.NotUsed
|
import pekko.NotUsed
|
||||||
import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source }
|
import pekko.stream.scaladsl.{ JavaFlowSupport, Sink, Source }
|
||||||
|
|
||||||
import org.reactivestreams._
|
import org.reactivestreams._
|
||||||
|
|
||||||
class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerification[Int] {
|
class IterablePublisherViaJavaFlowPublisherTest extends PekkoPublisherVerification[Int] {
|
||||||
|
|
|
||||||
|
|
@ -269,4 +269,15 @@ public class SinkTest extends StreamTest {
|
||||||
CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), system);
|
CompletionStage<Long> cs = Source.range(1, 10).runWith(Sink.count(), system);
|
||||||
Assert.assertEquals(10, cs.toCompletableFuture().join().longValue());
|
Assert.assertEquals(10, cs.toCompletableFuture().join().longValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToUseSinkAsSource() throws Exception {
|
||||||
|
final List<Integer> r =
|
||||||
|
Source.range(1, 10)
|
||||||
|
.runWith(Sink.source(), system)
|
||||||
|
.runWith(Sink.seq(), system)
|
||||||
|
.toCompletableFuture()
|
||||||
|
.get(1, TimeUnit.SECONDS);
|
||||||
|
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), r);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.stream.scaladsl
|
||||||
|
|
||||||
|
import org.apache.pekko
|
||||||
|
import pekko.stream.{ Attributes, StreamSubscriptionTimeoutTerminationMode }
|
||||||
|
import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
|
||||||
|
import pekko.stream.testkit.StreamSpec
|
||||||
|
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
|
||||||
|
|
||||||
|
class SourceSinkSpec extends StreamSpec("""
|
||||||
|
pekko.stream.materializer.initial-input-buffer-size = 2
|
||||||
|
""") {
|
||||||
|
|
||||||
|
"Sink.toSeq" must {
|
||||||
|
"Can be used as a Source with run twice" in {
|
||||||
|
val s = Source(1 to 6).runWith(Sink.source)
|
||||||
|
s.runWith(Sink.seq).futureValue should be(1 to 6)
|
||||||
|
}
|
||||||
|
|
||||||
|
"Can complete when upstream completes without elements" in {
|
||||||
|
val s = Source.empty.runWith(Sink.source)
|
||||||
|
s.runWith(Sink.seq).futureValue should be(Nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
"Can cancel when down stream cancel" in {
|
||||||
|
val (pub, source) = TestSource.probe[Int]
|
||||||
|
.toMat(Sink.source)(Keep.both)
|
||||||
|
.run()
|
||||||
|
val sub = source.runWith(TestSink.probe[Int])
|
||||||
|
pub.ensureSubscription()
|
||||||
|
sub.ensureSubscription()
|
||||||
|
sub.cancel()
|
||||||
|
pub.expectCancellation()
|
||||||
|
}
|
||||||
|
|
||||||
|
"Can timeout when no subscription" in {
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
val (pub, source) = TestSource.probe[Int]
|
||||||
|
.toMat(Sink.source)(Keep.both)
|
||||||
|
.addAttributes(Attributes(
|
||||||
|
StreamSubscriptionTimeout(
|
||||||
|
2.seconds,
|
||||||
|
StreamSubscriptionTimeoutTerminationMode.cancel
|
||||||
|
)
|
||||||
|
))
|
||||||
|
.run()
|
||||||
|
pub.expectCancellation()
|
||||||
|
Thread.sleep(1000) // wait a bit
|
||||||
|
val sub = source.runWith(TestSink.probe)
|
||||||
|
sub.expectSubscription()
|
||||||
|
sub.expectError()
|
||||||
|
}
|
||||||
|
|
||||||
|
"Can backpressure" in {
|
||||||
|
Source.iterate(1)(_ => true, _ + 1)
|
||||||
|
.runWith(Sink.source).runWith(TestSink.probe[Int])
|
||||||
|
.request(3)
|
||||||
|
.expectNext(1, 2, 3)
|
||||||
|
.request(2)
|
||||||
|
.expectNext(4, 5)
|
||||||
|
.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
"Can use with mapMaterializedValue" in {
|
||||||
|
val sink = Sink.source[Int].mapMaterializedValue(_.runWith(Sink.seq))
|
||||||
|
Source(1 to 5)
|
||||||
|
.runWith(sink)
|
||||||
|
.futureValue should be(1 to 5)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -156,6 +156,7 @@ import pekko.stream.Attributes._
|
||||||
val seqSink = name("seqSink")
|
val seqSink = name("seqSink")
|
||||||
val countSink = name("countSink")
|
val countSink = name("countSink")
|
||||||
val publisherSink = name("publisherSink")
|
val publisherSink = name("publisherSink")
|
||||||
|
val sourceSink = name("sourceSink")
|
||||||
val fanoutPublisherSink = name("fanoutPublisherSink")
|
val fanoutPublisherSink = name("fanoutPublisherSink")
|
||||||
val ignoreSink = name("ignoreSink")
|
val ignoreSink = name("ignoreSink")
|
||||||
val neverSink = name("neverSink")
|
val neverSink = name("neverSink")
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,121 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.stream.impl.fusing
|
||||||
|
|
||||||
|
import org.apache.pekko
|
||||||
|
import org.apache.pekko.stream.impl.Stages.DefaultAttributes
|
||||||
|
import pekko.NotUsed
|
||||||
|
import pekko.annotation.InternalApi
|
||||||
|
import pekko.stream.{ ActorAttributes, Attributes, Inlet, SinkShape, StreamSubscriptionTimeoutTerminationMode }
|
||||||
|
import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
|
||||||
|
import pekko.stream.scaladsl.Source
|
||||||
|
import pekko.stream.stage.{
|
||||||
|
GraphStageLogic,
|
||||||
|
GraphStageWithMaterializedValue,
|
||||||
|
InHandler,
|
||||||
|
OutHandler,
|
||||||
|
TimerGraphStageLogic
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi private[pekko] object SourceSink
|
||||||
|
extends GraphStageWithMaterializedValue[SinkShape[Any], Source[Any, NotUsed]] {
|
||||||
|
private val SubscriptionTimerKey = "SubstreamSubscriptionTimerKey"
|
||||||
|
private val in = Inlet[Any]("sourceSink.in")
|
||||||
|
override val shape = SinkShape(in)
|
||||||
|
|
||||||
|
override def toString: String = "SourceSink"
|
||||||
|
override protected def initialAttributes: Attributes = DefaultAttributes.sourceSink
|
||||||
|
|
||||||
|
override def createLogicAndMaterializedValue(
|
||||||
|
inheritedAttributes: Attributes): (GraphStageLogic, Source[Any, NotUsed]) = {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* NOTE: in the current implementation of Pekko Stream,
|
||||||
|
* We have to materialization twice to do the piping, which means, even we can treat the Sink as a Source.
|
||||||
|
*
|
||||||
|
* In an idea word this stage should be purged out by the materializer optimization,
|
||||||
|
* and we can directly connect the upstream to the downstream.
|
||||||
|
*/
|
||||||
|
object logic extends TimerGraphStageLogic(shape) with InHandler with OutHandler { self =>
|
||||||
|
val sinkSource = new SubSourceOutlet[Any]("sinkSource")
|
||||||
|
|
||||||
|
private def subHandler(): OutHandler = new OutHandler {
|
||||||
|
override def onPull(): Unit = {
|
||||||
|
setKeepGoing(false)
|
||||||
|
cancelTimer(SubscriptionTimerKey)
|
||||||
|
pull(in)
|
||||||
|
sinkSource.setHandler(self)
|
||||||
|
}
|
||||||
|
override def onDownstreamFinish(cause: Throwable): Unit = self.onDownstreamFinish(cause)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def preStart(): Unit = {
|
||||||
|
sinkSource.setHandler(subHandler())
|
||||||
|
setKeepGoing(true)
|
||||||
|
val timeout = inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
|
||||||
|
scheduleOnce(SubscriptionTimerKey, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
override protected def onTimer(timerKey: Any): Unit = {
|
||||||
|
val materializer = interpreter.materializer
|
||||||
|
val StreamSubscriptionTimeout(timeout, mode) =
|
||||||
|
inheritedAttributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout]
|
||||||
|
|
||||||
|
mode match {
|
||||||
|
case StreamSubscriptionTimeoutTerminationMode.CancelTermination =>
|
||||||
|
sinkSource.timeout(timeout)
|
||||||
|
if (sinkSource.isClosed)
|
||||||
|
completeStage()
|
||||||
|
case StreamSubscriptionTimeoutTerminationMode.NoopTermination =>
|
||||||
|
// do nothing
|
||||||
|
case StreamSubscriptionTimeoutTerminationMode.WarnTermination =>
|
||||||
|
materializer.logger.warning(
|
||||||
|
"Substream subscription timeout triggered after {} in SourceSink.",
|
||||||
|
timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onPush(): Unit = sinkSource.push(grab(in))
|
||||||
|
override def onPull(): Unit = pull(in)
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
if (!sinkSource.isClosed) {
|
||||||
|
sinkSource.complete()
|
||||||
|
}
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFailure(ex: Throwable): Unit = if (!sinkSource.isClosed) {
|
||||||
|
sinkSource.fail(ex)
|
||||||
|
completeStage()
|
||||||
|
} else failStage(ex)
|
||||||
|
|
||||||
|
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||||
|
// cancel upstream only if the substream was cancelled
|
||||||
|
if (!isClosed(in)) cancelStage(cause)
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(in, this)
|
||||||
|
}
|
||||||
|
|
||||||
|
(logic, Source.fromGraph(logic.sinkSource.source))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -212,6 +212,20 @@ object Sink {
|
||||||
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
|
def asPublisher[T](fanout: AsPublisher): Sink[T, Publisher[T]] =
|
||||||
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
|
new Sink(scaladsl.Sink.asPublisher(fanout == AsPublisher.WITH_FANOUT))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes this `Sink` itself as a `Source`.
|
||||||
|
* The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
|
||||||
|
*
|
||||||
|
* Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers.
|
||||||
|
*
|
||||||
|
* Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
|
||||||
|
* between them; performance may be improved in the future.
|
||||||
|
*
|
||||||
|
* @since 2.0.0
|
||||||
|
*/
|
||||||
|
def source[T](): Sink[T, Source[T, NotUsed]] = new Sink(scaladsl.Sink.source[T])
|
||||||
|
.mapMaterializedValue(src => src.asJava)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
|
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
|
||||||
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the
|
* into a [[java.util.concurrent.CompletionStage]] which will be completed with `Success` when reaching the
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import pekko.annotation.InternalApi
|
||||||
import pekko.stream._
|
import pekko.stream._
|
||||||
import pekko.stream.impl._
|
import pekko.stream.impl._
|
||||||
import pekko.stream.impl.Stages.DefaultAttributes
|
import pekko.stream.impl.Stages.DefaultAttributes
|
||||||
import pekko.stream.impl.fusing.{ CountSink, GraphStages }
|
import pekko.stream.impl.fusing.{ CountSink, GraphStages, SourceSink }
|
||||||
import pekko.stream.stage._
|
import pekko.stream.stage._
|
||||||
|
|
||||||
import org.reactivestreams.{ Publisher, Subscriber }
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
|
|
@ -312,6 +312,21 @@ object Sink {
|
||||||
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
|
if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink"))
|
||||||
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
|
else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes this `Sink` itself as a `Source`.
|
||||||
|
* The returned `Source` is a "live view" onto the `Sink` and only supports a single `Subscriber`.
|
||||||
|
*
|
||||||
|
* Use [[BroadcastHub#sink]] if you need a `Source` that allows multiple subscribers.
|
||||||
|
*
|
||||||
|
* Note: even if the `Source` is directly connected to the `Sink`, there is still an asynchronous boundary
|
||||||
|
* between them; performance may be improved in the future.
|
||||||
|
*
|
||||||
|
* @since 2.0.0
|
||||||
|
*/
|
||||||
|
def source[T]: Sink[T, Source[T, NotUsed]] = _sourceSink.asInstanceOf[Sink[T, Source[T, NotUsed]]]
|
||||||
|
|
||||||
|
private[this] val _sourceSink = fromGraph(SourceSink)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Sink` that will consume the stream and discard the elements.
|
* A `Sink` that will consume the stream and discard the elements.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue