+doc document lifecycle of ActorMaterializer explicitly (#23836)

* +doc #23835 document lifecycle of ActorMaterializer explicitly

* Update stream-flows-and-basics.md

* Update stream-flows-and-basics.md
This commit is contained in:
Konrad `ktoso` Malawski 2017-11-09 00:26:02 +09:00 committed by GitHub
parent e547d2b295
commit 4d583d1e6c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 201 additions and 10 deletions

View file

@ -355,3 +355,56 @@ been signalled already thus the ordering in the case of zipping is defined b
If you find yourself in need of fine grained control over order of emitted elements in fan-in
scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` which gives you full control over how the
merge is performed.
# Actor Materializer Lifecycle
An important aspect of working with streams and actors is understanding an `ActorMaterializer`'s life-cycle.
The materializer is bound to the lifecycle of the `ActorRefFactory` it is created from, which in practice will
be either an `ActorSystem` or `ActorContext` (when the materializer is created within an `Actor`).
The usual way of creating an `ActorMaterializer` is to create it next to your `ActorSystem`,
which likely is in a "main" class of your application:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-system }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-system }
In this case the streams run by the materializer will run until it is shut down. When the materializer is shut down
*before* the streams have run to completion, they will be terminated abruptly. This is a little different than the
usual way to terminate streams, which is by cancelling/completing them. The stream lifecycles are bound to the materializer
like this to prevent leaks, and in normal operations you should not rely on the mechanism and rather use `KillSwitch` or
normal completion signals to manage the lifecycles of your streams.
If we look at the following example, where we create the `ActorMaterializer` within an `Actor`:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-actor-context }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-actor-context }
In the above example we used the `ActorContext` to create the materializer. This binds its lifecycle to the surrounding `Actor`. In other words, while the stream we started there would under normal circumstances run forever, if we stop the Actor it would terminate the stream as well. We have *bound the streams' lifecycle to the surrounding actor's lifecycle*.
This is a very useful technique if the stream is closely related to the actor, e.g. when the actor represents an user or other entity, that we continiously query using the created stream -- and it would not make sense to keep the stream alive when the actor has terminated already. The streams termination will be signalled by an "Abrupt termination exception" signaled by the stream.
You may also cause an `ActorMaterializer` to shutdown by explicitly calling `shutdown()` on it, resulting in abruptly terminating all of the streams it has been running then.
Sometimes however you may want to explicitly create a stream that will out-last the actor's life. For example, if you want to continue pushing some large stream of data to an external service and are doing so via an Akka stream while you already want to eagerly stop the Actor since it has performed all of it's duties already:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-system-in-actor }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-system-in-actor }
In the above example we pass in a materializer to the Actor, which results in binding its lifecycle to the entire `ActorSystem` rather than the single enclosing actor. This can be useful if you want to share a materializer or group streams into specific materializers,
for example because of the materializer's settings etc.
@@@ warning
Do not create new actor materializers inside actors by passing the `context.system` to it.
This will cause a new @ActorMaterializer@ to be created and potentially leaked (unless you shut it down explicitly) for each such actor.
It is instead recommended to either pass-in the Materializer or create one using the actor's `context`.
@@@

View file

@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.japi.Pair;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
@ -279,4 +280,64 @@ public class FlowDocTest extends AbstractJavaTest {
//#flow-async
}
static {
//#materializer-from-system
ActorSystem system = ActorSystem.create("ExampleSystem");
// created from `system`:
ActorMaterializer mat = ActorMaterializer.create(system);
//#materializer-from-system
}
//#materializer-from-actor-context
final class RunWithMyself extends AbstractActor {
ActorMaterializer mat = ActorMaterializer.create(context());
@Override
public void preStart() throws Exception {
Source
.repeat("hello")
.runWith(Sink.onComplete(tryDone -> {
System.out.println("Terminated stream: " + tryDone);
}), mat);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, p -> {
// this WILL terminate the above stream as well
context().stop(self());
}).build();
}
}
//#materializer-from-actor-context
//#materializer-from-system-in-actor
final class RunForever extends AbstractActor {
final ActorMaterializer mat;
RunForever(ActorMaterializer mat) {
this.mat = mat;
}
@Override
public void preStart() throws Exception {
Source
.repeat("hello")
.runWith(Sink.onComplete(tryDone -> {
System.out.println("Terminated stream: " + tryDone);
}), mat);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, p -> {
// will NOT terminate the stream (it's bound to the system!)
context().stop(self());
}).build();
}
//#materializer-from-system-in-actor
}
}

View file

@ -3,15 +3,17 @@
*/
package docs.stream
import akka.NotUsed
import akka.actor.Cancellable
import akka.stream.{ ClosedShape, FlowShape }
import akka.{ Done, NotUsed }
import akka.actor.{ Actor, ActorSystem, Cancellable }
import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.{ Promise, Future }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }
class FlowDocSpec extends AkkaSpec {
class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
implicit val ec = system.dispatcher
@ -229,3 +231,48 @@ class FlowDocSpec extends AkkaSpec {
//#flow-async
}
}
object FlowDocSpec {
{
//#materializer-from-system
implicit val system = ActorSystem("ExampleSystem")
implicit val mat = ActorMaterializer() // created from `system`
//#materializer-from-system
}
//#materializer-from-actor-context
final class RunWithMyself extends Actor {
implicit val mat = ActorMaterializer()
Source.maybe
.runWith(Sink.onComplete {
case Success(done) println(s"Completed: $done")
case Failure(ex) println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom"
context.stop(self) // will also terminate the stream
}
}
//#materializer-from-actor-context
//#materializer-from-system-in-actor
final class RunForever(implicit val mat: Materializer) extends Actor {
Source.maybe
.runWith(Sink.onComplete {
case Success(done) println(s"Completed: $done")
case Failure(ex) println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom"
context.stop(self) // will NOT terminate the stream (it's bound to the system!)
}
}
//#materializer-from-system-in-actor
}

View file

@ -1,13 +1,16 @@
package akka.stream
import akka.actor.{ ActorSystem, Props }
import akka.Done
import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
import akka.stream.ActorMaterializerSpec.ActorWithMaterializer
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.testkit.{ ImplicitSender, TestActor }
import akka.testkit.{ ImplicitSender, TestActor, TestProbe }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Failure, Try }
class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
@ -47,7 +50,18 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
m.shutdown()
m.supervisor ! StreamSupervisor.GetChildren
expectNoMsg(1.second)
expectNoMessage(1.second)
}
"terminate if ActorContext it was created from terminates" in {
val p = TestProbe()
val a = system.actorOf(Props(new ActorWithMaterializer(p)).withDispatcher("akka.test.stream-dispatcher"))
p.expectMsg("hello")
p.expectMsg("one")
a ! PoisonPill
val Failure(ex) = p.expectMsgType[Try[Done]]
}
"handle properly broken Props" in {
@ -67,3 +81,19 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
}
}
object ActorMaterializerSpec {
class ActorWithMaterializer(p: TestProbe) extends Actor {
private val settings: ActorMaterializerSettings = ActorMaterializerSettings(context.system).withDispatcher("akka.test.stream-dispatcher")
implicit val mat = ActorMaterializer(settings)(context)
Source.repeat("hello")
.alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one")))
.runWith(Sink.onComplete(signal {
println(signal)
p.ref ! signal
}))
def receive = Actor.emptyBehavior
}
}

View file

@ -29,7 +29,7 @@ trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] {
def tell(msg: T): Unit
/**
* Narrow the type of this `ActorRef, which is always a safe operation.
* Narrow the type of this `ActorRef`, which is always a safe operation.
*/
def narrow[U <: T]: ActorRef[U]