harmonize MyActor.props pattern, see #3418
This commit is contained in:
parent
92db59183e
commit
07baf05bae
11 changed files with 24 additions and 27 deletions
|
|
@ -37,7 +37,7 @@ object BackpressureSpec {
|
||||||
val init = TcpPipelineHandler.withLogger(log,
|
val init = TcpPipelineHandler.withLogger(log,
|
||||||
new TcpReadWriteAdapter >>
|
new TcpReadWriteAdapter >>
|
||||||
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
||||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||||
sender ! Tcp.Register(handler)
|
sender ! Tcp.Register(handler)
|
||||||
unstashAll()
|
unstashAll()
|
||||||
context.become(connected(init, handler))
|
context.become(connected(init, handler))
|
||||||
|
|
@ -114,7 +114,7 @@ object BackpressureSpec {
|
||||||
val init = TcpPipelineHandler.withLogger(log,
|
val init = TcpPipelineHandler.withLogger(log,
|
||||||
new TcpReadWriteAdapter >>
|
new TcpReadWriteAdapter >>
|
||||||
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
|
||||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||||
sender ! Tcp.Register(handler)
|
sender ! Tcp.Register(handler)
|
||||||
unstashAll()
|
unstashAll()
|
||||||
context.become(connected(init, handler))
|
context.become(connected(init, handler))
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on"
|
||||||
|
|
||||||
import init._
|
import init._
|
||||||
|
|
||||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local),
|
val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local),
|
||||||
"client" + counter.incrementAndGet())
|
"client" + counter.incrementAndGet())
|
||||||
probe.send(connection, Tcp.Register(handler))
|
probe.send(connection, Tcp.Register(handler))
|
||||||
|
|
||||||
|
|
@ -129,7 +129,7 @@ class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on"
|
||||||
import init._
|
import init._
|
||||||
|
|
||||||
val connection = sender
|
val connection = sender
|
||||||
val handler = context.actorOf(TcpPipelineHandler(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
|
||||||
|
|
||||||
connection ! Tcp.Register(handler)
|
connection ! Tcp.Register(handler)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -94,15 +94,9 @@ object TcpPipelineHandler {
|
||||||
case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command
|
case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: create [[Props]] for a pipeline handler
|
* create [[Props]] for a pipeline handler
|
||||||
*/
|
*/
|
||||||
def apply[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
|
def props[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
|
||||||
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Java API: create [[Props]] for a pipeline handler
|
|
||||||
*/
|
|
||||||
def create[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
|
|
||||||
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
|
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ private[akka] class ClusterActorRefProvider(
|
||||||
failureDetector,
|
failureDetector,
|
||||||
heartbeatInterval = WatchHeartBeatInterval,
|
heartbeatInterval = WatchHeartBeatInterval,
|
||||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local), "remote-watcher")
|
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), "remote-watcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.cluster.ClusterEvent.MemberUp
|
||||||
import akka.cluster.ClusterEvent.MemberRemoved
|
import akka.cluster.ClusterEvent.MemberRemoved
|
||||||
import akka.remote.FailureDetectorRegistry
|
import akka.remote.FailureDetectorRegistry
|
||||||
import akka.remote.RemoteWatcher
|
import akka.remote.RemoteWatcher
|
||||||
|
import akka.actor.Deploy
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -27,7 +28,7 @@ private[cluster] object ClusterRemoteWatcher {
|
||||||
unreachableReaperInterval: FiniteDuration,
|
unreachableReaperInterval: FiniteDuration,
|
||||||
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
||||||
Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
Props(classOf[ClusterRemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
||||||
heartbeatExpectedResponseAfter)
|
heartbeatExpectedResponseAfter).withDeploy(Deploy.local)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@ public class SslDocTest {
|
||||||
|
|
||||||
// create handler for pipeline, setting ourselves as payload recipient
|
// create handler for pipeline, setting ourselves as payload recipient
|
||||||
final ActorRef handler = getContext().actorOf(
|
final ActorRef handler = getContext().actorOf(
|
||||||
TcpPipelineHandler.create(init, getSender(), getSelf()));
|
TcpPipelineHandler.props(init, getSender(), getSelf()));
|
||||||
|
|
||||||
// register the SSL handler with the connection
|
// register the SSL handler with the connection
|
||||||
getSender().tell(TcpMessage.register(handler), getSelf());
|
getSender().tell(TcpMessage.register(handler), getSelf());
|
||||||
|
|
@ -156,7 +156,7 @@ public class SslDocTest {
|
||||||
|
|
||||||
// create handler for pipeline, setting ourselves as payload recipient
|
// create handler for pipeline, setting ourselves as payload recipient
|
||||||
final ActorRef handler = getContext().actorOf(
|
final ActorRef handler = getContext().actorOf(
|
||||||
TcpPipelineHandler.create(init, getSender(), getSelf()));
|
TcpPipelineHandler.props(init, getSender(), getSelf()));
|
||||||
|
|
||||||
// register the SSL handler with the connection
|
// register the SSL handler with the connection
|
||||||
getSender().tell(TcpMessage.register(handler), getSelf());
|
getSender().tell(TcpMessage.register(handler), getSelf());
|
||||||
|
|
|
||||||
|
|
@ -147,7 +147,7 @@ private[remote] object ReliableDeliverySupervisor {
|
||||||
case object Ungate
|
case object Ungate
|
||||||
case class GotUid(uid: Int)
|
case class GotUid(uid: Int)
|
||||||
|
|
||||||
def apply(
|
def props(
|
||||||
handleOrActive: Option[AkkaProtocolHandle],
|
handleOrActive: Option[AkkaProtocolHandle],
|
||||||
localAddress: Address,
|
localAddress: Address,
|
||||||
remoteAddress: Address,
|
remoteAddress: Address,
|
||||||
|
|
@ -299,7 +299,7 @@ private[remote] class ReliableDeliverySupervisor(
|
||||||
}
|
}
|
||||||
|
|
||||||
private def createWriter(): ActorRef = {
|
private def createWriter(): ActorRef = {
|
||||||
context.watch(context.actorOf(EndpointWriter(
|
context.watch(context.actorOf(EndpointWriter.props(
|
||||||
handleOrActive = currentHandle,
|
handleOrActive = currentHandle,
|
||||||
localAddress = localAddress,
|
localAddress = localAddress,
|
||||||
remoteAddress = remoteAddress,
|
remoteAddress = remoteAddress,
|
||||||
|
|
@ -339,7 +339,7 @@ private[remote] abstract class EndpointActor(
|
||||||
*/
|
*/
|
||||||
private[remote] object EndpointWriter {
|
private[remote] object EndpointWriter {
|
||||||
|
|
||||||
def apply(
|
def props(
|
||||||
handleOrActive: Option[AkkaProtocolHandle],
|
handleOrActive: Option[AkkaProtocolHandle],
|
||||||
localAddress: Address,
|
localAddress: Address,
|
||||||
remoteAddress: Address,
|
remoteAddress: Address,
|
||||||
|
|
@ -580,7 +580,7 @@ private[remote] class EndpointWriter(
|
||||||
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
|
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
|
||||||
val newReader =
|
val newReader =
|
||||||
context.watch(context.actorOf(
|
context.watch(context.actorOf(
|
||||||
EndpointReader(localAddress, remoteAddress, transport, settings, codec,
|
EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
|
||||||
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local),
|
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local),
|
||||||
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
|
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
|
||||||
handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
|
handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
|
||||||
|
|
@ -603,7 +603,7 @@ private[remote] class EndpointWriter(
|
||||||
*/
|
*/
|
||||||
private[remote] object EndpointReader {
|
private[remote] object EndpointReader {
|
||||||
|
|
||||||
def apply(
|
def props(
|
||||||
localAddress: Address,
|
localAddress: Address,
|
||||||
remoteAddress: Address,
|
remoteAddress: Address,
|
||||||
transport: Transport,
|
transport: Transport,
|
||||||
|
|
|
||||||
|
|
@ -191,7 +191,7 @@ private[akka] class RemoteActorRefProvider(
|
||||||
failureDetector,
|
failureDetector,
|
||||||
heartbeatInterval = WatchHeartBeatInterval,
|
heartbeatInterval = WatchHeartBeatInterval,
|
||||||
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
unreachableReaperInterval = WatchUnreachableReaperInterval,
|
||||||
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter).withDeploy(Deploy.local),
|
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter),
|
||||||
"remote-watcher")
|
"remote-watcher")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import akka.ConfigurationException
|
||||||
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
|
||||||
import akka.actor.InternalActorRef
|
import akka.actor.InternalActorRef
|
||||||
import akka.dispatch.sysmsg.DeathWatchNotification
|
import akka.dispatch.sysmsg.DeathWatchNotification
|
||||||
|
import akka.actor.Deploy
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -32,7 +33,7 @@ private[akka] object RemoteWatcher {
|
||||||
unreachableReaperInterval: FiniteDuration,
|
unreachableReaperInterval: FiniteDuration,
|
||||||
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
heartbeatExpectedResponseAfter: FiniteDuration): Props =
|
||||||
Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
Props(classOf[RemoteWatcher], failureDetector, heartbeatInterval, unreachableReaperInterval,
|
||||||
heartbeatExpectedResponseAfter)
|
heartbeatExpectedResponseAfter).withDeploy(Deploy.local)
|
||||||
|
|
||||||
case class WatchRemote(watchee: ActorRef, watcher: ActorRef)
|
case class WatchRemote(watchee: ActorRef, watcher: ActorRef)
|
||||||
case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef)
|
case class UnwatchRemote(watchee: ActorRef, watcher: ActorRef)
|
||||||
|
|
|
||||||
|
|
@ -617,7 +617,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
writing: Boolean): ActorRef = {
|
writing: Boolean): ActorRef = {
|
||||||
assert(transportMapping contains localAddress)
|
assert(transportMapping contains localAddress)
|
||||||
|
|
||||||
if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor(
|
if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor.props(
|
||||||
handleOption,
|
handleOption,
|
||||||
localAddress,
|
localAddress,
|
||||||
remoteAddress,
|
remoteAddress,
|
||||||
|
|
@ -626,7 +626,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
||||||
AkkaPduProtobufCodec,
|
AkkaPduProtobufCodec,
|
||||||
receiveBuffers).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local),
|
receiveBuffers).withDispatcher("akka.remote.writer-dispatcher").withDeploy(Deploy.local),
|
||||||
"reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
|
"reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
|
||||||
else context.watch(context.actorOf(EndpointWriter(
|
else context.watch(context.actorOf(EndpointWriter.props(
|
||||||
handleOption,
|
handleOption,
|
||||||
localAddress,
|
localAddress,
|
||||||
remoteAddress,
|
remoteAddress,
|
||||||
|
|
|
||||||
|
|
@ -110,7 +110,7 @@ class SslTlsSupportSpec extends AkkaSpec {
|
||||||
|
|
||||||
import init._
|
import init._
|
||||||
|
|
||||||
val handler = system.actorOf(TcpPipelineHandler(init, connection, probe.ref).withDeploy(Deploy.local),
|
val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local),
|
||||||
"client" + counter.incrementAndGet())
|
"client" + counter.incrementAndGet())
|
||||||
probe.send(connection, Tcp.Register(handler))
|
probe.send(connection, Tcp.Register(handler))
|
||||||
|
|
||||||
|
|
@ -166,7 +166,8 @@ class SslTlsSupportSpec extends AkkaSpec {
|
||||||
//#server
|
//#server
|
||||||
context watch handler
|
context watch handler
|
||||||
//#server
|
//#server
|
||||||
val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler).withDeploy(Deploy.local))
|
val pipeline = context.actorOf(TcpPipelineHandler.props(
|
||||||
|
init, sender, handler).withDeploy(Deploy.local))
|
||||||
|
|
||||||
connection ! Tcp.Register(pipeline)
|
connection ! Tcp.Register(pipeline)
|
||||||
//#server
|
//#server
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue