Merge pull request #25286 from giftig/issue-25285

Fix Source.actorRef not  completing for Status.Success("ok")
This commit is contained in:
Patrik Nordwall 2018-07-10 13:23:23 +02:00 committed by GitHub
commit 62aaae06ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 49 additions and 53 deletions

View file

@ -1,6 +1,6 @@
# actorRef # actorRef
Materialize an `ActorRef`, sending messages to it will emit them on the stream. Materialize an `ActorRef`; sending messages to it will emit them on the stream.
@ref[Source operators](../index.md#source-operators) @ref[Source operators](../index.md#source-operators)
@ -12,15 +12,15 @@ Materialize an `ActorRef`, sending messages to it will emit them on the stream.
## Description ## Description
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contains
a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping
elements or failing the stream, the strategy is chosen by the user. elements or failing the stream; the strategy is chosen by the user.
@@@div { .callout } @@@div { .callout }
**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref **emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill` **completes** when the `ActorRef` is sent `akka.actor.Status.Success`
@@@ @@@

View file

@ -7,7 +7,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
| |Operator|Description| | |Operator|Description|
|--|--|--| |--|--|--|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`, sending messages to it will emit them on the stream. | |Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.| |Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.|

View file

@ -12,8 +12,8 @@ To use Akka Streams, add the module to your project:
## Integrating with Actors ## Integrating with Actors
For piping the elements of a stream as messages to an ordinary actor you can use For piping the elements of a stream as messages to an ordinary actor you can use
`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. `ask` in a `mapAsync` or use `Sink.actorRefWithAck`.
Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is
materialized by `Source.actorRef`. materialized by `Source.actorRef`.
@ -24,9 +24,9 @@ materialized by `Source.actorRef`.
See also: @ref[Flow.ask operator reference docs](operators/Source-or-Flow/ask.md), @ref[ActorFlow.ask operator reference docs](operators/ActorFlow/ask.md) for Akka Typed See also: @ref[Flow.ask operator reference docs](operators/Source-or-Flow/ask.md), @ref[ActorFlow.ask operator reference docs](operators/ActorFlow/ask.md) for Akka Typed
@@@ @@@
A nice way to delegate some processing of elements in a stream to an actor is to use `ask`. A nice way to delegate some processing of elements in a stream to an actor is to use `ask`.
The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of
the `ask` and the mailbox of the actor will not be filled with more messages than the given the `ask` and the mailbox of the actor will not be filled with more messages than the given
`parallelism` of the `ask` operator (similarly to how the `mapAsync` operator works). `parallelism` of the `ask` operator (similarly to how the `mapAsync` operator works).
Scala Scala
@ -40,7 +40,7 @@ the stream elements, i.e. the `parallelism` does not change the ordering
of the messages. There is a performance advantage of using parallelism > 1 of the messages. There is a performance advantage of using parallelism > 1
even though the actor will only process one message at a time because then there even though the actor will only process one message at a time because then there
is already a message in the mailbox when the actor has completed previous is already a message in the mailbox when the actor has completed previous
message. message.
The actor must reply to the @scala[`sender()`]@java[`getSender()`] for each message from the stream. That The actor must reply to the @scala[`sender()`]@java[`getSender()`] for each message from the stream. That
reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that is emitted downstreams. reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that is emitted downstreams.
@ -56,16 +56,16 @@ Java
The stream can be completed with failure by sending `akka.actor.Status.Failure` as reply from the actor. The stream can be completed with failure by sending `akka.actor.Status.Failure` as reply from the actor.
If the `ask` fails due to timeout the stream will be completed with If the `ask` fails due to timeout the stream will be completed with
`TimeoutException` failure. If that is not desired outcome you can use `recover` `TimeoutException` failure. If that is not desired outcome you can use `recover`
on the `ask` @scala[`Future`]@java[`CompletionStage`], or use the other "restart" operators to restart it. on the `ask` @scala[`Future`]@java[`CompletionStage`], or use the other "restart" operators to restart it.
If you don't care about the reply values and only use them as back-pressure signals you If you don't care about the reply values and only use them as back-pressure signals you
can use `Sink.ignore` after the `ask` operator and then actor is effectively a sink can use `Sink.ignore` after the `ask` operator and then actor is effectively a sink
of the stream. of the stream.
Note that while you may implement the same concept using `mapAsync`, that style would not be aware of the actor terminating. Note that while you may implement the same concept using `mapAsync`, that style would not be aware of the actor terminating.
If you are intending to ask multiple actors by using @ref:[Actor routers](../routing.md), then If you are intending to ask multiple actors by using @ref:[Actor routers](../routing.md), then
you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important, you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important,
since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator. since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator.
@ -80,8 +80,8 @@ First element is always *onInitMessage*, then stream is waiting for the given ac
from the given actor which means that it is ready to process elements. It also requires the given acknowledgement from the given actor which means that it is ready to process elements. It also requires the given acknowledgement
message after each stream element to make back-pressure work. message after each stream element to make back-pressure work.
If the target actor terminates the stream will be cancelled. When the stream is completed successfully the If the target actor terminates the stream will be cancelled. When the stream is completed successfully the
given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with
failure a `akka.actor.Status.Failure` message will be sent to the destination actor. failure a `akka.actor.Status.Failure` message will be sent to the destination actor.
Scala Scala
@ -100,9 +100,9 @@ Java
Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted
as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements.
Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle
in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor, in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor,
however it is not common practice to do so, and one should rather investigate using a `Merge` operator for this purpose. however it is not common practice to do so, and one should rather investigate using a `Merge` operator for this purpose.
@@@ note @@@ note
@ -121,11 +121,11 @@ use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though.
The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation. The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation.
`Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside `Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside
the stream). The elements will be buffered until the stream can process them. You can `offer` elements to the stream). The elements will be buffered until the stream can process them. You can `offer` elements to
the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will
be buffered until request for demand is received. be buffered until request for demand is received.
Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the
buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the
buffer and `offer` should not be called again until it completes. buffer and `offer` should not be called again until it completes.
@ -136,8 +136,8 @@ will be discarded if downstream is terminated.
You could combine it with the @ref[`throttle`](operators/Source-or-Flow/throttle.md) operator is used to slow down the stream to `5 element` per `3 seconds` and other patterns. You could combine it with the @ref[`throttle`](operators/Source-or-Flow/throttle.md) operator is used to slow down the stream to `5 element` per `3 seconds` and other patterns.
`SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage<QueueOfferResult>`] which completes with `QueueOfferResult.Enqueued` `SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage<QueueOfferResult>`] which completes with `QueueOfferResult.Enqueued`
if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element
was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or
`QueueOfferResult.QueueClosed` when downstream is completed. `QueueOfferResult.QueueClosed` when downstream is completed.
Scala Scala
@ -157,12 +157,11 @@ demand is received.
Depending on the defined `OverflowStrategy` it might drop elements if there is no space Depending on the defined `OverflowStrategy` it might drop elements if there is no space
available in the buffer. The strategy `OverflowStrategy.backpressure` is not supported available in the buffer. The strategy `OverflowStrategy.backpressure` is not supported
for this Source type, i.e. elements will be dropped if the buffer is filled by sending for this Source type, i.e. elements will be dropped if the buffer is filled by sending
at a rate that is faster than the stream can consume. You should consider using `Source.queue` at a rate that is faster than the stream can consume. You should consider using `Source.queue`
if you want a backpressured actor interface. if you want a backpressured actor interface.
The stream can be completed successfully by sending `akka.actor.PoisonPill` or The stream can be completed successfully by sending `akka.actor.Status.Success` to the actor reference.
`akka.actor.Status.Success` to the actor reference.
The stream can be completed with failure by sending `akka.actor.Status.Failure` to the The stream can be completed with failure by sending `akka.actor.Status.Failure` to the
actor reference. actor reference.
@ -549,7 +548,7 @@ Please note that a factory is necessary to achieve reusability of the resulting
### Implementing Reactive Streams Publisher or Subscriber ### Implementing Reactive Streams Publisher or Subscriber
As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and
any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you
implement Reactive Streams integrations with built-in operators or @ref:[custom operators](stream-customize.md). implement Reactive Streams integrations with built-in operators or @ref:[custom operators](stream-customize.md).
For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are

View file

@ -12,6 +12,7 @@ import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.Status import akka.actor.Status
import akka.Done
class ActorRefSourceSpec extends StreamSpec { class ActorRefSourceSpec extends StreamSpec {
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
@ -79,14 +80,6 @@ class ActorRefSourceSpec extends StreamSpec {
expectTerminated(ref) expectTerminated(ref)
} }
"complete the stream immediatly when receiving PoisonPill" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
ref ! PoisonPill
s.expectComplete()
}
"signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
@ -129,18 +122,12 @@ class ActorRefSourceSpec extends StreamSpec {
s.expectComplete() s.expectComplete()
} }
"after receiving Status.Success, allow for earlier completion with PoisonPill" in assertAllStagesStopped { "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]() val (ref, done) = {
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run()
val sub = s.expectSubscription }
ref ! 1
ref ! 2
ref ! 3
ref ! Status.Success("ok") ref ! Status.Success("ok")
sub.request(2) // not all elements drained yet done.futureValue should be(Done)
s.expectNext(1, 2)
ref ! PoisonPill
s.expectComplete() // element `3` not signaled
} }
"fail the stream when receiving Status.Failure" in assertAllStagesStopped { "fail the stream when receiving Status.Failure" in assertAllStagesStopped {

View file

@ -46,7 +46,7 @@ import akka.stream.ActorMaterializerSettings
.orElse(receiveElem) .orElse(receiveElem)
def receiveComplete: Receive = completionMatcher.andThen { _ def receiveComplete: Receive = completionMatcher.andThen { _
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully if (bufferSize == 0 || buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully
else context.become(drainBufferThenComplete) else context.become(drainBufferThenComplete)
} }
@ -110,7 +110,7 @@ import akka.stream.ActorMaterializerSettings
while (totalDemand > 0L && !buffer.isEmpty) while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue()) onNext(buffer.dequeue())
if (buffer.isEmpty) context.stop(self) // will complete the stream successfully if (buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully
case elem if isActive case elem if isActive
log.debug("Dropping element because Status.Success received already, " + log.debug("Dropping element because Status.Success received already, " +

View file

@ -306,13 +306,18 @@ object Source {
* *
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]] * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]
* (whose content will be ignored) in which case already buffered elements will be signaled before signaling * (whose content will be ignored) in which case already buffered elements will be signaled before signaling
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. * completion.
* *
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
* actor reference. In case the Actor is still draining its internal buffer (after having received * actor reference. In case the Actor is still draining its internal buffer (after having received
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
* the failure will be signaled downstream immediately (instead of the completion signal). * the failure will be signaled downstream immediately (instead of the completion signal).
* *
* Note that terminating the actor without first completing it, either with a success or a
* failure, will prevent the actor triggering downstream completion and the stream will continue
* to run even though the source actor is dead. Therefore you should **not** attempt to
* manually terminate the actor such as with a [[akka.actor.PoisonPill]].
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
* *

View file

@ -470,13 +470,18 @@ object Source {
* *
* The stream can be completed successfully by sending the actor reference a message that is matched by * The stream can be completed successfully by sending the actor reference a message that is matched by
* `completionMatcher` in which case already buffered elements will be signaled before signaling * `completionMatcher` in which case already buffered elements will be signaled before signaling
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. * completion.
* *
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
* the failure will be signaled downstream immediately (instead of the completion signal). * the failure will be signaled downstream immediately (instead of the completion signal).
* *
* Note that terminating the actor without first completing it, either with a success or a
* failure, will prevent the actor triggering downstream completion and the stream will continue
* to run even though the source actor is dead. Therefore you should **not** attempt to
* manually terminate the actor such as with a [[akka.actor.PoisonPill]].
*
* The actor will be stopped when the stream is completed, failed or canceled from downstream, * The actor will be stopped when the stream is completed, failed or canceled from downstream,
* i.e. you can watch it to get notified when that happens. * i.e. you can watch it to get notified when that happens.
* *