Create typed ActorMaterializer from ActorContext, ##25536
* Create typed ActorMaterializer from ActorContext to bind stream's lifecycle with an actor's lifecycle * Add Java API for creating typed ActorMaterializer from ActorContext
This commit is contained in:
parent
abb3429bc8
commit
e26a90f340
6 changed files with 74 additions and 6 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.actor.typed.javadsl
|
||||
|
||||
import akka.actor
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Props
|
||||
import akka.actor.typed.EmptyProps
|
||||
|
|
@ -59,6 +60,9 @@ object Adapter {
|
|||
def toUntyped(sys: ActorSystem[_]): akka.actor.ActorSystem =
|
||||
sys.toUntyped
|
||||
|
||||
def toUntyped(ctx: ActorContext[_]): actor.ActorContext =
|
||||
ActorContextAdapter.toUntyped(ctx)
|
||||
|
||||
def watch[U](ctx: akka.actor.ActorContext, other: ActorRef[U]): Unit =
|
||||
ctx.watch(other)
|
||||
|
||||
|
|
|
|||
|
|
@ -88,6 +88,8 @@ package object adapter {
|
|||
def actorOf(props: akka.actor.Props, name: String): akka.actor.ActorRef =
|
||||
ActorContextAdapter.toUntyped(ctx).actorOf(props, name)
|
||||
|
||||
def toUntyped: akka.actor.ActorContext = ActorContextAdapter.toUntyped(ctx)
|
||||
|
||||
// watch, unwatch and stop not needed here because of the implicit ActorRef conversion
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.stream.typed.javadsl
|
||||
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.javadsl.{ ActorContext, Adapter }
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
||||
object ActorMaterializerFactory {
|
||||
|
|
@ -43,4 +44,34 @@ object ActorMaterializerFactory {
|
|||
def create[T](settings: ActorMaterializerSettings, namePrefix: String, actorSystem: ActorSystem[T]): akka.stream.ActorMaterializer =
|
||||
akka.stream.ActorMaterializer.create(settings, actorSystem.toUntyped, namePrefix)
|
||||
|
||||
/**
|
||||
* Creates an `ActorMaterializer` which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
|
||||
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
|
||||
*
|
||||
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
|
||||
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create[T](ctx: ActorContext[T]): akka.stream.ActorMaterializer =
|
||||
akka.stream.ActorMaterializer.create(Adapter.toUntyped(ctx))
|
||||
|
||||
/**
|
||||
* Creates an `ActorMaterializer` which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
|
||||
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
|
||||
*/
|
||||
def create[T](settings: ActorMaterializerSettings, ctx: ActorContext[T]): akka.stream.ActorMaterializer =
|
||||
akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx))
|
||||
|
||||
/**
|
||||
* Creates an `ActorMaterializer` which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
|
||||
* will be bound to the lifecycle of the provided [[akka.actor.typed.javadsl.ActorContext]]
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create[T](settings: ActorMaterializerSettings, namePrefix: String, ctx: ActorContext[T]): akka.stream.ActorMaterializer =
|
||||
akka.stream.ActorMaterializer.create(settings, Adapter.toUntyped(ctx), namePrefix)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@
|
|||
package akka.stream.typed.scaladsl
|
||||
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
||||
object ActorMaterializer {
|
||||
|
|
@ -25,4 +26,19 @@ object ActorMaterializer {
|
|||
def apply[T](materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit actorSystem: ActorSystem[T]): ActorMaterializer =
|
||||
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(actorSystem.toUntyped)
|
||||
|
||||
/**
|
||||
* Creates an `ActorMaterializer` which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The lifecycle of the materialized streams
|
||||
* will be bound to the lifecycle of the provided [[akka.actor.typed.scaladsl.ActorContext]]
|
||||
*
|
||||
* The materializer's [[akka.stream.ActorMaterializerSettings]] will be obtained from the
|
||||
* configuration of the `context`'s underlying [[akka.actor.typed.ActorSystem]].
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def boundToActor[T](ctx: ActorContext[T], materializerSettings: Option[ActorMaterializerSettings] = None, namePrefix: Option[String] = None): ActorMaterializer =
|
||||
akka.stream.ActorMaterializer(materializerSettings, namePrefix)(ctx.toUntyped)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
package akka.stream.typed.scaladsl
|
||||
|
||||
//#imports
|
||||
import akka.stream.typed.scaladsl.ActorMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
|
|
|
|||
|
|
@ -4,14 +4,16 @@
|
|||
|
||||
package akka.stream.typed.scaladsl
|
||||
|
||||
import scala.concurrent.Future
|
||||
import akka.Done
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.stream.AbruptStageTerminationException
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
object CustomGuardianAndMaterializerSpec {
|
||||
|
||||
sealed trait GuardianProtocol
|
||||
|
|
@ -38,6 +40,20 @@ class CustomGuardianAndMaterializerSpec extends ScalaTestWithActorTestKit with W
|
|||
it.futureValue should ===("hello")
|
||||
}
|
||||
|
||||
}
|
||||
"should kill streams with bound actor context" in {
|
||||
var doneF: Future[Done] = null
|
||||
val behavior =
|
||||
Behaviors.setup[String] { ctx ⇒
|
||||
implicit val mat: ActorMaterializer = ActorMaterializer.boundToActor(ctx)
|
||||
doneF = Source.repeat("hello").runWith(Sink.ignore)
|
||||
|
||||
Behaviors.receiveMessage[String](_ ⇒ Behaviors.stopped)
|
||||
}
|
||||
|
||||
val actorRef = spawn(behavior)
|
||||
|
||||
actorRef ! "kill"
|
||||
eventually(doneF.failed.futureValue shouldBe an[AbruptStageTerminationException])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue