diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala index ca6e42d67f..ca23dd5a33 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/PerformanceSpec.scala @@ -3,12 +3,11 @@ package akka.performance.workbench import scala.collection.immutable.TreeMap import org.apache.commons.math.stat.descriptive.DescriptiveStatistics import org.scalatest.BeforeAndAfterEach -import akka.actor.simpleName import akka.testkit.AkkaSpec -import akka.actor.ActorSystem import akka.util.Duration import com.typesafe.config.Config import java.util.concurrent.TimeUnit +import akka.event.Logging abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends AkkaSpec(cfg) with BeforeAndAfterEach { @@ -36,7 +35,7 @@ abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends Akk } def logMeasurement(numberOfClients: Int, durationNs: Long, n: Long) { - val name = simpleName(this) + val name = Logging.simpleName(this) val durationS = durationNs.toDouble / 1000000000.0 val stats = Stats( @@ -51,7 +50,7 @@ abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends Akk } def logMeasurement(numberOfClients: Int, durationNs: Long, stat: DescriptiveStatistics) { - val name = simpleName(this) + val name = Logging.simpleName(this) val durationS = durationNs.toDouble / 1000000000.0 val percentiles = TreeMap[Int, Long]( diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 2fd9538d77..47b8bf329c 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -34,8 +34,19 @@ final case class Deploy( routerConfig: RouterConfig = NoRouter, scope: Scope = NoScopeGiven) { + /** + * Java API to create a Deploy with the given RouterConfig + */ def this(routing: RouterConfig) = this("", ConfigFactory.empty, routing) + + /** + * Java API to create a Deploy with the given RouterConfig with Scope + */ def this(routing: RouterConfig, scope: Scope) = this("", ConfigFactory.empty, routing, scope) + + /** + * Java API to create a Deploy with the given Scope + */ def this(scope: Scope) = this("", ConfigFactory.empty, NoRouter, scope) /** @@ -67,13 +78,9 @@ trait Scope { //TODO add @SerialVersionUID(1L) when SI-4804 is fixed abstract class LocalScope extends Scope -case object LocalScope extends LocalScope { - /** - * Java API - */ - @deprecated("use instance() method instead", "2.0.1") - def scope: Scope = this +//FIXME docs +case object LocalScope extends LocalScope { /** * Java API: get the singleton instance */ @@ -162,5 +169,4 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce Some(Deploy(key, deployment, router, NoScopeGiven)) } - } diff --git a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala index 8d3ac68852..72ffbbe76e 100644 --- a/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala +++ b/akka-actor/src/main/scala/akka/actor/DynamicAccess.scala @@ -14,7 +14,7 @@ import java.lang.reflect.InvocationTargetException * This is an internal facility and users are not expected to encounter it * unless they are extending Akka in ways which go beyond simple Extensions. */ -trait DynamicAccess { +abstract class DynamicAccess { /** * Convenience method which given a `Class[_]` object and a constructor description @@ -88,7 +88,7 @@ trait DynamicAccess { * by default. */ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess { - + //FIXME switch to Scala Reflection for 2.10 override def getClassFor[T: ClassManifest](fqcn: String): Either[Throwable, Class[_ <: T]] = try { val c = classLoader.loadClass(fqcn).asInstanceOf[Class[_ <: T]] diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 3d1f8930c4..71d1ec7e69 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -6,9 +6,10 @@ package akka.actor import akka.util._ import scala.collection.mutable -import akka.event.Logging import akka.routing.{ Deafen, Listen, Listeners } +//FIXME: Roland, could you go through this file? + object FSM { object NullFunction extends PartialFunction[Any, Nothing] { diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 70246bab30..383010f9de 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -9,8 +9,13 @@ import scala.collection.JavaConversions._ import java.lang.{ Iterable ⇒ JIterable } import akka.util.Duration +/** + * ChildRestartStats is the statistics kept by every parent Actor for every child Actor + * and is used for SupervisorStrategies to know how to deal with problems that occur for the children. + */ case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { + //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { case (Some(retries), _) if retries < 1 ⇒ false @@ -160,19 +165,19 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { def makeDecider(flat: Iterable[CauseDirective]): Decider = { val directives = sort(flat) - { - case x ⇒ directives find (_._1 isInstance x) map (_._2) getOrElse Escalate - } + { case x ⇒ directives find (_._1 isInstance x) map (_._2) getOrElse Escalate } } - def makeDecider(func: JDecider): Decider = { - case x ⇒ func(x) - } + /** + * Converts a Java Decider into a Scala Decider + */ + def makeDecider(func: JDecider): Decider = { case x ⇒ func(x) } /** * Sort so that subtypes always precede their supertypes, but without * obeying any order between unrelated subtypes (insert sort). */ + //FIXME Should this really be public API? def sort(in: Iterable[CauseDirective]): Seq[CauseDirective] = (new ArrayBuffer[CauseDirective](in.size) /: in) { (buf, ca) ⇒ buf.indexWhere(_._1 isAssignableFrom ca._1) match { @@ -184,14 +189,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { private[akka] def withinTimeRangeOption(withinTimeRange: Duration): Option[Duration] = if (withinTimeRange.isFinite && withinTimeRange >= Duration.Zero) Some(withinTimeRange) else None + private[akka] def maxNrOfRetriesOption(maxNrOfRetries: Int): Option[Int] = if (maxNrOfRetries < 0) None else Some(maxNrOfRetries) } +/** + * An Akka SupervisorStrategy is + */ abstract class SupervisorStrategy { import SupervisorStrategy._ + /** + * Returns the Decider that is associated with this SupervisorStrategy + */ def decider: Decider /** @@ -204,21 +216,19 @@ abstract class SupervisorStrategy { */ def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit - def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { - if (children.nonEmpty) - children.foreach(_.asInstanceOf[InternalActorRef].suspend()) - } + //FIXME docs + def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = + if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].suspend()) - def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { - if (children.nonEmpty) - children.foreach(_.asInstanceOf[InternalActorRef].restart(cause)) - } + //FIXME docs + def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = + if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].restart(cause)) /** * Returns whether it processed the failure or not */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { - val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate + val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 directive match { case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true @@ -242,6 +252,8 @@ abstract class SupervisorStrategy { case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration = Duration.Inf)(val decider: SupervisorStrategy.Decider) extends SupervisorStrategy { + import SupervisorStrategy._ + def this(maxNrOfRetries: Int, withinTimeRange: Duration, decider: SupervisorStrategy.JDecider) = this(maxNrOfRetries, withinTimeRange)(SupervisorStrategy.makeDecider(decider)) @@ -256,9 +268,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration * every call to requestRestartPermission, assuming that strategies are shared * across actors and thus this field does not take up much space */ - private val retriesWindow = ( - SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries), - SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) + private val retriesWindow = (maxNrOfRetriesOption(maxNrOfRetries), withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 72eaf32e83..3ff91c4fa8 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -30,7 +30,7 @@ import java.util.UUID */ object IO { - final class DivergentIterateeException extends Exception("Iteratees should not return a continuation when receiving EOF") + final class DivergentIterateeException extends IllegalStateException("Iteratees should not return a continuation when receiving EOF") /** * An immutable handle to a Java NIO Channel. Contains a reference to the @@ -64,14 +64,14 @@ object IO { * A [[akka.actor.IO.Handle]] to a ReadableByteChannel. */ sealed trait ReadHandle extends Handle with Product { - override def asReadable = this + override def asReadable: ReadHandle = this } /** * A [[akka.actor.IO.Handle]] to a WritableByteChannel. */ sealed trait WriteHandle extends Handle with Product { - override def asWritable = this + override def asWritable: WriteHandle = this /** * Sends a request to the [[akka.actor.IOManager]] to write to the @@ -89,7 +89,7 @@ object IO { * [[akka.actor.IO.ServerHandle]].accept(). */ case class SocketHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = UUID.randomUUID()) extends ReadHandle with WriteHandle { - override def asSocket = this + override def asSocket: SocketHandle = this } /** @@ -97,7 +97,7 @@ object IO { * normally created by [[akka.actor.IOManager]].listen(). */ case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = UUID.randomUUID()) extends Handle { - override def asServer = this + override def asServer: ServerHandle = this /** * Sends a request to the [[akka.actor.IOManager]] to accept an incoming @@ -320,16 +320,18 @@ object IO { } object Chunk { - val empty = Chunk(ByteString.empty) + val empty = new Chunk(ByteString.empty) } /** * Part of an [[akka.actor.IO.Input]] stream that contains a chunk of bytes. */ case class Chunk(bytes: ByteString) extends Input { - def ++(that: Input) = that match { - case Chunk(more) ⇒ Chunk(bytes ++ more) - case _: EOF ⇒ that + final override def ++(that: Input): Input = that match { + case Chunk(more) if more.isEmpty ⇒ this + case c: Chunk if bytes.isEmpty ⇒ c + case Chunk(more) ⇒ Chunk(bytes ++ more) + case _: EOF ⇒ that } } @@ -342,7 +344,7 @@ object IO { * Iteratee.recover() in order to handle it properly. */ case class EOF(cause: Option[Exception]) extends Input { - def ++(that: Input) = that + final override def ++(that: Input) = that } object Iteratee { @@ -352,7 +354,15 @@ object IO { * inferred as an Iteratee and not as a Done. */ def apply[A](value: A): Iteratee[A] = Done(value) + + /** + * Returns Iteratee.unit + */ def apply(): Iteratee[Unit] = unit + + /** + * The single value representing Done(()) + */ val unit: Iteratee[Unit] = Done(()) } @@ -445,6 +455,7 @@ object IO { */ final case class Cont[+A](f: Input ⇒ (Iteratee[A], Input), error: Option[Exception] = None) extends Iteratee[A] + //FIXME general description of what an IterateeRef is and how it is used, potentially with link to docs object IterateeRef { /** @@ -477,13 +488,14 @@ object IO { * 'refFactory' is used to provide the default value for new keys. */ class Map[K, V] private (refFactory: ⇒ IterateeRef[V], underlying: mutable.Map[K, IterateeRef[V]] = mutable.Map.empty[K, IterateeRef[V]]) extends mutable.Map[K, IterateeRef[V]] { - def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory)) - def iterator = underlying.iterator - def +=(kv: (K, IterateeRef[V])) = { underlying += kv; this } - def -=(key: K) = { underlying -= key; this } + override def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory)) + override def iterator = underlying.iterator + override def +=(kv: (K, IterateeRef[V])) = { underlying += kv; this } + override def -=(key: K) = { underlying -= key; this } override def empty = new Map[K, V](refFactory) } + //FIXME general description of what an Map is and how it is used, potentially with link to docs object Map { /** * Uses a factory to create the initial IterateeRef for each new key. @@ -500,7 +512,6 @@ object IO { */ def async[K]()(implicit executor: ExecutionContext): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async()) } - } /** @@ -510,8 +521,11 @@ object IO { * for details. */ trait IterateeRef[A] { + //FIXME Add docs def flatMap(f: A ⇒ Iteratee[A]): Unit + //FIXME Add docs def map(f: A ⇒ A): Unit + //FIXME Add docs def apply(input: Input): Unit } @@ -528,12 +542,16 @@ object IO { */ final class IterateeRefSync[A](initial: Iteratee[A]) extends IterateeRef[A] { private var _value: (Iteratee[A], Input) = (initial, Chunk.empty) - def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value match { + override def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value match { case (iter, chunk @ Chunk(bytes)) if bytes.nonEmpty ⇒ (iter flatMap f)(chunk) case (iter, input) ⇒ (iter flatMap f, input) } - def map(f: A ⇒ A): Unit = _value = (_value._1 map f, _value._2) - def apply(input: Input): Unit = _value = _value._1(_value._2 ++ input) + override def map(f: A ⇒ A): Unit = _value = (_value._1 map f, _value._2) + override def apply(input: Input): Unit = _value = _value._1(_value._2 ++ input) + + /** + * Returns the current value of this IterateeRefSync + */ def value: (Iteratee[A], Input) = _value } @@ -553,12 +571,16 @@ object IO { */ final class IterateeRefAsync[A](initial: Iteratee[A])(implicit executor: ExecutionContext) extends IterateeRef[A] { private var _value: Future[(Iteratee[A], Input)] = Future((initial, Chunk.empty)) - def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value map { + override def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value map { case (iter, chunk @ Chunk(bytes)) if bytes.nonEmpty ⇒ (iter flatMap f)(chunk) case (iter, input) ⇒ (iter flatMap f, input) } - def map(f: A ⇒ A): Unit = _value = _value map (v ⇒ (v._1 map f, v._2)) - def apply(input: Input): Unit = _value = _value map (v ⇒ v._1(v._2 ++ input)) + override def map(f: A ⇒ A): Unit = _value = _value map (v ⇒ (v._1 map f, v._2)) + override def apply(input: Input): Unit = _value = _value map (v ⇒ v._1(v._2 ++ input)) + + /** + * Returns a Future which will hold the future value of this IterateeRefAsync + */ def future: Future[(Iteratee[A], Input)] = _value } @@ -702,10 +724,9 @@ object IO { /** * An Iteratee that continually repeats an Iteratee. * - * TODO: Should terminate on EOF + * FIXME TODO: Should terminate on EOF */ - def repeat(iter: Iteratee[Unit]): Iteratee[Unit] = - iter flatMap (_ ⇒ repeat(iter)) + def repeat(iter: Iteratee[Unit]): Iteratee[Unit] = iter flatMap (_ ⇒ repeat(iter)) /** * An Iteratee that applies an Iteratee to each element of a Traversable @@ -780,7 +801,7 @@ object IO { * An IOManager does not need to be manually stopped when not in use as it will * automatically enter an idle state when it has no channels to manage. */ -final class IOManager private (system: ActorSystem) extends Extension { +final class IOManager private (system: ActorSystem) extends Extension { //FIXME how about taking an ActorContext /** * A reference to the [[akka.actor.IOManagerActor]] that performs the actual * IO. It communicates with other actors using subclasses of @@ -861,9 +882,10 @@ final class IOManager private (system: ActorSystem) extends Extension { } +//FIXME add docs object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { - override def lookup = this - override def createExtension(system: ExtendedActorSystem) = new IOManager(system) + override def lookup: IOManager.type = this + override def createExtension(system: ExtendedActorSystem): IOManager = new IOManager(system) } /** @@ -874,7 +896,7 @@ object IOManager extends ExtensionId[IOManager] with ExtensionIdProvider { final class IOManagerActor extends Actor with ActorLogging { import SelectionKey.{ OP_READ, OP_WRITE, OP_ACCEPT, OP_CONNECT } - private val bufferSize = 8192 // TODO: make buffer size configurable + private val bufferSize = 8192 // FIXME TODO: make configurable private type ReadChannel = ReadableByteChannel with SelectableChannel private type WriteChannel = WritableByteChannel with SelectableChannel @@ -897,7 +919,7 @@ final class IOManagerActor extends Actor with ActorLogging { private var lastSelect = 0 /** force a select when lastSelect reaches this amount */ - private val selectAt = 100 + private val selectAt = 100 // FIXME TODO: make configurable /** true while the selector is open and channels.nonEmpty */ private var running = false @@ -947,9 +969,7 @@ final class IOManagerActor extends Actor with ActorLogging { lastSelect = 0 } - private def forwardFailure(f: ⇒ Unit): Unit = { - try { f } catch { case NonFatal(e) ⇒ sender ! Status.Failure(e) } - } + private def forwardFailure(f: ⇒ Unit): Unit = try f catch { case NonFatal(e) ⇒ sender ! Status.Failure(e) } private def setSocketOptions(socket: java.net.Socket, options: Seq[IO.SocketOption]) { options foreach { @@ -985,7 +1005,7 @@ final class IOManagerActor extends Actor with ActorLogging { forwardFailure(sock.setPerformancePreferences(connTime, latency, bandwidth)) } - channel.socket bind (address, 1000) // TODO: make backlog configurable + channel.socket bind (address, 1000) // FIXME TODO: make backlog configurable channels update (server, channel) channel register (selector, OP_ACCEPT, server) server.owner ! IO.Listening(server, channel.socket.getLocalSocketAddress()) @@ -1048,29 +1068,13 @@ final class IOManagerActor extends Actor with ActorLogging { private def process(key: SelectionKey) { val handle = key.attachment.asInstanceOf[IO.Handle] try { - if (key.isConnectable) key.channel match { - case channel: SocketChannel ⇒ connect(handle.asSocket, channel) - } - if (key.isAcceptable) key.channel match { - case channel: ServerSocketChannel ⇒ accept(handle.asServer, channel) - } - if (key.isReadable) key.channel match { - case channel: ReadChannel ⇒ read(handle.asReadable, channel) - } - if (key.isWritable) key.channel match { - case channel: WriteChannel ⇒ - try { - write(handle.asWritable, channel) - } catch { - case e: IOException ⇒ - // ignore, let it fail on read to ensure nothing left in read buffer. - } - } + if (key.isConnectable) key.channel match { case channel: SocketChannel ⇒ connect(handle.asSocket, channel) } + if (key.isAcceptable) key.channel match { case channel: ServerSocketChannel ⇒ accept(handle.asServer, channel) } + if (key.isReadable) key.channel match { case channel: ReadChannel ⇒ read(handle.asReadable, channel) } + if (key.isWritable) key.channel match { case channel: WriteChannel ⇒ try write(handle.asWritable, channel) catch { case e: IOException ⇒ } } // ignore, let it fail on read to ensure nothing left in read buffer. } catch { - case e: ClassCastException ⇒ cleanup(handle, Some(e)) - case e: CancelledKeyException ⇒ cleanup(handle, Some(e)) - case e: IOException ⇒ cleanup(handle, Some(e)) - case e: ActorInitializationException ⇒ cleanup(handle, Some(e)) + case e @ (_: ClassCastException | _: CancelledKeyException | _: IOException | _: ActorInitializationException) ⇒ + cleanup(handle, Some(e.asInstanceOf[Exception])) //Scala patmat is broken } } @@ -1089,9 +1093,6 @@ final class IOManagerActor extends Actor with ActorLogging { } } - private def setOps(handle: IO.Handle, ops: Int): Unit = - channels(handle) keyFor selector interestOps ops - private def addOps(handle: IO.Handle, ops: Int) { val key = channels(handle) keyFor selector val cur = key.interestOps @@ -1157,9 +1158,9 @@ final class IOManagerActor extends Actor with ActorLogging { } } } - } +//FIXME is this public API? final class WriteBuffer(bufferSize: Int) { private val _queue = new java.util.ArrayDeque[ByteString] private val _buffer = ByteBuffer.allocate(bufferSize) @@ -1181,9 +1182,9 @@ final class WriteBuffer(bufferSize: Int) { this } - def length = _length + def length: Int = _length - def isEmpty = _length == 0 + def isEmpty: Boolean = _length == 0 def write(channel: WritableByteChannel with SelectableChannel): Int = { @tailrec diff --git a/akka-actor/src/main/scala/akka/actor/package.scala b/akka-actor/src/main/scala/akka/actor/package.scala index 617e3fee5c..3bf56b8bc4 100644 --- a/akka-actor/src/main/scala/akka/actor/package.scala +++ b/akka-actor/src/main/scala/akka/actor/package.scala @@ -7,12 +7,4 @@ package akka package object actor { implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef] implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef] - - def simpleName(obj: AnyRef): String = simpleName(obj.getClass) - - def simpleName(clazz: Class[_]): String = { - val n = clazz.getName - val i = n.lastIndexOf('.') - n.substring(i + 1) - } } diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 27f0c71515..172cf052ca 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -3,7 +3,8 @@ */ package akka.event -import akka.actor.{ ActorRef, ActorSystem, simpleName } +import akka.actor.{ ActorRef, ActorSystem } +import akka.event.Logging.simpleName import akka.util.Subclassification object EventStream { diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index bf4fc7996d..2cda6469da 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -275,8 +275,8 @@ object LogSource { // this one unfortunately does not work as implicit, because existential types have some weird behavior val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { - def genString(c: Class[_]) = simpleName(c) - override def genString(c: Class[_], system: ActorSystem) = simpleName(c) + "(" + system + ")" + def genString(c: Class[_]) = Logging.simpleName(c) + override def genString(c: Class[_], system: ActorSystem) = genString(c) + "(" + system + ")" override def getClazz(c: Class[_]) = c } implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] @@ -310,7 +310,7 @@ object LogSource { case a: Actor ⇒ apply(a) case a: ActorRef ⇒ apply(a) case s: String ⇒ apply(s) - case x ⇒ (simpleName(x), x.getClass) + case x ⇒ (Logging.simpleName(x), x.getClass) } /** @@ -324,7 +324,7 @@ object LogSource { case a: Actor ⇒ apply(a) case a: ActorRef ⇒ apply(a) case s: String ⇒ apply(s) - case x ⇒ (simpleName(x) + "(" + system + ")", x.getClass) + case x ⇒ (Logging.simpleName(x) + "(" + system + ")", x.getClass) } } @@ -363,6 +363,14 @@ object LogSource { */ object Logging { + def simpleName(obj: AnyRef): String = simpleName(obj.getClass) + + def simpleName(clazz: Class[_]): String = { + val n = clazz.getName + val i = n.lastIndexOf('.') + n.substring(i + 1) + } + object Extension extends ExtensionKey[LogExt] class LogExt(system: ExtendedActorSystem) extends Extension { diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 7baf3011ee..84c5764cf5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -3,27 +3,22 @@ */ package akka.remote.netty -import java.net.InetSocketAddress -import org.jboss.netty.util.HashedWheelTimer +import java.util.concurrent.TimeUnit +import java.net.{ InetAddress, InetSocketAddress } +import org.jboss.netty.util.{ Timeout, TimerTask, HashedWheelTimer } import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel.group.DefaultChannelGroup -import org.jboss.netty.channel.{ ChannelHandler, StaticChannelPipeline, SimpleChannelUpstreamHandler, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel } +import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, StaticChannelPipeline, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel } import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } import org.jboss.netty.handler.execution.ExecutionHandler +import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } + import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } -import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected } -import akka.actor.{ simpleName, Address } +import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected, RemoteClientWriteFailed } +import akka.actor.{ Address, ActorRef } import akka.AkkaException import akka.event.Logging import akka.util.Switch -import akka.actor.ActorRef -import org.jboss.netty.channel.ChannelFutureListener -import akka.remote.RemoteClientWriteFailed -import java.net.InetAddress -import org.jboss.netty.util.TimerTask -import org.jboss.netty.util.Timeout -import java.util.concurrent.TimeUnit -import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } class RemoteClientMessageBufferException(message: String, cause: Throwable) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null) @@ -40,7 +35,7 @@ abstract class RemoteClient private[akka] ( val log = Logging(netty.system, "RemoteClient") - val name = simpleName(this) + "@" + remoteAddress + val name = Logging.simpleName(this) + "@" + remoteAddress private[remote] val runSwitch = new Switch()