Removing unused code in ReflectiveAccess, fixing a performance-related issue in LocalDeployer and switched back to non-systemServices in the LocalActorRefProviderSpec
This commit is contained in:
parent
a044e41008
commit
a75310a181
4 changed files with 5 additions and 121 deletions
|
|
@ -20,7 +20,7 @@ class LocalActorRefProviderSpec extends AkkaSpec {
|
||||||
(0 until 100) foreach { i ⇒ // 100 concurrent runs
|
(0 until 100) foreach { i ⇒ // 100 concurrent runs
|
||||||
val address = "new-actor" + i
|
val address = "new-actor" + i
|
||||||
implicit val timeout = Timeout(30 seconds)
|
implicit val timeout = Timeout(30 seconds)
|
||||||
((1 to 4) map { _ ⇒ Future { provider.actorOf(Props(c ⇒ { case _ ⇒ }), app.guardian, address, true) } }).map(_.get).distinct.size must be(1)
|
((1 to 4) map { _ ⇒ Future { provider.actorOf(Props(c ⇒ { case _ ⇒ }), app.guardian, address) } }).map(_.get).distinct.size must be(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
|
||||||
val deploymentConfig = new DeploymentConfig(app)
|
val deploymentConfig = new DeploymentConfig(app)
|
||||||
|
|
||||||
lazy val instance: ActorDeployer = {
|
lazy val instance: ActorDeployer = {
|
||||||
val deployer = if (app.reflective.ClusterModule.isEnabled) app.reflective.ClusterModule.clusterDeployer else LocalDeployer
|
val deployer = LocalDeployer
|
||||||
deployer.init(deploymentsInConfig)
|
deployer.init(deploymentsInConfig)
|
||||||
deployer
|
deployer
|
||||||
}
|
}
|
||||||
|
|
@ -71,16 +71,8 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
|
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] =
|
||||||
instance.lookupDeploymentFor(address) match {
|
instance.lookupDeploymentFor(address)
|
||||||
case s @ Some(d) if d ne null ⇒ s
|
|
||||||
case _ ⇒
|
|
||||||
lookupInConfig(address) match {
|
|
||||||
case None | Some(null) ⇒ None
|
|
||||||
case s @ Some(d) ⇒ deploy(d); s // deploy and cache it
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private[akka] def deploymentsInConfig: List[Deploy] = {
|
private[akka] def deploymentsInConfig: List[Deploy] = {
|
||||||
for {
|
for {
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ import akka.AkkaApplication
|
||||||
*/
|
*/
|
||||||
class Dispatcher(
|
class Dispatcher(
|
||||||
_app: AkkaApplication,
|
_app: AkkaApplication,
|
||||||
_name: String,
|
val name: String,
|
||||||
val throughput: Int,
|
val throughput: Int,
|
||||||
val throughputDeadlineTime: Int,
|
val throughputDeadlineTime: Int,
|
||||||
val mailboxType: MailboxType,
|
val mailboxType: MailboxType,
|
||||||
|
|
@ -73,8 +73,6 @@ class Dispatcher(
|
||||||
val timeoutMs: Long)
|
val timeoutMs: Long)
|
||||||
extends MessageDispatcher(_app) {
|
extends MessageDispatcher(_app) {
|
||||||
|
|
||||||
val name = "akka:event-driven:dispatcher:" + _name
|
|
||||||
|
|
||||||
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
|
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
|
||||||
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
|
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -135,110 +135,4 @@ class ReflectiveAccess(val app: AkkaApplication) {
|
||||||
case Left(e) ⇒ throw e
|
case Left(e) ⇒ throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Reflective access to the Cluster module.
|
|
||||||
*
|
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
||||||
*/
|
|
||||||
object ClusterModule {
|
|
||||||
lazy val isEnabled = app.AkkaConfig.ClusterEnabled //&& clusterInstance.isDefined
|
|
||||||
|
|
||||||
lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match {
|
|
||||||
case Left(e) ⇒ throw e
|
|
||||||
case Right(b) ⇒ b
|
|
||||||
}
|
|
||||||
|
|
||||||
def newClusteredActorRef(props: RoutedProps): ActorRef = {
|
|
||||||
val params: Array[Class[_]] = Array(classOf[RoutedProps])
|
|
||||||
val args: Array[AnyRef] = Array(props)
|
|
||||||
|
|
||||||
createInstance(clusterRefClass, params, args) match {
|
|
||||||
case Left(e) ⇒ throw e
|
|
||||||
case Right(b) ⇒ b.asInstanceOf[ActorRef]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def ensureEnabled() {
|
|
||||||
if (!isEnabled) {
|
|
||||||
val e = new ModuleNotAvailableException(
|
|
||||||
"Can't load the cluster module, make sure it is enabled in the config ('akka.enabled-modules = [\"cluster\"])' and that akka-cluster.jar is on the classpath")
|
|
||||||
app.eventHandler.debug(this, e.toString)
|
|
||||||
throw e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val clusterInstance: Option[Cluster] = getObjectFor("akka.cluster.Cluster$") match {
|
|
||||||
case Right(value) ⇒ Some(value)
|
|
||||||
case Left(exception) ⇒
|
|
||||||
app.eventHandler.debug(this, exception.toString)
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val clusterDeployerInstance: Option[ActorDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match {
|
|
||||||
case Right(value) ⇒ Some(value)
|
|
||||||
case Left(exception) ⇒
|
|
||||||
app.eventHandler.debug(this, exception.toString)
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val transactionLogInstance: Option[TransactionLogObject] = getObjectFor("akka.cluster.TransactionLog$") match {
|
|
||||||
case Right(value) ⇒ Some(value)
|
|
||||||
case Left(exception) ⇒
|
|
||||||
app.eventHandler.debug(this, exception.toString)
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val node: ClusterNode = {
|
|
||||||
ensureEnabled()
|
|
||||||
clusterInstance.get.node
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val clusterDeployer: ActorDeployer = {
|
|
||||||
ensureEnabled()
|
|
||||||
clusterDeployerInstance.get
|
|
||||||
}
|
|
||||||
|
|
||||||
lazy val transactionLog: TransactionLogObject = {
|
|
||||||
ensureEnabled()
|
|
||||||
transactionLogInstance.get
|
|
||||||
}
|
|
||||||
|
|
||||||
type Cluster = {
|
|
||||||
def node: ClusterNode
|
|
||||||
}
|
|
||||||
|
|
||||||
type Mailbox = {
|
|
||||||
def enqueue(message: Envelope)
|
|
||||||
def dequeue: Envelope
|
|
||||||
}
|
|
||||||
|
|
||||||
type TransactionLogObject = {
|
|
||||||
def newLogFor(
|
|
||||||
id: String,
|
|
||||||
isAsync: Boolean,
|
|
||||||
replicationScheme: ReplicationScheme): TransactionLog
|
|
||||||
|
|
||||||
def logFor(
|
|
||||||
id: String,
|
|
||||||
isAsync: Boolean,
|
|
||||||
replicationScheme: ReplicationScheme): TransactionLog
|
|
||||||
|
|
||||||
def shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
type TransactionLog = {
|
|
||||||
def recordEntry(messageHandle: Envelope, actorRef: LocalActorRef)
|
|
||||||
def recordEntry(entry: Array[Byte])
|
|
||||||
def recordSnapshot(snapshot: Array[Byte])
|
|
||||||
def entries: Vector[Array[Byte]]
|
|
||||||
def entriesFromLatestSnapshot: Tuple2[Array[Byte], Vector[Array[Byte]]]
|
|
||||||
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]]
|
|
||||||
def latestSnapshotAndSubsequentEntries: (Option[Array[Byte]], Vector[Array[Byte]])
|
|
||||||
def latestEntryId: Long
|
|
||||||
def latestSnapshotId: Long
|
|
||||||
def delete()
|
|
||||||
def close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue