Merge pull request #19489 from agolubev/agolubev-#18890_polish-Source.queue

Agolubev #18890 polish source.queue
This commit is contained in:
Roland Kuhn 2016-01-22 08:39:53 +01:00
commit ba14bb4f3f
19 changed files with 565 additions and 332 deletions

View file

@ -372,7 +372,7 @@ object Source {
*/
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
}
@ -398,29 +398,33 @@ object Source {
/**
* Creates a `Source` that is materialized as an [[akka.stream.SourceQueue]].
* You can push elements to the queue and they will be emitted to the stream if there is demand from downstream,
* otherwise they will be buffered until request for demand is received.
* otherwise they will be buffered until request for demand is received. Elements in the buffer will be discarded
* if downstream is terminated.
*
* Depending on the defined [[akka.stream.OverflowStrategy]] it might drop elements if
* there is no space available in the buffer.
*
* Acknowledgement mechanism is available.
* [[akka.stream.SourceQueue.offer]] returns ``Future[Boolean]`` which completes with true
* if element was added to buffer or sent downstream. It completes
* with false if element was dropped.
* [[akka.stream.SourceQueue.offer]] returns ``Future[StreamCallbackStatus[Boolean]]`` which completes with `Success(true)`
* if element was added to buffer or sent downstream. It completes with `Success(false)` if element was dropped. Can also complete
* with [[akka.stream.StreamCallbackStatus.Failure]] - when stream failed or [[akka.stream.StreamCallbackStatus.StreamCompleted]]
* when downstream is completed.
*
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete `offer():Future` until buffer is full.
* The strategy [[akka.stream.OverflowStrategy.backpressure]] will not complete last `offer():Future`
* call when buffer is full.
*
* The buffer can be disabled by using `bufferSize` of 0 and then received messages are dropped
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
* You can watch accessibility of stream with [[akka.stream.SourceQueue.watchCompletion]].
* It returns future that completes with success when stream is completed or fail when stream is failed.
*
* @param bufferSize The size of the buffer in element count
* The buffer can be disabled by using `bufferSize` of 0 and then received message will wait for downstream demand.
* When `bufferSize` is 0 the `overflowStrategy` does not matter.
*
* SourceQueue that current source is materialized to is for single thread usage only.
*
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param timeout Timeout for ``SourceQueue.offer(T):Future[Boolean]``
*/
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy, timeout: FiniteDuration = 5.seconds): Source[T, SourceQueue[T]] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
new Source(new AcknowledgeSource(bufferSize, overflowStrategy, DefaultAttributes.acknowledgeSource, shape("AcknowledgeSource")))
}
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueue[T]] =
Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))
}