Renaming CompletableFuture to Promise, Renaming AlreadyCompletedFuture to KeptPromise, closing ticket #854
This commit is contained in:
parent
aa52486fdc
commit
8a790b1ddf
15 changed files with 74 additions and 74 deletions
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.remote.netty
|
||||
|
||||
import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture, Future }
|
||||
import akka.dispatch.{ DefaultPromise, Promise, Future }
|
||||
import akka.remote.{ MessageSerializer, RemoteClientSettings, RemoteServerSettings }
|
||||
import akka.remote.protocol.RemoteProtocol._
|
||||
import akka.serialization.RemoteActorSerialization
|
||||
|
|
@ -73,12 +73,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
|
||||
protected[akka] def send[T](message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
timeout: Long,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef,
|
||||
loader: Option[ClassLoader]): Option[CompletableFuture[T]] =
|
||||
loader: Option[ClassLoader]): Option[Promise[T]] =
|
||||
withClientFor(remoteAddress, loader)(_.send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef))
|
||||
|
||||
private[akka] def withClientFor[T](
|
||||
|
|
@ -154,7 +154,7 @@ abstract class RemoteClient private[akka] (
|
|||
remoteAddress.getAddress.getHostAddress + "::" +
|
||||
remoteAddress.getPort
|
||||
|
||||
protected val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
|
||||
protected val futures = new ConcurrentHashMap[Uuid, Promise[_]]
|
||||
protected val pendingRequests = {
|
||||
if (transactionLogCapacity < 0) new ConcurrentLinkedQueue[(Boolean, Uuid, RemoteMessageProtocol)]
|
||||
else new LinkedBlockingQueue[(Boolean, Uuid, RemoteMessageProtocol)](transactionLogCapacity)
|
||||
|
|
@ -191,11 +191,11 @@ abstract class RemoteClient private[akka] (
|
|||
def send[T](
|
||||
message: Any,
|
||||
senderOption: Option[ActorRef],
|
||||
senderFuture: Option[CompletableFuture[T]],
|
||||
senderFuture: Option[Promise[T]],
|
||||
remoteAddress: InetSocketAddress,
|
||||
timeout: Long,
|
||||
isOneWay: Boolean,
|
||||
actorRef: ActorRef): Option[CompletableFuture[T]] =
|
||||
actorRef: ActorRef): Option[Promise[T]] =
|
||||
send(createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef), Left(actorRef.uuid), actorRef.address, timeout, Right(message), isOneWay, senderOption).build,
|
||||
senderFuture)
|
||||
|
|
@ -205,7 +205,7 @@ abstract class RemoteClient private[akka] (
|
|||
*/
|
||||
def send[T](
|
||||
request: RemoteMessageProtocol,
|
||||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
try {
|
||||
|
|
@ -227,7 +227,7 @@ abstract class RemoteClient private[akka] (
|
|||
None
|
||||
} else {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
else new DefaultPromise[T](request.getActorInfo.getTimeout)
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult) // Add future prematurely, remove it if write fails
|
||||
|
||||
|
|
@ -410,7 +410,7 @@ class ActiveRemoteClient private[akka] (
|
|||
*/
|
||||
class ActiveRemoteClientPipelineFactory(
|
||||
name: String,
|
||||
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
bootstrap: ClientBootstrap,
|
||||
remoteAddress: InetSocketAddress,
|
||||
timer: HashedWheelTimer,
|
||||
|
|
@ -439,7 +439,7 @@ class ActiveRemoteClientPipelineFactory(
|
|||
@ChannelHandler.Sharable
|
||||
class ActiveRemoteClientHandler(
|
||||
val name: String,
|
||||
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
|
||||
val futures: ConcurrentMap[Uuid, Promise[_]],
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: InetSocketAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
|
|
@ -457,7 +457,7 @@ class ActiveRemoteClientHandler(
|
|||
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
|
||||
val reply = arp.getMessage
|
||||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
val future = futures.remove(replyUuid).asInstanceOf[Promise[Any]]
|
||||
|
||||
if (reply.hasMessage) {
|
||||
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
|
||||
|
|
@ -891,7 +891,7 @@ class RemoteServerHandler(
|
|||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[Any](request.getActorInfo.getTimeout).
|
||||
Some(new DefaultPromise[Any](request.getActorInfo.getTimeout).
|
||||
onComplete(_.value.get match {
|
||||
case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request))
|
||||
case r: Right[Throwable, Any] ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue