=str #18821 fix Sink.queue termination
This commit is contained in:
parent
0fb6654f4f
commit
64387583ad
8 changed files with 141 additions and 161 deletions
|
|
@ -4,16 +4,16 @@
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
import akka.actor.Status
|
import akka.actor.Status
|
||||||
import akka.pattern.{ AskTimeoutException, pipe }
|
import akka.pattern.pipe
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.ActorMaterializer
|
||||||
import akka.stream.testkit.Utils._
|
import akka.stream.testkit.Utils._
|
||||||
import akka.stream.testkit.{ AkkaSpec, _ }
|
import akka.stream.testkit.{ AkkaSpec, _ }
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.{ Await, Future }
|
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
class AcknowledgeSinkSpec extends AkkaSpec {
|
class QueueSinkSpec extends AkkaSpec {
|
||||||
implicit val ec = system.dispatcher
|
implicit val ec = system.dispatcher
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
|
@ -21,11 +21,11 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
|
|
||||||
val noMsgTimeout = 300.millis
|
val noMsgTimeout = 300.millis
|
||||||
|
|
||||||
"An AcknowledgeSink" must {
|
"An QueueSinkSpec" must {
|
||||||
|
|
||||||
"send the elements as result of future" in assertAllStagesStopped {
|
"send the elements as result of future" in assertAllStagesStopped {
|
||||||
val expected = List(Some(1), Some(2), Some(3), None)
|
val expected = List(Some(1), Some(2), Some(3), None)
|
||||||
val queue = Source(expected.flatten).runWith(Sink.queue(3))
|
val queue = Source(expected.flatten).runWith(Sink.queue())
|
||||||
expected foreach { v ⇒
|
expected foreach { v ⇒
|
||||||
queue.pull() pipeTo testActor
|
queue.pull() pipeTo testActor
|
||||||
expectMsg(v)
|
expectMsg(v)
|
||||||
|
|
@ -34,7 +34,7 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
|
|
||||||
"allow to have only one future waiting for result in each point of time" in assertAllStagesStopped {
|
"allow to have only one future waiting for result in each point of time" in assertAllStagesStopped {
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
val probe = TestPublisher.manualProbe[Int]()
|
||||||
val queue = Source(probe).runWith(Sink.queue(3))
|
val queue = Source(probe).runWith(Sink.queue())
|
||||||
val sub = probe.expectSubscription()
|
val sub = probe.expectSubscription()
|
||||||
val future = queue.pull()
|
val future = queue.pull()
|
||||||
val future2 = queue.pull()
|
val future2 = queue.pull()
|
||||||
|
|
@ -45,11 +45,12 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
expectMsg(Some(1))
|
expectMsg(Some(1))
|
||||||
|
|
||||||
sub.sendComplete()
|
sub.sendComplete()
|
||||||
|
queue.pull()
|
||||||
}
|
}
|
||||||
|
|
||||||
"wait for next element from upstream" in assertAllStagesStopped {
|
"wait for next element from upstream" in assertAllStagesStopped {
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
val probe = TestPublisher.manualProbe[Int]()
|
||||||
val queue = Source(probe).runWith(Sink.queue(3))
|
val queue = Source(probe).runWith(Sink.queue())
|
||||||
val sub = probe.expectSubscription()
|
val sub = probe.expectSubscription()
|
||||||
|
|
||||||
queue.pull().pipeTo(testActor)
|
queue.pull().pipeTo(testActor)
|
||||||
|
|
@ -58,11 +59,12 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
sub.sendNext(1)
|
sub.sendNext(1)
|
||||||
expectMsg(Some(1))
|
expectMsg(Some(1))
|
||||||
sub.sendComplete()
|
sub.sendComplete()
|
||||||
|
queue.pull()
|
||||||
}
|
}
|
||||||
|
|
||||||
"fail future on stream failure" in assertAllStagesStopped {
|
"fail future on stream failure" in assertAllStagesStopped {
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
val probe = TestPublisher.manualProbe[Int]()
|
||||||
val queue = Source(probe).runWith(Sink.queue(3))
|
val queue = Source(probe).runWith(Sink.queue())
|
||||||
val sub = probe.expectSubscription()
|
val sub = probe.expectSubscription()
|
||||||
|
|
||||||
queue.pull().pipeTo(testActor)
|
queue.pull().pipeTo(testActor)
|
||||||
|
|
@ -74,16 +76,17 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
|
|
||||||
"fail future when stream failed" in assertAllStagesStopped {
|
"fail future when stream failed" in assertAllStagesStopped {
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
val probe = TestPublisher.manualProbe[Int]()
|
||||||
val queue = Source(probe).runWith(Sink.queue(3, 100.millis))
|
|
||||||
val sub = probe.expectSubscription()
|
|
||||||
sub.sendError(ex) // potential race condition
|
|
||||||
|
|
||||||
an[AskTimeoutException] shouldBe thrownBy { Await.result(queue.pull(), 1.second) }
|
val queue = Source(probe).runWith(Sink.queue())
|
||||||
|
val sub = probe.expectSubscription()
|
||||||
|
sub.sendError(ex)
|
||||||
|
|
||||||
|
the[Exception] thrownBy { Await.result(queue.pull(), 300.millis) } should be(ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
"timeout future when stream cannot provide data" in assertAllStagesStopped {
|
"timeout future when stream cannot provide data" in assertAllStagesStopped {
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
val probe = TestPublisher.manualProbe[Int]()
|
||||||
val queue = Source(probe).runWith(Sink.queue(3))
|
val queue = Source(probe).runWith(Sink.queue())
|
||||||
val sub = probe.expectSubscription()
|
val sub = probe.expectSubscription()
|
||||||
|
|
||||||
queue.pull().pipeTo(testActor)
|
queue.pull().pipeTo(testActor)
|
||||||
|
|
@ -92,18 +95,7 @@ class AcknowledgeSinkSpec extends AkkaSpec {
|
||||||
sub.sendNext(1)
|
sub.sendNext(1)
|
||||||
expectMsg(Some(1))
|
expectMsg(Some(1))
|
||||||
sub.sendComplete()
|
sub.sendComplete()
|
||||||
}
|
queue.pull()
|
||||||
|
|
||||||
"work when buffer is 0" in assertAllStagesStopped {
|
|
||||||
val probe = TestPublisher.manualProbe[Int]()
|
|
||||||
val queue = Source(probe).runWith(Sink.queue(0))
|
|
||||||
val sub = probe.expectSubscription()
|
|
||||||
sub.sendNext(1)
|
|
||||||
|
|
||||||
queue.pull().pipeTo(testActor)
|
|
||||||
sub.sendNext(2)
|
|
||||||
expectMsg(Some(2))
|
|
||||||
sub.sendComplete()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -29,8 +29,8 @@ trait SinkQueue[T] {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method pulls elements from stream and returns future that:
|
* Method pulls elements from stream and returns future that:
|
||||||
* - fails if stream is finished
|
* - fails if stream is failed
|
||||||
* - completes with None in case if stream is completed after we got future
|
* - completes with None in case if stream is completed
|
||||||
* - completes with `Some(element)` in case next element is available from stream.
|
* - completes with `Some(element)` in case next element is available from stream.
|
||||||
*/
|
*/
|
||||||
def pull(): Future[Option[T]]
|
def pull(): Future[Option[T]]
|
||||||
|
|
|
||||||
|
|
@ -1,87 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.stream.impl
|
|
||||||
|
|
||||||
import akka.actor.{ ActorLogging, ActorRef, Props, Status }
|
|
||||||
import akka.stream.actor.ActorPublisherMessage.Request
|
|
||||||
import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage, RequestStrategy }
|
|
||||||
|
|
||||||
import scala.util.{ Try, Failure, Success }
|
|
||||||
|
|
||||||
private[akka] object AcknowledgeSubscriber {
|
|
||||||
def props(highWatermark: Int) =
|
|
||||||
Props(new AcknowledgeSubscriber(highWatermark))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
*/
|
|
||||||
private[akka] class AcknowledgeSubscriber(maxBuffer: Int) extends ActorSubscriber with ActorLogging {
|
|
||||||
import ActorSubscriberMessage._
|
|
||||||
|
|
||||||
var buffer: Vector[Any] = Vector.empty
|
|
||||||
|
|
||||||
override val requestStrategy = new RequestStrategy {
|
|
||||||
def requestDemand(remainingRequested: Int): Int = {
|
|
||||||
maxBuffer - buffer.size - remainingRequested
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var requester: Option[ActorRef] = None
|
|
||||||
|
|
||||||
def receive = {
|
|
||||||
case Request(_) ⇒
|
|
||||||
if (requester.isEmpty) {
|
|
||||||
requester = Some(sender)
|
|
||||||
trySendElementDownstream()
|
|
||||||
} else
|
|
||||||
sender ! Status.Failure(
|
|
||||||
new IllegalStateException("You have to wait for first future to be resolved to send another request"))
|
|
||||||
|
|
||||||
case OnNext(elem) ⇒
|
|
||||||
if (maxBuffer != 0) {
|
|
||||||
buffer :+= elem
|
|
||||||
trySendElementDownstream()
|
|
||||||
} else requester match {
|
|
||||||
case Some(ref) ⇒
|
|
||||||
requester = None
|
|
||||||
ref ! Some(elem)
|
|
||||||
case None ⇒ log.debug("Dropping element because there is no downstream demand: [{}]", elem)
|
|
||||||
}
|
|
||||||
|
|
||||||
case OnError(cause) ⇒
|
|
||||||
trySendDownstream(Status.Failure(cause))
|
|
||||||
context.stop(self)
|
|
||||||
|
|
||||||
case OnComplete ⇒
|
|
||||||
if (buffer.isEmpty) {
|
|
||||||
trySendDownstream(Status.Success(None))
|
|
||||||
context.stop(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def trySendElementDownstream(): Unit = {
|
|
||||||
requester match {
|
|
||||||
case Some(ref) ⇒
|
|
||||||
if (buffer.size > 0) {
|
|
||||||
ref ! Some(buffer.head)
|
|
||||||
requester = None
|
|
||||||
buffer = buffer.tail
|
|
||||||
} else if (canceled) {
|
|
||||||
ref ! None
|
|
||||||
context.stop(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
case None ⇒ //do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def trySendDownstream(e: Any): Unit = {
|
|
||||||
requester match {
|
|
||||||
case Some(ref) ⇒
|
|
||||||
ref ! e
|
|
||||||
case None ⇒ //do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -3,17 +3,19 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
import akka.actor.{ ActorRef, Props }
|
import akka.actor.{ ActorRef, Props }
|
||||||
import akka.stream.actor.ActorPublisherMessage.Request
|
import akka.stream.Attributes.InputBuffer
|
||||||
import akka.stream.impl.StreamLayout.Module
|
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, GraphStageLogic }
|
import akka.stream.impl.StreamLayout.Module
|
||||||
import akka.util.Timeout
|
import akka.stream.stage.{ AsyncCallback, GraphStageLogic, GraphStageWithMaterializedValue, InHandler }
|
||||||
import org.reactivestreams.{ Publisher, Subscriber }
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
|
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
import scala.concurrent.duration.{ FiniteDuration, _ }
|
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
|
import scala.util.{ Failure, Success, Try }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -167,32 +169,6 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
|
||||||
override def toString: String = "ActorRefSink"
|
override def toString: String = "ActorRefSink"
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
*/
|
|
||||||
private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: Attributes,
|
|
||||||
shape: SinkShape[In], timeout: FiniteDuration) extends SinkModule[In, SinkQueue[In]](shape) {
|
|
||||||
|
|
||||||
override def create(context: MaterializationContext) = {
|
|
||||||
import akka.pattern.ask
|
|
||||||
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
|
|
||||||
|
|
||||||
implicit val t = Timeout(timeout)
|
|
||||||
val subscriberRef = actorMaterializer.actorOf(context,
|
|
||||||
AcknowledgeSubscriber.props(bufferSize))
|
|
||||||
(akka.stream.actor.ActorSubscriber[In](subscriberRef),
|
|
||||||
new SinkQueue[In] {
|
|
||||||
override def pull(): Future[Option[In]] = (subscriberRef ? Request(1)).mapTo[Option[In]]
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, SinkQueue[In]] =
|
|
||||||
new AcknowledgeSink[In](bufferSize, attributes, shape, timeout)
|
|
||||||
override def withAttributes(attr: Attributes): Module =
|
|
||||||
new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout)
|
|
||||||
override def toString: String = "AcknowledgeSink"
|
|
||||||
}
|
|
||||||
|
|
||||||
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
|
||||||
|
|
||||||
val in = Inlet[T]("lastOption.in")
|
val in = Inlet[T]("lastOption.in")
|
||||||
|
|
@ -257,3 +233,92 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV
|
||||||
}, p.future)
|
}, p.future)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueue[T]] {
|
||||||
|
trait RequestElementCallback[E] {
|
||||||
|
val requestElement = new AtomicReference[AnyRef](Nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Requested[E] = Promise[Option[T]]
|
||||||
|
|
||||||
|
val in = Inlet[T]("queueSink.in")
|
||||||
|
override val shape: SinkShape[T] = SinkShape.of(in)
|
||||||
|
|
||||||
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||||
|
type Received[E] = Try[Option[E]]
|
||||||
|
|
||||||
|
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
|
||||||
|
require(maxBuffer > 0, "Buffer size must be greater than 0")
|
||||||
|
|
||||||
|
val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1)
|
||||||
|
var currentRequest: Option[Requested[T]] = None
|
||||||
|
|
||||||
|
val stageLogic = new GraphStageLogic(shape) with RequestElementCallback[Requested[T]] {
|
||||||
|
override def keepGoingAfterAllPortsClosed = true
|
||||||
|
|
||||||
|
override def preStart(): Unit = {
|
||||||
|
val list = requestElement.getAndSet(callback.invoke _).asInstanceOf[List[Requested[T]]]
|
||||||
|
list.reverse.foreach(callback.invoke)
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val callback: AsyncCallback[Requested[T]] =
|
||||||
|
getAsyncCallback(promise ⇒ currentRequest match {
|
||||||
|
case Some(_) ⇒
|
||||||
|
promise.failure(new IllegalStateException("You have to wait for previous future to be resolved to send another request"))
|
||||||
|
case None ⇒
|
||||||
|
if (buffer.isEmpty) currentRequest = Some(promise)
|
||||||
|
else sendDownstream(promise)
|
||||||
|
})
|
||||||
|
|
||||||
|
def sendDownstream(promise: Requested[T]): Unit = {
|
||||||
|
val e = buffer.dequeue()
|
||||||
|
promise.complete(e)
|
||||||
|
e match {
|
||||||
|
case Success(_: Some[_]) ⇒ //do nothing
|
||||||
|
case Success(None) ⇒ completeStage()
|
||||||
|
case Failure(t) ⇒ failStage(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def enqueueAndNotify(requested: Received[T]): Unit = {
|
||||||
|
buffer.enqueue(requested)
|
||||||
|
currentRequest match {
|
||||||
|
case Some(p) ⇒
|
||||||
|
sendDownstream(p)
|
||||||
|
currentRequest = None
|
||||||
|
case None ⇒ //do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
enqueueAndNotify(Success(Some(grab(in))))
|
||||||
|
if (buffer.used < maxBuffer - 1) pull(in)
|
||||||
|
}
|
||||||
|
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
|
||||||
|
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
(stageLogic, new SinkQueue[T] {
|
||||||
|
override def pull(): Future[Option[T]] = {
|
||||||
|
val ref = stageLogic.requestElement
|
||||||
|
val p = Promise[Option[T]]
|
||||||
|
ref.get() match {
|
||||||
|
case l: List[_] ⇒
|
||||||
|
if (!ref.compareAndSet(l, p :: l))
|
||||||
|
ref.get() match {
|
||||||
|
case _: List[_] ⇒ throw new IllegalStateException("Concurrent call of SinkQueue.pull() is detected")
|
||||||
|
case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p)
|
||||||
|
}
|
||||||
|
case f: Function1[_, _] ⇒ f.asInstanceOf[Requested[T] ⇒ Unit](p)
|
||||||
|
}
|
||||||
|
p.future
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,7 @@ private[stream] object Stages {
|
||||||
val ignoreSink = name("ignoreSink")
|
val ignoreSink = name("ignoreSink")
|
||||||
val actorRefSink = name("actorRefSink")
|
val actorRefSink = name("actorRefSink")
|
||||||
val actorSubscriberSink = name("actorSubscriberSink")
|
val actorSubscriberSink = name("actorSubscriberSink")
|
||||||
val acknowledgeSink = name("acknowledgeSink")
|
val queueSink = name("queueSink")
|
||||||
val outputStreamSink = name("outputStreamSink") and IODispatcher
|
val outputStreamSink = name("outputStreamSink") and IODispatcher
|
||||||
val inputStreamSink = name("inputStreamSink") and IODispatcher
|
val inputStreamSink = name("inputStreamSink") and IODispatcher
|
||||||
val fileSink = name("fileSource") and IODispatcher
|
val fileSink = name("fileSource") and IODispatcher
|
||||||
|
|
|
||||||
|
|
@ -1154,7 +1154,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
||||||
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
|
def zipMat[T, M, M2](that: Graph[SourceShape[T], M],
|
||||||
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
|
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] =
|
||||||
this.viaMat(Flow.fromGraph(GraphDSL.create(that,
|
this.viaMat(Flow.fromGraph(GraphDSL.create(that,
|
||||||
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] {
|
new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] {
|
||||||
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
|
def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = {
|
||||||
val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
|
val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T])
|
||||||
b.from(s).toInlet(zip.in1)
|
b.from(s).toInlet(zip.in1)
|
||||||
|
|
|
||||||
|
|
@ -203,14 +203,20 @@ object Sink {
|
||||||
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
|
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
|
||||||
* `Future` completes when element is available.
|
* `Future` completes when element is available.
|
||||||
*
|
*
|
||||||
* `Sink` will request at most `bufferSize` number of elements from
|
* Before calling pull method second time you need to wait until previous Future completes.
|
||||||
* upstream and then stop back pressure.
|
* Pull returns Failed future with ''IllegalStateException'' if previous future has not yet completed.
|
||||||
*
|
*
|
||||||
* @param bufferSize The size of the buffer in element count
|
* `Sink` will request at most number of elements equal to size of `inputBuffer` from
|
||||||
* @param timeout Timeout for ``SinkQueue.pull():Future[Option[T] ]``
|
* upstream and then stop back pressure. You can configure size of input
|
||||||
|
* buffer by using [[Sink.withAttributes]] method.
|
||||||
|
*
|
||||||
|
* For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None
|
||||||
|
* as completion marker
|
||||||
|
*
|
||||||
|
* @see [[akka.stream.SinkQueue]]
|
||||||
*/
|
*/
|
||||||
def queue[T](bufferSize: Int, timeout: FiniteDuration): Sink[T, SinkQueue[T]] =
|
def queue[T](): Sink[T, SinkQueue[T]] =
|
||||||
new Sink(scaladsl.Sink.queue(bufferSize, timeout))
|
new Sink(scaladsl.Sink.queue())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
|
* Creates a Sink that writes incoming [[ByteString]] elements to the given file.
|
||||||
|
|
|
||||||
|
|
@ -270,16 +270,20 @@ object Sink {
|
||||||
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
|
* [[akka.stream.SinkQueue.pull]] method is pulling element from the stream and returns ``Future[Option[T]]``.
|
||||||
* `Future` completes when element is available.
|
* `Future` completes when element is available.
|
||||||
*
|
*
|
||||||
* `Sink` will request at most `bufferSize` number of elements from
|
* Before calling pull method second time you need to wait until previous Future completes.
|
||||||
* upstream and then stop back pressure.
|
* Pull returns Failed future with ''IllegalStateException'' if previous future has not yet completed.
|
||||||
*
|
*
|
||||||
* @param bufferSize The size of the buffer in element count
|
* `Sink` will request at most number of elements equal to size of `inputBuffer` from
|
||||||
* @param timeout Timeout for ``SinkQueue.pull():Future[Option[T]]``
|
* upstream and then stop back pressure. You can configure size of input
|
||||||
|
* buffer by using [[Sink.withAttributes]] method.
|
||||||
|
*
|
||||||
|
* For stream completion you need to pull all elements from [[akka.stream.SinkQueue]] including last None
|
||||||
|
* as completion marker
|
||||||
|
*
|
||||||
|
* @see [[akka.stream.SinkQueue]]
|
||||||
*/
|
*/
|
||||||
def queue[T](bufferSize: Int, timeout: FiniteDuration = 5.seconds): Sink[T, SinkQueue[T]] = {
|
def queue[T](): Sink[T, SinkQueue[T]] =
|
||||||
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
|
Sink.fromGraph(new QueueSink().withAttributes(DefaultAttributes.queueSink))
|
||||||
new Sink(new AcknowledgeSink(bufferSize, DefaultAttributes.acknowledgeSink, shape("AcknowledgeSink"), timeout))
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites
|
* Creates a Sink which writes incoming [[ByteString]] elements to the given file and either overwrites
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue