Closing ticket #1030, removing lots of warnings
This commit is contained in:
parent
fe1051af30
commit
3bc7db0dde
10 changed files with 46 additions and 47 deletions
|
|
@ -1011,7 +1011,7 @@ private[akka] case class RemoteActorRef private[akka](
|
|||
case _ ⇒ None
|
||||
}
|
||||
val chFuture = channel match {
|
||||
case f: Promise[Any] ⇒ Some(f)
|
||||
case f: Promise[_] ⇒ Some(f.asInstanceOf[Promise[Any]])
|
||||
case _ ⇒ None
|
||||
}
|
||||
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader)
|
||||
|
|
|
|||
|
|
@ -197,14 +197,15 @@ object Dispatchers {
|
|||
case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator
|
||||
case fqn ⇒
|
||||
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
|
||||
case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒
|
||||
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](r.b, Array[Class[_]](), Array[AnyRef]()) match {
|
||||
case r: Right[Exception, MessageDispatcherConfigurator] ⇒ r.b
|
||||
case l: Left[Exception, MessageDispatcherConfigurator] ⇒
|
||||
throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, l.a)
|
||||
case Right(clazz) ⇒
|
||||
ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]()) match {
|
||||
case Right(configurator) ⇒ configurator
|
||||
case Left(exception)⇒
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn, exception)
|
||||
}
|
||||
case l: Left[Exception, _] ⇒
|
||||
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, l.a)
|
||||
case Left(exception) ⇒
|
||||
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn, exception)
|
||||
}
|
||||
} map {
|
||||
_ configure cfg
|
||||
|
|
|
|||
|
|
@ -90,8 +90,8 @@ object Futures {
|
|||
|
||||
val aggregate: Future[T] ⇒ Unit = f ⇒ if (done.isOff && !result.isCompleted) { //TODO: This is an optimization, is it premature?
|
||||
f.value.get match {
|
||||
case r: Right[Throwable, T] ⇒
|
||||
val added = results add r.b
|
||||
case Right(value) ⇒
|
||||
val added = results add value
|
||||
if (added && results.size == allDone) { //Only one thread can get here
|
||||
if (done.switchOn) {
|
||||
try {
|
||||
|
|
@ -109,9 +109,9 @@ object Futures {
|
|||
}
|
||||
}
|
||||
}
|
||||
case l: Left[Throwable, T] ⇒
|
||||
case Left(exception) ⇒
|
||||
if (done.switchOn) {
|
||||
result completeWithException l.a
|
||||
result completeWithException exception
|
||||
results.clear
|
||||
}
|
||||
}
|
||||
|
|
@ -148,10 +148,8 @@ object Futures {
|
|||
val seedFold: Future[T] ⇒ Unit = f ⇒ {
|
||||
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
|
||||
f.value.get match {
|
||||
case r: Right[Throwable, T] ⇒
|
||||
result.completeWith(fold(r.b, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case l: Left[Throwable, T] ⇒
|
||||
result.completeWithException(l.a)
|
||||
case Right(value) ⇒ result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
|
||||
case Left(exception) ⇒ result.completeWithException(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,8 +112,8 @@ object EventHandler extends ListenerManagement {
|
|||
defaultListeners foreach { listenerName ⇒
|
||||
try {
|
||||
ReflectiveAccess.getClassFor[Actor](listenerName) match {
|
||||
case r: Right[_, Class[Actor]] ⇒ addListener(Actor.localActorOf(r.b).start())
|
||||
case l: Left[Exception, _] ⇒ throw l.a
|
||||
case Right(actorClass) ⇒ addListener(Actor.localActorOf(actorClass).start())
|
||||
case Left(exception) ⇒ throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
|
|
|
|||
|
|
@ -265,7 +265,7 @@ trait MailboxPressureCapacitor {
|
|||
*/
|
||||
trait ActiveFuturesPressureCapacitor {
|
||||
def pressure(delegates: Seq[ActorRef]): Int =
|
||||
delegates count { _.channel.isInstanceOf[Promise[Any]] }
|
||||
delegates count { _.channel.isInstanceOf[Promise[_]] }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1917,7 +1917,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
case f: Function0[Unit] ⇒ try {
|
||||
case f: Function0[_] ⇒ try {
|
||||
f()
|
||||
} finally {
|
||||
self.stop()
|
||||
|
|
@ -1930,7 +1930,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
case f: Function0[Any] ⇒ try {
|
||||
case f: Function0[_] ⇒ try {
|
||||
self.reply(f())
|
||||
} finally {
|
||||
self.stop()
|
||||
|
|
@ -1943,8 +1943,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
case (fun: Function[Any, Unit], param: Any) ⇒ try {
|
||||
fun(param)
|
||||
case (fun: Function[_, _], param: Any) ⇒ try {
|
||||
fun.asInstanceOf[Any => Unit].apply(param)
|
||||
} finally {
|
||||
self.stop()
|
||||
}
|
||||
|
|
@ -1956,8 +1956,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
case (fun: Function[Any, Unit], param: Any) ⇒ try {
|
||||
self.reply(fun(param))
|
||||
case (fun: Function[_, _], param: Any) ⇒ try {
|
||||
self.reply(fun.asInstanceOf[Any => Any](param))
|
||||
} finally {
|
||||
self.stop()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,14 +88,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
lock.readLock.lock
|
||||
try {
|
||||
val c = remoteClients.get(key) match {
|
||||
case s: Some[RemoteClient] ⇒ s.get
|
||||
case Some(client) ⇒ client
|
||||
case None ⇒
|
||||
lock.readLock.unlock
|
||||
lock.writeLock.lock //Lock upgrade, not supported natively
|
||||
try {
|
||||
try {
|
||||
remoteClients.get(key) match { //Recheck for addition, race between upgrades
|
||||
case s: Some[RemoteClient] ⇒ s.get //If already populated by other writer
|
||||
case Some(client) ⇒ client //If already populated by other writer
|
||||
case None ⇒ //Populate map
|
||||
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
|
||||
client.connect()
|
||||
|
|
@ -111,15 +111,15 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
|
||||
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
|
||||
remoteClients.remove(Address(address)) match {
|
||||
case s: Some[RemoteClient] ⇒ s.get.shutdown()
|
||||
case None ⇒ false
|
||||
case Some(client) ⇒ client.shutdown()
|
||||
case None ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
|
||||
remoteClients.get(Address(address)) match {
|
||||
case s: Some[RemoteClient] ⇒ s.get.connect(reconnectIfAlreadyConnected = true)
|
||||
case None ⇒ false
|
||||
case Some(client) ⇒ client.connect(reconnectIfAlreadyConnected = true)
|
||||
case None ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -632,12 +632,12 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule
|
|||
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
|
||||
|
||||
def address = currentServer.get match {
|
||||
case s: Some[NettyRemoteServer] ⇒ s.get.address
|
||||
case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
|
||||
case Some(server) ⇒ server.address
|
||||
case None ⇒ ReflectiveAccess.RemoteModule.configDefaultAddress
|
||||
}
|
||||
|
||||
def name = currentServer.get match {
|
||||
case s: Some[NettyRemoteServer] ⇒ s.get.name
|
||||
case Some(server) ⇒ server.name
|
||||
case None ⇒
|
||||
val a = ReflectiveAccess.RemoteModule.configDefaultAddress
|
||||
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort
|
||||
|
|
@ -920,15 +920,15 @@ class RemoteServerHandler(
|
|||
request.getActorInfo.getTimeout,
|
||||
new ActorPromise(request.getActorInfo.getTimeout).
|
||||
onComplete(_.value.get match {
|
||||
case l: Left[Throwable, Any] ⇒ write(channel, createErrorReplyMessage(l.a, request))
|
||||
case r: Right[Throwable, Any] ⇒
|
||||
case Left(exception) ⇒ write(channel, createErrorReplyMessage(exception, request))
|
||||
case r: Right[_,_] ⇒
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
actorInfo.getAddress,
|
||||
actorInfo.getTimeout,
|
||||
r,
|
||||
true,
|
||||
r.asInstanceOf[Either[Throwable,Any]],
|
||||
isOneWay = true,
|
||||
Some(actorRef))
|
||||
|
||||
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
|
||||
|
|
|
|||
|
|
@ -95,10 +95,10 @@ object ActorSerialization {
|
|||
if (actorRef.mailbox eq null) throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
|
||||
val messages =
|
||||
actorRef.mailbox match {
|
||||
case q: java.util.Queue[MessageInvocation] ⇒
|
||||
case q: java.util.Queue[_] ⇒
|
||||
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
|
||||
val it = q.iterator
|
||||
while (it.hasNext == true) l += it.next
|
||||
while (it.hasNext) l += it.next.asInstanceOf[MessageInvocation]
|
||||
l
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -282,8 +282,8 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
|
|||
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
|
||||
|
||||
def receive = {
|
||||
case update: Update[T] ⇒
|
||||
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
|
||||
case update: Update[_] ⇒
|
||||
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] })
|
||||
case Get ⇒ self reply agent.get
|
||||
case _ ⇒ ()
|
||||
}
|
||||
|
|
@ -298,8 +298,8 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
|
|||
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
|
||||
|
||||
def receive = {
|
||||
case update: Update[T] ⇒ try {
|
||||
self.tryReply(atomic(txFactory) { agent.ref alter update.function })
|
||||
case update: Update[_] ⇒ try {
|
||||
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T => T] })
|
||||
} finally {
|
||||
agent.resume
|
||||
self.stop()
|
||||
|
|
|
|||
|
|
@ -70,12 +70,12 @@ object TestActorRef {
|
|||
|
||||
import ReflectiveAccess.{ createInstance, noParams, noArgs }
|
||||
createInstance[T](manifest[T].erasure, noParams, noArgs) match {
|
||||
case r: Right[_, T] ⇒ r.b
|
||||
case l: Left[Exception, _] ⇒ throw new ActorInitializationException(
|
||||
case Right(value) ⇒ value
|
||||
case Left(exception) ⇒ throw new ActorInitializationException(
|
||||
"Could not instantiate Actor" +
|
||||
"\nMake sure Actor is NOT defined inside a class/trait," +
|
||||
"\nif so put it outside the class/trait, f.e. in a companion object," +
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", l.a)
|
||||
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'.", exception)
|
||||
}
|
||||
}, address)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue