From 603a3d8187778231b5de612f74c6818580d0baf2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Aug 2010 16:28:08 +0200 Subject: [PATCH 01/17] Document and remove dead code, restructure tests --- .../scala/dataflow/DataFlowVariable.scala | 86 +++++--------- .../test/scala/dataflow/DataFlowSpec.scala | 110 +----------------- 2 files changed, 33 insertions(+), 163 deletions(-) diff --git a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala index fa4325e4c2..c6eff59495 100644 --- a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala @@ -21,12 +21,10 @@ object DataFlow { object Start object Exit - import java.util.concurrent.atomic.AtomicReference - import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue} - import scala.collection.JavaConversions._ - import se.scalablesolutions.akka.actor.Actor - import se.scalablesolutions.akka.dispatch.CompletableFuture + class DataFlowVariableException(msg: String) extends AkkaException(msg) + /** Executes the supplied thunk in another thread + */ def thread(body: => Unit): Unit = spawn(body) def thread[A <: AnyRef, R <: AnyRef](body: A => R) = @@ -40,21 +38,25 @@ object DataFlow { } } - /** - * @author Jonas Bonér - */ - sealed class DataFlowVariable[T <: Any] { - val TIME_OUT = 1000 * 60 // 60 seconds default timeout - + private object DataFlowVariable { private sealed abstract class DataFlowVariableMessage private case class Set[T <: Any](value: T) extends DataFlowVariableMessage private object Get extends DataFlowVariableMessage + } + + /** + * @author Jonas Bonér + */ + sealed class DataFlowVariable[T <: Any](timeoutMs: Long) { + import DataFlowVariable._ + + def this() = this(1000 * 60) private val value = new AtomicReference[Option[T]](None) private val blockedReaders = new ConcurrentLinkedQueue[ActorRef] private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = TIME_OUT + self.timeout = timeoutMs def receive = { case s@Set(v) => if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) { @@ -67,7 +69,7 @@ object DataFlow { } private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor { - self.timeout = TIME_OUT + self.timeout = timeoutMs private var readerFuture: Option[CompletableFuture[Any]] = None def receive = { case Get => dataFlow.value.get match { @@ -81,63 +83,29 @@ object DataFlow { private[this] val in = actorOf(new In(this)).start - def <<(ref: DataFlowVariable[T]): Unit = if(this.value.get.isEmpty) in ! Set(ref()) + /** Sets the value of this variable (if unset) with the value of the supplied variable + */ + def <<(ref: DataFlowVariable[T]): Unit = + if(this.value.get.isEmpty) in ! Set(ref()) - def <<(value: T): Unit = if(this.value.get.isEmpty) in ! Set(value) + /** Sets the value of this variable (if unset) + */ + def <<(value: T): Unit = + if(this.value.get.isEmpty) in ! Set(value) + /** Retrieves the value of variable + * throws a DataFlowVariableException if it times out + */ def apply(): T = { value.get getOrElse { val out = actorOf(new Out(this)).start blockedReaders offer out val result = (out !! Get).as[T] out ! Exit - result.getOrElse(throw new DataFlowVariableException("Timed out (after " + TIME_OUT + " milliseconds) while waiting for result")) + result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) } } def shutdown = in ! 
Exit } - - /** - * @author Jonas Bonér - */ - /*FIXME I do not work - class DataFlowStream[T <: Any] extends Seq[T] { - private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] - - def <<<(ref: DataFlowVariable[T]): Boolean = queue offer ref - - def <<<(value: T): Boolean = { - val ref = new DataFlowVariable[T] - ref << value - queue offer ref - } - - def apply(): T = queue.take.apply - - def take: DataFlowVariable[T] = queue.take - - //==== For Seq ==== - - def length: Int = queue.size - - def apply(i: Int): T = { - if (i == 0) apply() - else throw new UnsupportedOperationException( - "Access by index other than '0' is not supported by DataFlowStream") - } - - def iterator: Iterator[T] = new Iterator[T] { - private val i = queue.iterator - def hasNext: Boolean = i.hasNext - def next: T = { val ref = i.next; ref() } - } - - override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] - }*/ - - /** - * @author Jonas Bonér - */ - class DataFlowVariableException(msg: String) extends AkkaException(msg) } \ No newline at end of file diff --git a/akka-core/src/test/scala/dataflow/DataFlowSpec.scala b/akka-core/src/test/scala/dataflow/DataFlowSpec.scala index 0e917c904e..cdc8bf2455 100644 --- a/akka-core/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-core/src/test/scala/dataflow/DataFlowSpec.scala @@ -20,7 +20,7 @@ import se.scalablesolutions.akka.actor.ActorRegistry @RunWith(classOf[JUnitRunner]) class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { describe("DataflowVariable") { - it("should work and generate correct results") { + it("should be able to set the value of one variable from other variables") { import DataFlow._ val latch = new CountDownLatch(1) @@ -34,13 +34,12 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { thread { x << 40 } thread { y << 2 } - latch.await(3,TimeUnit.SECONDS) should equal (true) - List(x,y,z).foreach(_.shutdown) + latch.await(10,TimeUnit.SECONDS) should equal (true) result.get should equal (42) - ActorRegistry.shutdownAll + List(x,y,z).foreach(_.shutdown) } - it("should be able to transform a stream") { + it("should be able to sum a sequence of ints") { import DataFlow._ def ints(n: Int, max: Int): List[Int] = @@ -66,106 +65,9 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { latch.countDown } - latch.await(3,TimeUnit.SECONDS) should equal (true) - List(x,y,z).foreach(_.shutdown) + latch.await(10,TimeUnit.SECONDS) should equal (true) result.get should equal (sum(0,ints(0,1000))) - ActorRegistry.shutdownAll + List(x,y,z).foreach(_.shutdown) } } - - /*it("should be able to join streams") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - val result = new AtomicInteger(0) - - thread { ints(0, 1000, producer) } - thread { - Thread.sleep(1000) - result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) - latch.countDown - } - - latch.await(3,TimeUnit.SECONDS) should equal (true) - result.get should equal (332833500) - ActorRegistry.shutdownAll - } - - it("should be able to sum streams recursively") { - import DataFlow._ - - def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { - stream <<< 
n - ints(n + 1, max, stream) - } - - def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { - out <<< s - sum(in() + s, in, out) - } - - val result = new AtomicLong(0) - - val producer = new DataFlowStream[Int] - val consumer = new DataFlowStream[Int] - val latch = new CountDownLatch(1) - - @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { - val x = stream() - - if(result.addAndGet(x) == 166666500) - latch.countDown - - recurseSum(stream) - } - - thread { ints(0, 1000, producer) } - thread { sum(0, producer, consumer) } - thread { recurseSum(consumer) } - - latch.await(15,TimeUnit.SECONDS) should equal (true) - ActorRegistry.shutdownAll - }*/ - - /* Test not ready for prime time, causes some sort of deadlock */ - /* it("should be able to conditionally set variables") { - - import DataFlow._ - - val latch = new CountDownLatch(1) - val x, y, z, v = new DataFlowVariable[Int] - - val main = thread { - x << 1 - z << Math.max(x(),y()) - latch.countDown - } - - val setY = thread { - Thread sleep 2000 - y << 2 - } - - val setV = thread { - v << y - } - - latch.await(2,TimeUnit.SECONDS) should equal (true) - List(x,y,z,v) foreach (_.shutdown) - List(main,setY,setV) foreach (_ ! Exit) - println("Foo") - ActorRegistry.shutdownAll - }*/ } From 30b3627899ad3364a723e8530f71e3f347f41b3d Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Tue, 24 Aug 2010 10:54:53 +1200 Subject: [PATCH 02/17] Update sbt plugin --- .../src/main/scala/AkkaProject.scala | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala index ecf19ae135..10b69722e0 100644 --- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala +++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala @@ -1,12 +1,13 @@ import sbt._ object AkkaRepositories { - val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") - val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") - val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") - val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org") + val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") + val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") + val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") + val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") + val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") + val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") + val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") } trait AkkaBaseProject extends BasicScalaProject { @@ -17,27 +18,30 @@ trait AkkaBaseProject extends BasicScalaProject { // for development version resolve to .ivy2/local // val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo) - val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo) - 
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo) - val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo) - val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", AkkaRepo) + + val aspectwerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", AkkaRepo) + val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo) val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo) val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo) + val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo) + val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo) + val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo) val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo) val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo) - val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo) + val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo) + + val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) + val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) - val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) - val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) - val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) + val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) - val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) - val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo) - val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) // only while snapshot version - val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots) + val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) + val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) + val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo) + val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) } trait AkkaProject extends AkkaBaseProject { From 4b7bf0ded0097decd6897a2c82b2eea6ee65eab9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 Aug 2010 13:11:41 +0200 Subject: [PATCH 03/17] Optimization of DataFlow + bugfix --- .../scala/dataflow/DataFlowVariable.scala | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala index c6eff59495..787793dc5f 100644 --- a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala @@ -85,13 +85,19 @@ object DataFlow { /** Sets the value of this variable (if unset) with the value of the supplied variable */ - def <<(ref: DataFlowVariable[T]): Unit = - if(this.value.get.isEmpty) in ! Set(ref()) + def <<(ref: DataFlowVariable[T]) { + if (this.value.get.isEmpty) in ! 
Set(ref()) + else throw new DataFlowVariableException( + "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") + } /** Sets the value of this variable (if unset) */ - def <<(value: T): Unit = - if(this.value.get.isEmpty) in ! Set(value) + def <<(value: T) { + if (this.value.get.isEmpty) in ! Set(value) + else throw new DataFlowVariableException( + "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") + } /** Retrieves the value of variable * throws a DataFlowVariableException if it times out @@ -99,9 +105,16 @@ object DataFlow { def apply(): T = { value.get getOrElse { val out = actorOf(new Out(this)).start - blockedReaders offer out - val result = (out !! Get).as[T] - out ! Exit + + val result = try { + blockedReaders offer out + (out !! Get).as[T] + } catch { + case e: Exception => + out ! Exit + throw e + } + result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) } } From 57cdccd349ef2bc985ed8105f5e24dca46b68d53 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 Aug 2010 13:11:56 +0200 Subject: [PATCH 04/17] Reconnect now possible in RemoteClient --- .../src/main/scala/remote/RemoteClient.scala | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 6a8245e8b8..e20de6b9d0 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -181,41 +181,43 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade extends Logging with ListenerManagement { val name = "RemoteClient@" + hostname + "::" + port - @volatile private[remote] var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[String, ActorRef] - private val channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool, - Executors.newCachedThreadPool) - - private val bootstrap = new ClientBootstrap(channelFactory) - private val timer = new HashedWheelTimer private val remoteAddress = new InetSocketAddress(hostname, port) - private[remote] var connection: ChannelFuture = _ - private[remote] val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName); + @volatile private[remote] var isRunning = false + @volatile private var bootstrap: ClientBootstrap = _ + @volatile private var reconnectionTimeWindowStart = 0L + @volatile private[remote] var connection: ChannelFuture = _ + @volatile private[remote] var openChannels: DefaultChannelGroup = _ + @volatile private var timer: HashedWheelTimer = _ private val reconnectionTimeWindow = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis - @volatile private var reconnectionTimeWindowStart = 0L - - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) def connect = synchronized { if (!isRunning) { + openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName) + timer = new HashedWheelTimer + bootstrap = new ClientBootstrap( + new NioClientSocketChannelFactory( + Executors.newCachedThreadPool,Executors.newCachedThreadPool + ) + ) + bootstrap.setPipelineFactory(new 
RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this)) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) connection = bootstrap.connect(remoteAddress) log.info("Starting remote client connection to [%s:%s]", hostname, port) // Wait until the connection attempt succeeds or fails. val channel = connection.awaitUninterruptibly.getChannel openChannels.add(channel) if (!connection.isSuccess) { - foreachListener(l => l ! RemoteClientError(connection.getCause, this)) + foreachListener(_ ! RemoteClientError(connection.getCause, this)) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) } - foreachListener(l => l ! RemoteClientStarted(this)) + foreachListener(_ ! RemoteClientStarted(this)) isRunning = true } } @@ -224,10 +226,14 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade log.info("Shutting down %s", name) if (isRunning) { isRunning = false - foreachListener(l => l ! RemoteClientShutdown(this)) + foreachListener(_ ! RemoteClientShutdown(this)) timer.stop + timer = null openChannels.close.awaitUninterruptibly + openChannels = null bootstrap.releaseExternalResources + bootstrap = null + connection = null log.info("%s has been shut down", name) } } @@ -257,7 +263,7 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade } } else { val exception = new RemoteClientException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this) - foreachListener(l => l ! RemoteClientError(exception, this)) + foreachListener(_ ! RemoteClientError(exception, this)) throw exception } @@ -373,12 +379,12 @@ class RemoteClientHandler( futures.remove(reply.getId) } else { val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client) - client.foreachListener(l => l ! RemoteClientError(exception, client)) + client.foreachListener(_ ! RemoteClientError(exception, client)) throw exception } } catch { case e: Exception => - client.foreachListener(l => l ! RemoteClientError(e, client)) + client.foreachListener(_ ! RemoteClientError(e, client)) log.error("Unexpected exception in remote client handler: %s", e) throw e } @@ -393,8 +399,7 @@ class RemoteClientHandler( client.connection = bootstrap.connect(remoteAddress) client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails. if (!client.connection.isSuccess) { - client.foreachListener(l => - l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client)) + client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client)) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } @@ -404,7 +409,7 @@ class RemoteClientHandler( override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { def connect = { - client.foreachListener(l => l ! RemoteClientConnected(client)) + client.foreachListener(_ ! RemoteClientConnected(client)) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) client.resetReconnectionTimeWindow } @@ -421,12 +426,12 @@ class RemoteClientHandler( } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.foreachListener(l => l ! RemoteClientDisconnected(client)) + client.foreachListener(_ ! 
RemoteClientDisconnected(client)) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.foreachListener(l => l ! RemoteClientError(event.getCause, client)) + client.foreachListener(_ ! RemoteClientError(event.getCause, client)) log.error(event.getCause, "Unexpected exception from downstream in remote client") event.getChannel.close } From 43024263c9ae7043134a3e556fb683487d635c58 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 Aug 2010 13:15:58 +0200 Subject: [PATCH 05/17] Adding some comments for the future --- akka-core/src/main/scala/remote/RemoteClient.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index e20de6b9d0..47d9aa357a 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -181,11 +181,13 @@ class RemoteClient private[akka] (val hostname: String, val port: Int, val loade extends Logging with ListenerManagement { val name = "RemoteClient@" + hostname + "::" + port + //FIXME Should these be clear:ed on shutdown? private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]] private val supervisors = new ConcurrentHashMap[String, ActorRef] private val remoteAddress = new InetSocketAddress(hostname, port) + //FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation) @volatile private[remote] var isRunning = false @volatile private var bootstrap: ClientBootstrap = _ @volatile private var reconnectionTimeWindowStart = 0L From 6de4697feacfc010f2358ff5507a77cfe7d3d562 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Aug 2010 15:40:38 +0200 Subject: [PATCH 06/17] Small refactor --- akka-core/src/main/scala/util/AkkaException.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/akka-core/src/main/scala/util/AkkaException.scala b/akka-core/src/main/scala/util/AkkaException.scala index 1dc652c2d5..3e28c17390 100644 --- a/akka-core/src/main/scala/util/AkkaException.scala +++ b/akka-core/src/main/scala/util/AkkaException.scala @@ -20,13 +20,12 @@ import java.net.{InetAddress, UnknownHostException} * @author Jonas Bonér */ @serializable abstract class AkkaException(message: String) extends RuntimeException(message) { - @volatile private var isLogged = false + import AkkaException._ val exceptionName = getClass.getName - val uuid = String.format("%s_%s", AkkaException.hostname, UUID.newUuid.toString) + val uuid = "%s_%s".format(hostname, UUID.newUuid.toString) - override val toString = - String.format("%s\n\t[%s]\n\t%s\n\t%s", exceptionName, uuid, message, stackTrace) + override val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace) val stackTrace = { val sw = new StringWriter @@ -35,10 +34,12 @@ import java.net.{InetAddress, UnknownHostException} sw.toString } - def log = if (!isLogged) { - isLogged = true + private lazy val _log = { AkkaException.log.error(toString) + () } + + def log: Unit = _log } object AkkaException extends Logging { From 957e184af672d723c5f9f749c34a7ce04b56a428 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Aug 2010 15:40:57 +0200 Subject: [PATCH 07/17] Adding BoundedTransferQueue --- .../src/main/scala/dispatch/Queues.scala | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 
akka-core/src/main/scala/dispatch/Queues.scala diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala new file mode 100644 index 0000000000..5a5d40efbc --- /dev/null +++ b/akka-core/src/main/scala/dispatch/Queues.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.dispatch + +import concurrent.forkjoin.LinkedTransferQueue +import java.util.concurrent.{TimeUnit, Semaphore} +import java.util.Iterator +import se.scalablesolutions.akka.util.Logger + +class BoundedTransferQueue[E <: AnyRef]( + val capacity: Int, + val defaultTimeout: Long, + val defaultTimeUnit: TimeUnit) + extends LinkedTransferQueue[E] { + + protected val guard = new Semaphore(capacity) + + protected def enq( + f: => Boolean, + timeout: Long = defaultTimeout, + unit: TimeUnit = defaultTimeUnit): Boolean = { + if (guard.tryAcquire(timeout,unit)) { + val result = try { + f + } catch { + case e => + guard.release //If something broke, release + throw e + } + if (!result) guard.release //Didn't add anything + result + } else + false + } + + protected def deq(f: => E): E = { + val result: E = f + if (result ne null) guard.release + result + } + + override def take(): E = deq(super.take) + override def poll(): E = deq(super.poll) + override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit)) + + override def remainingCapacity = guard.availablePermits + override def remove(o: AnyRef): Boolean = deq({ + (if (o.isInstanceOf[E] && super.remove(o)) o else null).asInstanceOf[E] + }) ne null + + override def offer(e: E): Boolean = + enq(super.offer(e)) + + override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = + enq(super.offer(e,timeout,unit), timeout, unit) + + override def add(e: E): Boolean = + enq(super.add(e)) + + override def put(e :E): Unit = + enq({ super.put(e); true }) + + override def tryTransfer(e: E): Boolean = + enq(super.tryTransfer(e)) + + override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = + enq(super.tryTransfer(e,timeout,unit),timeout,unit) + + override def transfer(e: E): Unit = + enq({ super.transfer(e); true }) + + override def iterator: Iterator[E] = { + val it = super.iterator + new Iterator[E] { + def hasNext = it.hasNext + def next = it.next + def remove { + it.remove + guard.release + } + } + } +} \ No newline at end of file From c288afa8b6d44cf7e4f161652e5fc183cdee61c8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Aug 2010 16:03:55 +0200 Subject: [PATCH 08/17] Refining names --- akka-core/src/main/scala/dispatch/Queues.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala index 5a5d40efbc..ddb556904c 100644 --- a/akka-core/src/main/scala/dispatch/Queues.scala +++ b/akka-core/src/main/scala/dispatch/Queues.scala @@ -11,17 +11,14 @@ import se.scalablesolutions.akka.util.Logger class BoundedTransferQueue[E <: AnyRef]( val capacity: Int, - val defaultTimeout: Long, - val defaultTimeUnit: TimeUnit) + val pushTimeout: Long, + val pushTimeUnit: TimeUnit) extends LinkedTransferQueue[E] { protected val guard = new Semaphore(capacity) - protected def enq( - f: => Boolean, - timeout: Long = defaultTimeout, - unit: TimeUnit = defaultTimeUnit): Boolean = { - if (guard.tryAcquire(timeout,unit)) { + protected def enq(f: => Boolean): Boolean = { + if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { val result = 
try { f } catch { @@ -54,7 +51,7 @@ class BoundedTransferQueue[E <: AnyRef]( enq(super.offer(e)) override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.offer(e,timeout,unit), timeout, unit) + enq(super.offer(e,timeout,unit)) override def add(e: E): Boolean = enq(super.add(e)) @@ -66,7 +63,7 @@ class BoundedTransferQueue[E <: AnyRef]( enq(super.tryTransfer(e)) override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = - enq(super.tryTransfer(e,timeout,unit),timeout,unit) + enq(super.tryTransfer(e,timeout,unit)) override def transfer(e: E): Unit = enq({ super.transfer(e); true }) @@ -78,7 +75,7 @@ class BoundedTransferQueue[E <: AnyRef]( def next = it.next def remove { it.remove - guard.release + guard.release //Assume remove worked if no exception was thrown } } } From 5d85d874f44175da6762fa77e15577881bd8ab7f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 25 Aug 2010 16:05:57 +0200 Subject: [PATCH 09/17] Constraining input --- akka-core/src/main/scala/dispatch/Queues.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala index ddb556904c..e12f22a526 100644 --- a/akka-core/src/main/scala/dispatch/Queues.scala +++ b/akka-core/src/main/scala/dispatch/Queues.scala @@ -14,6 +14,9 @@ class BoundedTransferQueue[E <: AnyRef]( val pushTimeout: Long, val pushTimeUnit: TimeUnit) extends LinkedTransferQueue[E] { + require(capacity > 0) + require(pushTimeout > 0) + require(pushTimeUnit ne null) protected val guard = new Semaphore(capacity) From 48dca7926768cdbc2e18794e71b996f24feb0ad8 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Aug 2010 08:50:31 +0200 Subject: [PATCH 10/17] RemoteServer now notifies listeners on connect for non-ssl communication --- akka-core/src/main/scala/remote/RemoteServer.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 5f3c12d5a4..7c9f73258c 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -410,6 +410,8 @@ class RemoteServerHandler( } else future.getChannel.close } }) + } else { + server.foreachListener(_ ! 
RemoteServerClientConnected(server))
     }
   }

From 7b56315e35c3a93532cc2c62bb60282453b958b2 Mon Sep 17 00:00:00 2001
From: Viktor Klang
Date: Thu, 26 Aug 2010 12:29:02 +0200
Subject: [PATCH 11/17] Added more comments and made code more readable for the BoundedTransferQueue

---
 .../src/main/scala/dispatch/Queues.scala | 23 ++++++++++++-------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala
index e12f22a526..7f16f7aa57 100644
--- a/akka-core/src/main/scala/dispatch/Queues.scala
+++ b/akka-core/src/main/scala/dispatch/Queues.scala
@@ -19,7 +19,8 @@ class BoundedTransferQueue[E <: AnyRef](
   require(pushTimeUnit ne null)
 
   protected val guard = new Semaphore(capacity)
-
+
+  //Enqueue an item within the push timeout (acquire Semaphore)
   protected def enq(f: => Boolean): Boolean = {
     if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
       val result = try {
@@ -35,10 +36,10 @@ class BoundedTransferQueue[E <: AnyRef](
       false
   }
 
-  protected def deq(f: => E): E = {
-    val result: E = f
-    if (result ne null) guard.release
-    result
+  //Dequeue an item (release Semaphore)
+  protected def deq(e: E): E = {
+    if (e ne null) guard.release //Signal removal of item
+    e
   }
 
   override def take(): E = deq(super.take)
@@ -46,9 +47,15 @@ class BoundedTransferQueue[E <: AnyRef](
   override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
 
   override def remainingCapacity = guard.availablePermits
-  override def remove(o: AnyRef): Boolean = deq({
-    (if (o.isInstanceOf[E] && super.remove(o)) o else null).asInstanceOf[E]
-  }) ne null
+
+  override def remove(o: AnyRef): Boolean = {
+    if (super.remove(o)) {
+      guard.release
+      true
+    } else {
+      false
+    }
+  }
 
   override def offer(e: E): Boolean =
     enq(super.offer(e))

From 8926c360540e7c0a36fee6edcda13b4826199072 Mon Sep 17 00:00:00 2001
From: Michael Kober
Date: Thu, 26 Aug 2010 14:23:53 +0200
Subject: [PATCH 12/17] fixed restart of actor with thread based dispatcher

---
 akka-core/src/main/scala/actor/ActorRef.scala |  1 +
 .../actor/supervisor/SupervisorMiscSpec.scala | 79 +++++++++++++++++++
 2 files changed, 80 insertions(+)
 create mode 100644 akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala

diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 87999b8580..0cf81083f4 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -1141,6 +1141,7 @@ class LocalActorRef private[akka](
       freshActor.initTransactionalState
       actorInstance.set(freshActor)
       if (failedActor.isInstanceOf[TypedActor]) failedActor.asInstanceOf[TypedActor].swapInstanceInProxy(freshActor)
+      if (dispatcher.isShutdown) dispatcher.start
       Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
       freshActor.postRestart(reason)
     }
diff --git a/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
new file mode 100644
index 0000000000..3d048684cd
--- /dev/null
+++ b/akka-core/src/test/scala/actor/supervisor/SupervisorMiscSpec.scala
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+package se.scalablesolutions.akka.actor
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import se.scalablesolutions.akka.dispatch.Dispatchers
+import se.scalablesolutions.akka.config.ScalaConfig.{RestartStrategy, SupervisorConfig, LifeCycle, Permanent, 
OneForOne, Supervise} +import java.util.concurrent.CountDownLatch + +class SupervisorMiscSpec extends WordSpec with MustMatchers { + "A Supervisor" should { + + "restart a crashing actor and its dispatcher for any dispatcher" in { + val countDownLatch = new CountDownLatch(4) + + val actor1 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + override def postRestart(cause: Throwable) {countDownLatch.countDown} + + protected def receive = { + case "kill" => throw new Exception("killed") + case _ => println("received unknown message") + } + }).start + + val actor2 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test") + override def postRestart(cause: Throwable) {countDownLatch.countDown} + + protected def receive = { + case "kill" => throw new Exception("killed") + case _ => println("received unknown message") + } + }).start + + val actor3 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test") + override def postRestart(cause: Throwable) {countDownLatch.countDown} + + protected def receive = { + case "kill" => throw new Exception("killed") + case _ => println("received unknown message") + } + }).start + + val actor4 = Actor.actorOf(new Actor { + self.dispatcher = Dispatchers.newHawtDispatcher(true) + override def postRestart(cause: Throwable) {countDownLatch.countDown} + + protected def receive = { + case "kill" => throw new Exception("killed") + case _ => println("received unknown message") + } + }).start + + val sup = Supervisor( + SupervisorConfig( + RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])), + Supervise(actor1, LifeCycle(Permanent)) :: + Supervise(actor2, LifeCycle(Permanent)) :: + Supervise(actor3, LifeCycle(Permanent)) :: + Supervise(actor4, LifeCycle(Permanent)) :: + Nil)) + + actor1 ! "kill" + actor2 ! "kill" + actor3 ! "kill" + actor4 ! 
"kill" + + countDownLatch.await() + assert(!actor1.dispatcher.isShutdown, "dispatcher1 is shutdown") + assert(!actor2.dispatcher.isShutdown, "dispatcher2 is shutdown") + assert(!actor3.dispatcher.isShutdown, "dispatcher3 is shutdown") + assert(!actor4.dispatcher.isShutdown, "dispatcher4 is shutdown") + } + } +} From 3a4355c6ccfe08b44a02682c9e826adbe466625e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 26 Aug 2010 16:30:28 +0200 Subject: [PATCH 13/17] Changing source jar naming from src to sources --- project/build/AkkaProject.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a06e5ec3d8..4503357db6 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -729,7 +729,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { def akkaArtifacts = descendents(info.projectPath / "dist", "*" + buildScalaVersion + "-" + version + ".jar") // ------------------------------------------------------------ - class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject + class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject { + override def packageDocsJar = this.defaultJarPath("-docs.jar") + override def packageSrcJar = this.defaultJarPath("-sources.jar") + } } trait DeployProject { self: BasicScalaProject => From 065ae058a4c62beef5815351e8c371914911a3d5 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 27 Aug 2010 12:12:33 +0200 Subject: [PATCH 14/17] Conserving memory usage per dispatcher --- .../dispatch/ExecutorBasedEventDrivenDispatcher.scala | 2 +- ...ecutorBasedEventDrivenWorkStealingDispatcher.scala | 2 +- .../src/main/scala/dispatch/MessageHandling.scala | 11 +++++------ .../main/scala/dispatch/ThreadBasedDispatcher.scala | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 6316627ff6..1f8a6bfe9c 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -157,7 +157,7 @@ class ExecutorBasedEventDrivenDispatcher( log.debug("Shutting down %s", toString) executor.shutdownNow active = false - references.clear + uuids.clear } def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 2b98975cb9..4e5d626aed 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -172,7 +172,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( log.debug("Shutting down %s", toString) executor.shutdownNow active = false - references.clear + uuids.clear } def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 92926bb253..20c58c9975 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ 
b/akka-core/src/main/scala/dispatch/MessageHandling.scala
@@ -9,9 +9,8 @@ import java.util.List
 
 import se.scalablesolutions.akka.util.{HashCode, Logging}
 import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
-import java.util.concurrent.ConcurrentHashMap
-
 import org.multiverse.commitbarriers.CountDownCommitBarrier
+import java.util.concurrent.{ConcurrentSkipListSet}
 /**
  * @author Jonas Bonér
  */
@@ -68,16 +67,16 @@ trait MessageQueue {
  * @author Jonas Bonér
  */
 trait MessageDispatcher extends Logging {
-  protected val references = new ConcurrentHashMap[String, ActorRef]
+  protected val uuids = new ConcurrentSkipListSet[String]
   def dispatch(invocation: MessageInvocation)
   def start
   def shutdown
-  def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
+  def register(actorRef: ActorRef) = uuids add actorRef.uuid
   def unregister(actorRef: ActorRef) = {
-    references.remove(actorRef.uuid)
+    uuids remove actorRef.uuid
     if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
   }
-  def canBeShutDown: Boolean = references.isEmpty
+  def canBeShutDown: Boolean = uuids.isEmpty
   def isShutdown: Boolean
   def mailboxSize(actorRef: ActorRef):Int = 0
 }
diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
index 684a38a97c..d33d0fb337 100644
--- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
+++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala
@@ -47,7 +47,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
     log.debug("Shutting down %s", toString)
     active = false
     selectorThread.interrupt
-    references.clear
+    uuids.clear
   }
 
   override def toString = "ThreadBasedDispatcher[" + threadName + "]"

From aab4724024c586797491fb1f2ef993e11425f882 Mon Sep 17 00:00:00 2001
From: Viktor Klang
Date: Fri, 27 Aug 2010 15:58:51 +0200
Subject: [PATCH 15/17] Adding a guard to dispatcher_= in ActorRef

---
 akka-core/src/main/scala/actor/ActorRef.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index 0cf81083f4..dbb72b5cbb 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -741,7 +741,7 @@ class LocalActorRef private[akka](
   /**
    * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
    */
-  def dispatcher_=(md: MessageDispatcher): Unit = {
+  def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
     if (!isRunning || isBeingRestarted) _dispatcher = md
     else throw new ActorInitializationException(
       "Can not swap dispatcher for " + toString + " after it has been started")

From a6bf0c068a197027457c753e3f540dbd2af3ee0a Mon Sep 17 00:00:00 2001
From: Viktor Klang
Date: Fri, 27 Aug 2010 16:20:21 +0200
Subject: [PATCH 16/17] Make sure dispatcher isn't changed on actor restart

---
 akka-core/src/main/scala/actor/ActorRef.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala
index dbb72b5cbb..1f025bb3d1 100644
--- a/akka-core/src/main/scala/actor/ActorRef.scala
+++ b/akka-core/src/main/scala/actor/ActorRef.scala
@@ -742,9 +742,11 @@ class LocalActorRef private[akka](
    * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
    
*/ def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { - if (!isRunning || isBeingRestarted) _dispatcher = md - else throw new ActorInitializationException( + if (!isBeingRestarted) { + if (!isRunning) _dispatcher = md + else throw new ActorInitializationException( "Can not swap dispatcher for " + toString + " after it has been started") + } } /** @@ -1141,7 +1143,6 @@ class LocalActorRef private[akka]( freshActor.initTransactionalState actorInstance.set(freshActor) if (failedActor.isInstanceOf[TypedActor]) failedActor.asInstanceOf[TypedActor].swapInstanceInProxy(freshActor) - if (dispatcher.isShutdown) dispatcher.start Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) freshActor.postRestart(reason) } From 447eb0ae4752023e4cce7b83206640f90f7149ad Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Fri, 27 Aug 2010 21:27:37 +0200 Subject: [PATCH 17/17] remove logback.xml from akka-core jar and exclude logback-test.xml from distribution. --- akka-core/src/main/resources/logback.xml | 31 ------------------- akka-core/src/test/resources/logback-test.xml | 21 ------------- config/logback.xml | 2 +- project/build/AkkaProject.scala | 8 +++-- 4 files changed, 6 insertions(+), 56 deletions(-) delete mode 100644 akka-core/src/main/resources/logback.xml delete mode 100644 akka-core/src/test/resources/logback-test.xml diff --git a/akka-core/src/main/resources/logback.xml b/akka-core/src/main/resources/logback.xml deleted file mode 100644 index 4635396601..0000000000 --- a/akka-core/src/main/resources/logback.xml +++ /dev/null @@ -1,31 +0,0 @@ - - - - - - - - - - - - - - [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n - - - - ./logs/akka.log - - [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n - - - ./logs/akka.log.%d{yyyy-MM-dd-HH} - - - - - - - - diff --git a/akka-core/src/test/resources/logback-test.xml b/akka-core/src/test/resources/logback-test.xml deleted file mode 100644 index 78eae40ec4..0000000000 --- a/akka-core/src/test/resources/logback-test.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - - - - - - - - - - - - [%4p] [%d{ISO8601}] [%t] %c{1}: %m%n - - - - - - - diff --git a/config/logback.xml b/config/logback.xml index 1ace0bfd8f..3e6ba75548 100644 --- a/config/logback.xml +++ b/config/logback.xml @@ -22,7 +22,7 @@ ./logs/akka.log.%d{yyyy-MM-dd-HH} - + diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 4503357db6..0d0a9e00cb 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -250,6 +250,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { .filter(!_.getName.contains("scala-library")) .map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName) .mkString(" ") + + " config/" + " scala-library.jar" + " dist/akka-core_%s-%s.jar".format(buildScalaVersion, version) + " dist/akka-http_%s-%s.jar".format(buildScalaVersion, version) + @@ -271,10 +272,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { } override def mainResources = super.mainResources +++ - descendents(info.projectPath / "config", "*") --- - (super.mainResources ** "logback-test.xml") + (info.projectPath / "config").descendentsExcept("*", "logback-test.xml") - override def testResources = super.testResources --- (super.testResources ** "logback-test.xml") + override def runClasspath = super.runClasspath +++ "config" // ------------------------------------------------------------ // publishing @@ -730,6 +730,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // 
------------------------------------------------------------ class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject { + override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config") + override def testClasspath = super.testClasspath +++ (AkkaParentProject.this.info.projectPath / "config") override def packageDocsJar = this.defaultJarPath("-docs.jar") override def packageSrcJar = this.defaultJarPath("-sources.jar") }
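A minimal usage sketch of the BoundedTransferQueue introduced in PATCH 07/17 and refined in PATCH 08, 09 and 11 above. It is not part of the patch series: the object name, the capacity and timeout values and the printed output are invented for illustration, while the constructor signature and the offer/poll/remainingCapacity behaviour follow the Queues.scala shown in the diffs.

import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.dispatch.BoundedTransferQueue

object BoundedTransferQueueExample {
  def main(args: Array[String]) {
    // Capacity of 2; producers wait at most 50 ms for a free slot before giving up
    val queue = new BoundedTransferQueue[String](2, 50, TimeUnit.MILLISECONDS)

    queue.offer("first")                 // true  - acquires one of the two permits
    queue.offer("second")                // true  - queue is now at capacity
    val rejected = !queue.offer("third") // offer gives up after ~50 ms and returns false

    val head = queue.poll()              // "first" - releases a permit again

    println("third rejected: " + rejected +
      ", head: " + head +
      ", remaining capacity: " + queue.remainingCapacity)
  }
}

The semaphore guard is what bounds the otherwise unbounded LinkedTransferQueue: when the queue is full, offer, add, put and the transfer variants wait at most pushTimeout for a permit, while every successful take, poll or remove releases one.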