Source.actorRef not completing on Success (#25285)
When a Success is received, call onCompleteThenStop instead of just context.stop; that takes care of the completion logic instead of just stopping the actor and leaving the stream going. Add test to ensure the stream materializes on Source.actorRef receiving Status.Success Remove tests around stream completion behaviour in response to PoisonPill - as well as these tests not correctly demonstrating that the completion was passed on downstream, they describe behaviour which was previously incidental and is no longer accurate. Update the docs to reflect that PoisonPill should not be used on the actor ref as this scenario will necessarily result in bad behaviour as it will be unable to signal the completion downstream. Make a few grammar fixes and remove some trailing space while updating the docs.
This commit is contained in:
parent
02f6899952
commit
ce185c4dfc
7 changed files with 49 additions and 53 deletions
|
|
@ -1,6 +1,6 @@
|
|||
# actorRef
|
||||
|
||||
Materialize an `ActorRef`, sending messages to it will emit them on the stream.
|
||||
Materialize an `ActorRef`; sending messages to it will emit them on the stream.
|
||||
|
||||
@ref[Source operators](../index.md#source-operators)
|
||||
|
||||
|
|
@ -12,15 +12,15 @@ Materialize an `ActorRef`, sending messages to it will emit them on the stream.
|
|||
|
||||
## Description
|
||||
|
||||
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain
|
||||
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contains
|
||||
a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping
|
||||
elements or failing the stream, the strategy is chosen by the user.
|
||||
elements or failing the stream; the strategy is chosen by the user.
|
||||
|
||||
@@@div { .callout }
|
||||
|
||||
**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref
|
||||
**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef`
|
||||
|
||||
**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill`
|
||||
**completes** when the `ActorRef` is sent `akka.actor.Status.Success`
|
||||
|
||||
@@@
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|
|||
|
||||
| |Operator|Description|
|
||||
|--|--|--|
|
||||
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`, sending messages to it will emit them on the stream. |
|
||||
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|
||||
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|
||||
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|
||||
|Source|<a name="cycle"></a>@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.|
|
||||
|
|
|
|||
|
|
@ -161,8 +161,7 @@ for this Source type, i.e. elements will be dropped if the buffer is filled by s
|
|||
at a rate that is faster than the stream can consume. You should consider using `Source.queue`
|
||||
if you want a backpressured actor interface.
|
||||
|
||||
The stream can be completed successfully by sending `akka.actor.PoisonPill` or
|
||||
`akka.actor.Status.Success` to the actor reference.
|
||||
The stream can be completed successfully by sending `akka.actor.Status.Success` to the actor reference.
|
||||
|
||||
The stream can be completed with failure by sending `akka.actor.Status.Failure` to the
|
||||
actor reference.
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.stream.testkit.Utils._
|
|||
import akka.stream.testkit.scaladsl.StreamTestKit._
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.Status
|
||||
import akka.Done
|
||||
|
||||
class ActorRefSourceSpec extends StreamSpec {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
|
@ -79,14 +80,6 @@ class ActorRefSourceSpec extends StreamSpec {
|
|||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
"complete the stream immediatly when receiving PoisonPill" in assertAllStagesStopped {
|
||||
val s = TestSubscriber.manualProbe[Int]()
|
||||
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
|
||||
val sub = s.expectSubscription
|
||||
ref ! PoisonPill
|
||||
s.expectComplete()
|
||||
}
|
||||
|
||||
"signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped {
|
||||
val s = TestSubscriber.manualProbe[Int]()
|
||||
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
|
||||
|
|
@ -129,18 +122,12 @@ class ActorRefSourceSpec extends StreamSpec {
|
|||
s.expectComplete()
|
||||
}
|
||||
|
||||
"after receiving Status.Success, allow for earlier completion with PoisonPill" in assertAllStagesStopped {
|
||||
val s = TestSubscriber.manualProbe[Int]()
|
||||
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run()
|
||||
val sub = s.expectSubscription
|
||||
ref ! 1
|
||||
ref ! 2
|
||||
ref ! 3
|
||||
"complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped {
|
||||
val (ref, done) = {
|
||||
Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run()
|
||||
}
|
||||
ref ! Status.Success("ok")
|
||||
sub.request(2) // not all elements drained yet
|
||||
s.expectNext(1, 2)
|
||||
ref ! PoisonPill
|
||||
s.expectComplete() // element `3` not signaled
|
||||
done.futureValue should be(Done)
|
||||
}
|
||||
|
||||
"fail the stream when receiving Status.Failure" in assertAllStagesStopped {
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ import akka.stream.ActorMaterializerSettings
|
|||
.orElse(receiveElem)
|
||||
|
||||
def receiveComplete: Receive = completionMatcher.andThen { _ ⇒
|
||||
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully
|
||||
if (bufferSize == 0 || buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully
|
||||
else context.become(drainBufferThenComplete)
|
||||
}
|
||||
|
||||
|
|
@ -110,7 +110,7 @@ import akka.stream.ActorMaterializerSettings
|
|||
while (totalDemand > 0L && !buffer.isEmpty)
|
||||
onNext(buffer.dequeue())
|
||||
|
||||
if (buffer.isEmpty) context.stop(self) // will complete the stream successfully
|
||||
if (buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully
|
||||
|
||||
case elem if isActive ⇒
|
||||
log.debug("Dropping element because Status.Success received already, " +
|
||||
|
|
|
|||
|
|
@ -306,13 +306,18 @@ object Source {
|
|||
*
|
||||
* The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]]
|
||||
* (whose content will be ignored) in which case already buffered elements will be signaled before signaling
|
||||
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
|
||||
* completion.
|
||||
*
|
||||
* The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the
|
||||
* actor reference. In case the Actor is still draining its internal buffer (after having received
|
||||
* a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]],
|
||||
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||
*
|
||||
* Note that terminating the actor without first completing it, either with a success or a
|
||||
* failure, will prevent the actor triggering downstream completion and the stream will continue
|
||||
* to run even though the source actor is dead. Therefore you should **not** attempt to
|
||||
* manually terminate the actor such as with a [[akka.actor.PoisonPill]].
|
||||
*
|
||||
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||
* i.e. you can watch it to get notified when that happens.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -470,13 +470,18 @@ object Source {
|
|||
*
|
||||
* The stream can be completed successfully by sending the actor reference a message that is matched by
|
||||
* `completionMatcher` in which case already buffered elements will be signaled before signaling
|
||||
* completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately.
|
||||
* completion.
|
||||
*
|
||||
* The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted
|
||||
* [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received
|
||||
* a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`,
|
||||
* the failure will be signaled downstream immediately (instead of the completion signal).
|
||||
*
|
||||
* Note that terminating the actor without first completing it, either with a success or a
|
||||
* failure, will prevent the actor triggering downstream completion and the stream will continue
|
||||
* to run even though the source actor is dead. Therefore you should **not** attempt to
|
||||
* manually terminate the actor such as with a [[akka.actor.PoisonPill]].
|
||||
*
|
||||
* The actor will be stopped when the stream is completed, failed or canceled from downstream,
|
||||
* i.e. you can watch it to get notified when that happens.
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue