improved perf with 25 % + renamed FutureResult -> Future + Added lightweight future factory method
This commit is contained in:
parent
47d19119a5
commit
15ed113112
9 changed files with 60 additions and 92 deletions
|
|
@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote
|
|||
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply}
|
||||
import se.scalablesolutions.akka.actor.{Exit, Actor}
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFutureResult, CompletableFutureResult}
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
||||
import se.scalablesolutions.akka.util.{UUID, Logging}
|
||||
import se.scalablesolutions.akka.Config.config
|
||||
|
||||
|
|
@ -86,7 +86,7 @@ object RemoteClient extends Logging {
|
|||
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||
message: Any,
|
||||
timeout: Long,
|
||||
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
|
||||
senderFuture: Option[CompletableFuture]): CompletableFuture = {
|
||||
val requestBuilder = RemoteRequest.newBuilder
|
||||
.setId(RemoteRequestIdFactory.nextId)
|
||||
.setTarget(className)
|
||||
|
|
@ -168,7 +168,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private[remote] var isRunning = false
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFuture]
|
||||
private val supervisors = new ConcurrentHashMap[String, Actor]
|
||||
|
||||
private val channelFactory = new NioClientSocketChannelFactory(
|
||||
|
|
@ -208,14 +208,14 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def send(request: RemoteRequest, senderFuture: Option[CompletableFutureResult]): Option[CompletableFutureResult] = if (isRunning) {
|
||||
def send(request: RemoteRequest, senderFuture: Option[CompletableFuture]): Option[CompletableFuture] = if (isRunning) {
|
||||
if (request.getIsOneWay) {
|
||||
connection.getChannel.write(request)
|
||||
None
|
||||
} else {
|
||||
futures.synchronized {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFutureResult(request.getTimeout)
|
||||
else new DefaultCompletableFuture(request.getTimeout)
|
||||
futures.put(request.getId, futureResult)
|
||||
connection.getChannel.write(request)
|
||||
Some(futureResult)
|
||||
|
|
@ -238,7 +238,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class RemoteClientPipelineFactory(name: String,
|
||||
futures: ConcurrentMap[Long, CompletableFutureResult],
|
||||
futures: ConcurrentMap[Long, CompletableFuture],
|
||||
supervisors: ConcurrentMap[String, Actor],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: SocketAddress,
|
||||
|
|
@ -269,7 +269,7 @@ class RemoteClientPipelineFactory(name: String,
|
|||
*/
|
||||
@ChannelPipelineCoverage {val value = "all"}
|
||||
class RemoteClientHandler(val name: String,
|
||||
val futures: ConcurrentMap[Long, CompletableFutureResult],
|
||||
val futures: ConcurrentMap[Long, CompletableFuture],
|
||||
val supervisors: ConcurrentMap[String, Actor],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: SocketAddress,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue