diff --git a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md index a3333a73e6..b297327c76 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/actorRef.md @@ -1,6 +1,6 @@ # actorRef -Materialize an `ActorRef`, sending messages to it will emit them on the stream. +Materialize an `ActorRef`; sending messages to it will emit them on the stream. @ref[Source operators](../index.md#source-operators) @@ -12,15 +12,15 @@ Materialize an `ActorRef`, sending messages to it will emit them on the stream. ## Description -Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain +Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contains a buffer but since communication is one way, there is no back pressure. Handling overflow is done by either dropping -elements or failing the stream, the strategy is chosen by the user. +elements or failing the stream; the strategy is chosen by the user. @@@div { .callout } -**emits** when there is demand and there are messages in the buffer or a message is sent to the actorref +**emits** when there is demand and there are messages in the buffer or a message is sent to the `ActorRef` -**completes** when the `ActorRef` is sent `akka.actor.Status.Success` or `PoisonPill` +**completes** when the `ActorRef` is sent `akka.actor.Status.Success` @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index 4783e38516..61befc4006 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -7,7 +7,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] | |Operator|Description| |--|--|--| -|Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`, sending messages to it will emit them on the stream. | +|Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| |Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[cycle](Source/cycle.md)|Stream iterator in cycled manner.| diff --git a/akka-docs/src/main/paradox/stream/stream-integrations.md b/akka-docs/src/main/paradox/stream/stream-integrations.md index b626cc1970..cff5f02815 100644 --- a/akka-docs/src/main/paradox/stream/stream-integrations.md +++ b/akka-docs/src/main/paradox/stream/stream-integrations.md @@ -12,8 +12,8 @@ To use Akka Streams, add the module to your project: ## Integrating with Actors -For piping the elements of a stream as messages to an ordinary actor you can use -`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. +For piping the elements of a stream as messages to an ordinary actor you can use +`ask` in a `mapAsync` or use `Sink.actorRefWithAck`. Messages can be sent to a stream with `Source.queue` or via the `ActorRef` that is materialized by `Source.actorRef`. @@ -24,9 +24,9 @@ materialized by `Source.actorRef`. See also: @ref[Flow.ask operator reference docs](operators/Source-or-Flow/ask.md), @ref[ActorFlow.ask operator reference docs](operators/ActorFlow/ask.md) for Akka Typed @@@ -A nice way to delegate some processing of elements in a stream to an actor is to use `ask`. -The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of -the `ask` and the mailbox of the actor will not be filled with more messages than the given +A nice way to delegate some processing of elements in a stream to an actor is to use `ask`. +The back-pressure of the stream is maintained by the @scala[`Future`]@java[`CompletionStage`] of +the `ask` and the mailbox of the actor will not be filled with more messages than the given `parallelism` of the `ask` operator (similarly to how the `mapAsync` operator works). Scala @@ -40,7 +40,7 @@ the stream elements, i.e. the `parallelism` does not change the ordering of the messages. There is a performance advantage of using parallelism > 1 even though the actor will only process one message at a time because then there is already a message in the mailbox when the actor has completed previous -message. +message. The actor must reply to the @scala[`sender()`]@java[`getSender()`] for each message from the stream. That reply will complete the @scala[`Future`]@java[`CompletionStage`] of the `ask` and it will be the element that is emitted downstreams. @@ -56,16 +56,16 @@ Java The stream can be completed with failure by sending `akka.actor.Status.Failure` as reply from the actor. If the `ask` fails due to timeout the stream will be completed with -`TimeoutException` failure. If that is not desired outcome you can use `recover` +`TimeoutException` failure. If that is not desired outcome you can use `recover` on the `ask` @scala[`Future`]@java[`CompletionStage`], or use the other "restart" operators to restart it. -If you don't care about the reply values and only use them as back-pressure signals you +If you don't care about the reply values and only use them as back-pressure signals you can use `Sink.ignore` after the `ask` operator and then actor is effectively a sink of the stream. Note that while you may implement the same concept using `mapAsync`, that style would not be aware of the actor terminating. - -If you are intending to ask multiple actors by using @ref:[Actor routers](../routing.md), then + +If you are intending to ask multiple actors by using @ref:[Actor routers](../routing.md), then you should use `mapAsyncUnordered` and perform the ask manually in there, as the ordering of the replies is not important, since multiple actors are being asked concurrently to begin with, and no single actor is the one to be watched by the operator. @@ -80,8 +80,8 @@ First element is always *onInitMessage*, then stream is waiting for the given ac from the given actor which means that it is ready to process elements. It also requires the given acknowledgement message after each stream element to make back-pressure work. -If the target actor terminates the stream will be cancelled. When the stream is completed successfully the -given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with +If the target actor terminates the stream will be cancelled. When the stream is completed successfully the +given `onCompleteMessage` will be sent to the destination actor. When the stream is completed with failure a `akka.actor.Status.Failure` message will be sent to the destination actor. Scala @@ -100,9 +100,9 @@ Java Note that replying to the sender of the elements (the "stream") is required as lack of those ack signals would be interpreted as back-pressure (as intended), and no new elements will be sent into the actor until it acknowledges some elements. -Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle +Handling the other signals while is not required, however is a good practice, to see the state of the streams lifecycle in the connected actor as well. Technically it is also possible to use multiple sinks targeting the same actor, -however it is not common practice to do so, and one should rather investigate using a `Merge` operator for this purpose. +however it is not common practice to do so, and one should rather investigate using a `Merge` operator for this purpose. @@@ note @@ -121,11 +121,11 @@ use `Sink.actorRefWithAck` or `ask` in `mapAsync`, though. The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation. `Source.queue` can be used for emitting elements to a stream from an actor (or from anything running outside -the stream). The elements will be buffered until the stream can process them. You can `offer` elements to -the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will +the stream). The elements will be buffered until the stream can process them. You can `offer` elements to +the queue and they will be emitted to the stream if there is demand from downstream, otherwise they will be buffered until request for demand is received. -Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the +Use overflow strategy `akka.stream.OverflowStrategy.backpressure` to avoid dropping of elements if the buffer is full, instead the returned @scala[`Future`]@java[`CompletionStage`] does not complete until there is space in the buffer and `offer` should not be called again until it completes. @@ -136,8 +136,8 @@ will be discarded if downstream is terminated. You could combine it with the @ref[`throttle`](operators/Source-or-Flow/throttle.md) operator is used to slow down the stream to `5 element` per `3 seconds` and other patterns. `SourceQueue.offer` returns @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`] which completes with `QueueOfferResult.Enqueued` -if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element -was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or +if element was added to buffer or sent downstream. It completes with `QueueOfferResult.Dropped` if element +was dropped. Can also complete with `QueueOfferResult.Failure` - when stream failed or `QueueOfferResult.QueueClosed` when downstream is completed. Scala @@ -157,12 +157,11 @@ demand is received. Depending on the defined `OverflowStrategy` it might drop elements if there is no space available in the buffer. The strategy `OverflowStrategy.backpressure` is not supported -for this Source type, i.e. elements will be dropped if the buffer is filled by sending -at a rate that is faster than the stream can consume. You should consider using `Source.queue` +for this Source type, i.e. elements will be dropped if the buffer is filled by sending +at a rate that is faster than the stream can consume. You should consider using `Source.queue` if you want a backpressured actor interface. -The stream can be completed successfully by sending `akka.actor.PoisonPill` or -`akka.actor.Status.Success` to the actor reference. +The stream can be completed successfully by sending `akka.actor.Status.Success` to the actor reference. The stream can be completed with failure by sending `akka.actor.Status.Failure` to the actor reference. @@ -549,7 +548,7 @@ Please note that a factory is necessary to achieve reusability of the resulting ### Implementing Reactive Streams Publisher or Subscriber As described above any Akka Streams `Source` can be exposed as a Reactive Streams `Publisher` and -any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you +any `Sink` can be exposed as a Reactive Streams `Subscriber`. Therefore we recommend that you implement Reactive Streams integrations with built-in operators or @ref:[custom operators](stream-customize.md). For historical reasons the `ActorPublisher` and `ActorSubscriber` traits are diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index a2a6b356c1..3f5583b2b4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -12,6 +12,7 @@ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.actor.PoisonPill import akka.actor.Status +import akka.Done class ActorRefSourceSpec extends StreamSpec { implicit val materializer = ActorMaterializer() @@ -79,14 +80,6 @@ class ActorRefSourceSpec extends StreamSpec { expectTerminated(ref) } - "complete the stream immediatly when receiving PoisonPill" in assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - ref ! PoisonPill - s.expectComplete() - } - "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() @@ -129,18 +122,12 @@ class ActorRefSourceSpec extends StreamSpec { s.expectComplete() } - "after receiving Status.Success, allow for earlier completion with PoisonPill" in assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() - val sub = s.expectSubscription - ref ! 1 - ref ! 2 - ref ! 3 + "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped { + val (ref, done) = { + Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run() + } ref ! Status.Success("ok") - sub.request(2) // not all elements drained yet - s.expectNext(1, 2) - ref ! PoisonPill - s.expectComplete() // element `3` not signaled + done.futureValue should be(Done) } "fail the stream when receiving Status.Failure" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 8a0ce79cfc..d1e7d98445 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -46,7 +46,7 @@ import akka.stream.ActorMaterializerSettings .orElse(receiveElem) def receiveComplete: Receive = completionMatcher.andThen { _ ⇒ - if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully + if (bufferSize == 0 || buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully else context.become(drainBufferThenComplete) } @@ -110,7 +110,7 @@ import akka.stream.ActorMaterializerSettings while (totalDemand > 0L && !buffer.isEmpty) onNext(buffer.dequeue()) - if (buffer.isEmpty) context.stop(self) // will complete the stream successfully + if (buffer.isEmpty) onCompleteThenStop() // will complete the stream successfully case elem if isActive ⇒ log.debug("Dropping element because Status.Success received already, " + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2b0c94ea52..65354f419f 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -306,13 +306,18 @@ object Source { * * The stream can be completed successfully by sending the actor reference a [[akka.actor.Status.Success]] * (whose content will be ignored) in which case already buffered elements will be signaled before signaling - * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * completion. * * The stream can be completed with failure by sending a [[akka.actor.Status.Failure]] to the * actor reference. In case the Actor is still draining its internal buffer (after having received * a [[akka.actor.Status.Success]]) before signaling completion and it receives a [[akka.actor.Status.Failure]], * the failure will be signaled downstream immediately (instead of the completion signal). * + * Note that terminating the actor without first completing it, either with a success or a + * failure, will prevent the actor triggering downstream completion and the stream will continue + * to run even though the source actor is dead. Therefore you should **not** attempt to + * manually terminate the actor such as with a [[akka.actor.PoisonPill]]. + * * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 072756cebb..48d994a245 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -470,13 +470,18 @@ object Source { * * The stream can be completed successfully by sending the actor reference a message that is matched by * `completionMatcher` in which case already buffered elements will be signaled before signaling - * completion, or by sending [[akka.actor.PoisonPill]] in which case completion will be signaled immediately. + * completion. * * The stream can be completed with failure by sending a message that is matched by `failureMatcher`. The extracted * [[Throwable]] will be used to fail the stream. In case the Actor is still draining its internal buffer (after having received * a message matched by `completionMatcher`) before signaling completion and it receives a message matched by `failureMatcher`, * the failure will be signaled downstream immediately (instead of the completion signal). * + * Note that terminating the actor without first completing it, either with a success or a + * failure, will prevent the actor triggering downstream completion and the stream will continue + * to run even though the source actor is dead. Therefore you should **not** attempt to + * manually terminate the actor such as with a [[akka.actor.PoisonPill]]. + * * The actor will be stopped when the stream is completed, failed or canceled from downstream, * i.e. you can watch it to get notified when that happens. *