Merge branch 'master' of git@github.com:jboner/akka
Conflicts: akka-actor/src/main/resources/logback.xml akka-actor/src/main/scala/dataflow/DataFlowVariable.scala akka-actor/src/test/scala/dataflow/DataFlowSpec.scala akka-remote/src/main/scala/remote/RemoteClient.scala akka-remote/src/test/resources/logback-test.xml
This commit is contained in:
commit
be32e80985
15 changed files with 311 additions and 237 deletions
|
|
@ -728,11 +728,13 @@ class LocalActorRef private[akka](
|
|||
/**
|
||||
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
|
||||
*/
|
||||
def dispatcher_=(md: MessageDispatcher): Unit = {
|
||||
if (!isRunning || isBeingRestarted) _dispatcher = md
|
||||
def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
|
||||
if (!isBeingRestarted) {
|
||||
if (!isRunning) _dispatcher = md
|
||||
else throw new ActorInitializationException(
|
||||
"Can not swap dispatcher for " + toString + " after it has been started")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the dispatcher for this actor.
|
||||
|
|
|
|||
|
|
@ -21,8 +21,10 @@ object DataFlow {
|
|||
object Start
|
||||
object Exit
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
class DataFlowVariableException(msg: String) extends AkkaException(msg)
|
||||
|
||||
/** Executes the supplied thunk in another thread
|
||||
*/
|
||||
def thread(body: => Unit): Unit = spawn(body)
|
||||
|
||||
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
|
||||
|
|
@ -36,21 +38,25 @@ object DataFlow {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class DataFlowVariable[T <: Any] {
|
||||
val TIME_OUT = 1000 * 60 // 60 seconds default timeout
|
||||
|
||||
private object DataFlowVariable {
|
||||
private sealed abstract class DataFlowVariableMessage
|
||||
private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
|
||||
private object Get extends DataFlowVariableMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
|
||||
import DataFlowVariable._
|
||||
|
||||
def this() = this(1000 * 60)
|
||||
|
||||
private val value = new AtomicReference[Option[T]](None)
|
||||
private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
|
||||
|
||||
private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
|
||||
self.timeout = TIME_OUT
|
||||
self.timeout = timeoutMs
|
||||
def receive = {
|
||||
case s@Set(v) =>
|
||||
if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
|
||||
|
|
@ -63,7 +69,7 @@ object DataFlow {
|
|||
}
|
||||
|
||||
private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
|
||||
self.timeout = TIME_OUT
|
||||
self.timeout = timeoutMs
|
||||
private var readerFuture: Option[CompletableFuture[Any]] = None
|
||||
def receive = {
|
||||
case Get => dataFlow.value.get match {
|
||||
|
|
@ -77,64 +83,42 @@ object DataFlow {
|
|||
|
||||
private[this] val in = actorOf(new In(this)).start
|
||||
|
||||
def <<(ref: DataFlowVariable[T]): Unit = if (this.value.get.isEmpty) in ! Set(ref())
|
||||
/** Sets the value of this variable (if unset) with the value of the supplied variable
|
||||
*/
|
||||
def <<(ref: DataFlowVariable[T]) {
|
||||
if (this.value.get.isEmpty) in ! Set(ref())
|
||||
else throw new DataFlowVariableException(
|
||||
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
|
||||
}
|
||||
|
||||
def <<(value: T): Unit = if (this.value.get.isEmpty) in ! Set(value)
|
||||
/** Sets the value of this variable (if unset)
|
||||
*/
|
||||
def <<(value: T) {
|
||||
if (this.value.get.isEmpty) in ! Set(value)
|
||||
else throw new DataFlowVariableException(
|
||||
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
|
||||
}
|
||||
|
||||
/** Retrieves the value of variable
|
||||
* throws a DataFlowVariableException if it times out
|
||||
*/
|
||||
def apply(): T = {
|
||||
value.get getOrElse {
|
||||
val out = actorOf(new Out(this)).start
|
||||
|
||||
val result = try {
|
||||
blockedReaders offer out
|
||||
val result = (out !! Get).as[T]
|
||||
(out !! Get).as[T]
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
out ! Exit
|
||||
result.getOrElse(throw new DataFlowVariableException(
|
||||
"Timed out (after " + TIME_OUT + " milliseconds) while waiting for result"))
|
||||
throw e
|
||||
}
|
||||
|
||||
result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
|
||||
}
|
||||
}
|
||||
|
||||
def shutdown = in ! Exit
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
/*FIXME I do not work
|
||||
class DataFlowStream[T <: Any] extends Seq[T] {
|
||||
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
|
||||
|
||||
def <<<(ref: DataFlowVariable[T]): Boolean = queue offer ref
|
||||
|
||||
def <<<(value: T): Boolean = {
|
||||
val ref = new DataFlowVariable[T]
|
||||
ref << value
|
||||
queue offer ref
|
||||
}
|
||||
|
||||
def apply(): T = queue.take.apply
|
||||
|
||||
def take: DataFlowVariable[T] = queue.take
|
||||
|
||||
//==== For Seq ====
|
||||
|
||||
def length: Int = queue.size
|
||||
|
||||
def apply(i: Int): T = {
|
||||
if (i == 0) apply()
|
||||
else throw new UnsupportedOperationException(
|
||||
"Access by index other than '0' is not supported by DataFlowStream")
|
||||
}
|
||||
|
||||
def iterator: Iterator[T] = new Iterator[T] {
|
||||
private val i = queue.iterator
|
||||
def hasNext: Boolean = i.hasNext
|
||||
def next: T = { val ref = i.next; ref() }
|
||||
}
|
||||
|
||||
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
|
||||
}*/
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class DataFlowVariableException(msg: String) extends AkkaException(msg)
|
||||
}
|
||||
|
|
@ -157,7 +157,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
log.debug("Shutting down %s", toString)
|
||||
executor.shutdownNow
|
||||
active = false
|
||||
references.clear
|
||||
uuids.clear
|
||||
}
|
||||
|
||||
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
|
||||
|
|
|
|||
|
|
@ -172,7 +172,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
log.debug("Shutting down %s", toString)
|
||||
executor.shutdownNow
|
||||
active = false
|
||||
references.clear
|
||||
uuids.clear
|
||||
}
|
||||
|
||||
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
|
||||
|
|
|
|||
|
|
@ -9,9 +9,8 @@ import java.util.List
|
|||
import se.scalablesolutions.akka.util.{HashCode, Logging}
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
import java.util.concurrent.{ConcurrentSkipListSet}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -68,16 +67,16 @@ trait MessageQueue {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageDispatcher extends Logging {
|
||||
protected val references = new ConcurrentHashMap[String, ActorRef]
|
||||
protected val uuids = new ConcurrentSkipListSet[String]
|
||||
def dispatch(invocation: MessageInvocation)
|
||||
def start
|
||||
def shutdown
|
||||
def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef)
|
||||
def register(actorRef: ActorRef) = uuids add actorRef.uuid
|
||||
def unregister(actorRef: ActorRef) = {
|
||||
references.remove(actorRef.uuid)
|
||||
uuids remove actorRef.uuid
|
||||
if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
def canBeShutDown: Boolean = references.isEmpty
|
||||
def canBeShutDown: Boolean = uuids.isEmpty
|
||||
def isShutdown: Boolean
|
||||
def mailboxSize(actorRef: ActorRef):Int = 0
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxCapacity: In
|
|||
log.debug("Shutting down %s", toString)
|
||||
active = false
|
||||
selectorThread.interrupt
|
||||
references.clear
|
||||
uuids.clear
|
||||
}
|
||||
|
||||
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
|
||||
|
|
|
|||
|
|
@ -20,13 +20,12 @@ import java.net.{InetAddress, UnknownHostException}
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@serializable abstract class AkkaException(message: String) extends RuntimeException(message) {
|
||||
@volatile private var isLogged = false
|
||||
import AkkaException._
|
||||
val exceptionName = getClass.getName
|
||||
|
||||
val uuid = String.format("%s_%s", AkkaException.hostname, UUID.newUuid.toString)
|
||||
val uuid = "%s_%s".format(hostname, UUID.newUuid.toString)
|
||||
|
||||
override val toString =
|
||||
String.format("%s\n\t[%s]\n\t%s\n\t%s", exceptionName, uuid, message, stackTrace)
|
||||
override val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace)
|
||||
|
||||
val stackTrace = {
|
||||
val sw = new StringWriter
|
||||
|
|
@ -35,10 +34,12 @@ import java.net.{InetAddress, UnknownHostException}
|
|||
sw.toString
|
||||
}
|
||||
|
||||
def log = if (!isLogged) {
|
||||
isLogged = true
|
||||
private lazy val _log = {
|
||||
AkkaException.log.error(toString)
|
||||
()
|
||||
}
|
||||
|
||||
def log: Unit = _log
|
||||
}
|
||||
|
||||
object AkkaException extends Logging {
|
||||
|
|
|
|||
|
|
@ -11,18 +11,16 @@ import org.scalatest.BeforeAndAfterAll
|
|||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import se.scalablesolutions.akka.dispatch.DefaultCompletableFuture
|
||||
import java.util.concurrent.{TimeUnit, CountDownLatch}
|
||||
import annotation.tailrec
|
||||
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
|
||||
import se.scalablesolutions.akka.actor.ActorRegistry
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
||||
describe("DataflowVariable") {
|
||||
/* it("should work and generate correct results") {
|
||||
it("should be able to set the value of one variable from other variables") {
|
||||
import DataFlow._
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
|
|
@ -36,13 +34,12 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
|||
thread { x << 40 }
|
||||
thread { y << 2 }
|
||||
|
||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||
List(x,y,z).foreach(_.shutdown)
|
||||
latch.await(10,TimeUnit.SECONDS) should equal (true)
|
||||
result.get should equal (42)
|
||||
ActorRegistry.shutdownAll
|
||||
List(x,y,z).foreach(_.shutdown)
|
||||
}
|
||||
|
||||
it("should be able to transform a stream") {
|
||||
it("should be able to sum a sequence of ints") {
|
||||
import DataFlow._
|
||||
|
||||
def ints(n: Int, max: Int): List[Int] =
|
||||
|
|
@ -68,106 +65,9 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
|
|||
latch.countDown
|
||||
}
|
||||
|
||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||
List(x,y,z).foreach(_.shutdown)
|
||||
latch.await(10,TimeUnit.SECONDS) should equal (true)
|
||||
result.get should equal (sum(0,ints(0,1000)))
|
||||
ActorRegistry.shutdownAll
|
||||
List(x,y,z).foreach(_.shutdown)
|
||||
}
|
||||
*/
|
||||
}
|
||||
/*it("should be able to join streams") {
|
||||
import DataFlow._
|
||||
|
||||
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
|
||||
stream <<< n
|
||||
ints(n + 1, max, stream)
|
||||
}
|
||||
|
||||
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
|
||||
out <<< s
|
||||
sum(in() + s, in, out)
|
||||
}
|
||||
|
||||
val producer = new DataFlowStream[Int]
|
||||
val consumer = new DataFlowStream[Int]
|
||||
val latch = new CountDownLatch(1)
|
||||
val result = new AtomicInteger(0)
|
||||
|
||||
thread { ints(0, 1000, producer) }
|
||||
thread {
|
||||
Thread.sleep(1000)
|
||||
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
|
||||
latch.countDown
|
||||
}
|
||||
|
||||
latch.await(3,TimeUnit.SECONDS) should equal (true)
|
||||
result.get should equal (332833500)
|
||||
ActorRegistry.shutdownAll
|
||||
}
|
||||
|
||||
it("should be able to sum streams recursively") {
|
||||
import DataFlow._
|
||||
|
||||
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
|
||||
stream <<< n
|
||||
ints(n + 1, max, stream)
|
||||
}
|
||||
|
||||
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
|
||||
out <<< s
|
||||
sum(in() + s, in, out)
|
||||
}
|
||||
|
||||
val result = new AtomicLong(0)
|
||||
|
||||
val producer = new DataFlowStream[Int]
|
||||
val consumer = new DataFlowStream[Int]
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
|
||||
val x = stream()
|
||||
|
||||
if(result.addAndGet(x) == 166666500)
|
||||
latch.countDown
|
||||
|
||||
recurseSum(stream)
|
||||
}
|
||||
|
||||
thread { ints(0, 1000, producer) }
|
||||
thread { sum(0, producer, consumer) }
|
||||
thread { recurseSum(consumer) }
|
||||
|
||||
latch.await(15,TimeUnit.SECONDS) should equal (true)
|
||||
ActorRegistry.shutdownAll
|
||||
}*/
|
||||
|
||||
/* Test not ready for prime time, causes some sort of deadlock */
|
||||
/* it("should be able to conditionally set variables") {
|
||||
|
||||
import DataFlow._
|
||||
|
||||
val latch = new CountDownLatch(1)
|
||||
val x, y, z, v = new DataFlowVariable[Int]
|
||||
|
||||
val main = thread {
|
||||
x << 1
|
||||
z << Math.max(x(),y())
|
||||
latch.countDown
|
||||
}
|
||||
|
||||
val setY = thread {
|
||||
Thread sleep 2000
|
||||
y << 2
|
||||
}
|
||||
|
||||
val setV = thread {
|
||||
v << y
|
||||
}
|
||||
|
||||
latch.await(2,TimeUnit.SECONDS) should equal (true)
|
||||
List(x,y,z,v) foreach (_.shutdown)
|
||||
List(main,setY,setV) foreach (_ ! Exit)
|
||||
println("Foo")
|
||||
ActorRegistry.shutdownAll
|
||||
}*/
|
||||
}
|
||||
92
akka-core/src/main/scala/dispatch/Queues.scala
Normal file
92
akka-core/src/main/scala/dispatch/Queues.scala
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import concurrent.forkjoin.LinkedTransferQueue
|
||||
import java.util.concurrent.{TimeUnit, Semaphore}
|
||||
import java.util.Iterator
|
||||
import se.scalablesolutions.akka.util.Logger
|
||||
|
||||
class BoundedTransferQueue[E <: AnyRef](
|
||||
val capacity: Int,
|
||||
val pushTimeout: Long,
|
||||
val pushTimeUnit: TimeUnit)
|
||||
extends LinkedTransferQueue[E] {
|
||||
require(capacity > 0)
|
||||
require(pushTimeout > 0)
|
||||
require(pushTimeUnit ne null)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
|
||||
//Enqueue an item within the push timeout (acquire Semaphore)
|
||||
protected def enq(f: => Boolean): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
f
|
||||
} catch {
|
||||
case e =>
|
||||
guard.release //If something broke, release
|
||||
throw e
|
||||
}
|
||||
if (!result) guard.release //Didn't add anything
|
||||
result
|
||||
} else
|
||||
false
|
||||
}
|
||||
|
||||
//Dequeue an item (release Semaphore)
|
||||
protected def deq(e: E): E = {
|
||||
if (e ne null) guard.release //Signal removal of item
|
||||
e
|
||||
}
|
||||
|
||||
override def take(): E = deq(super.take)
|
||||
override def poll(): E = deq(super.poll)
|
||||
override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
|
||||
|
||||
override def remainingCapacity = guard.availablePermits
|
||||
|
||||
override def remove(o: AnyRef): Boolean = {
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E): Boolean =
|
||||
enq(super.offer(e))
|
||||
|
||||
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean =
|
||||
enq(super.offer(e,timeout,unit))
|
||||
|
||||
override def add(e: E): Boolean =
|
||||
enq(super.add(e))
|
||||
|
||||
override def put(e :E): Unit =
|
||||
enq({ super.put(e); true })
|
||||
|
||||
override def tryTransfer(e: E): Boolean =
|
||||
enq(super.tryTransfer(e))
|
||||
|
||||
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean =
|
||||
enq(super.tryTransfer(e,timeout,unit))
|
||||
|
||||
override def transfer(e: E): Unit =
|
||||
enq({ super.transfer(e); true })
|
||||
|
||||
override def iterator: Iterator[E] = {
|
||||
val it = super.iterator
|
||||
new Iterator[E] {
|
||||
def hasNext = it.hasNext
|
||||
def next = it.next
|
||||
def remove {
|
||||
it.remove
|
||||
guard.release //Assume remove worked if no exception was thrown
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import se.scalablesolutions.akka.dispatch.Dispatchers
|
||||
import se.scalablesolutions.akka.config.ScalaConfig.{RestartStrategy, SupervisorConfig, LifeCycle, Permanent, OneForOne, Supervise}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
class SupervisorMiscSpec extends WordSpec with MustMatchers {
|
||||
"A Supervisor" should {
|
||||
|
||||
"restart a crashing actor and its dispatcher for any dispatcher" in {
|
||||
val countDownLatch = new CountDownLatch(4)
|
||||
|
||||
val actor1 = Actor.actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newThreadBasedDispatcher(self)
|
||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||
|
||||
protected def receive = {
|
||||
case "kill" => throw new Exception("killed")
|
||||
case _ => println("received unknown message")
|
||||
}
|
||||
}).start
|
||||
|
||||
val actor2 = Actor.actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newReactorBasedSingleThreadEventDrivenDispatcher("test")
|
||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||
|
||||
protected def receive = {
|
||||
case "kill" => throw new Exception("killed")
|
||||
case _ => println("received unknown message")
|
||||
}
|
||||
}).start
|
||||
|
||||
val actor3 = Actor.actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test")
|
||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||
|
||||
protected def receive = {
|
||||
case "kill" => throw new Exception("killed")
|
||||
case _ => println("received unknown message")
|
||||
}
|
||||
}).start
|
||||
|
||||
val actor4 = Actor.actorOf(new Actor {
|
||||
self.dispatcher = Dispatchers.newHawtDispatcher(true)
|
||||
override def postRestart(cause: Throwable) {countDownLatch.countDown}
|
||||
|
||||
protected def receive = {
|
||||
case "kill" => throw new Exception("killed")
|
||||
case _ => println("received unknown message")
|
||||
}
|
||||
}).start
|
||||
|
||||
val sup = Supervisor(
|
||||
SupervisorConfig(
|
||||
RestartStrategy(OneForOne, 3, 5000, List(classOf[Exception])),
|
||||
Supervise(actor1, LifeCycle(Permanent)) ::
|
||||
Supervise(actor2, LifeCycle(Permanent)) ::
|
||||
Supervise(actor3, LifeCycle(Permanent)) ::
|
||||
Supervise(actor4, LifeCycle(Permanent)) ::
|
||||
Nil))
|
||||
|
||||
actor1 ! "kill"
|
||||
actor2 ! "kill"
|
||||
actor3 ! "kill"
|
||||
actor4 ! "kill"
|
||||
|
||||
countDownLatch.await()
|
||||
assert(!actor1.dispatcher.isShutdown, "dispatcher1 is shutdown")
|
||||
assert(!actor2.dispatcher.isShutdown, "dispatcher2 is shutdown")
|
||||
assert(!actor3.dispatcher.isShutdown, "dispatcher3 is shutdown")
|
||||
assert(!actor4.dispatcher.isShutdown, "dispatcher4 is shutdown")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -185,42 +185,45 @@ class RemoteClient private[akka] (
|
|||
extends Logging with ListenerManagement {
|
||||
val name = "RemoteClient@" + hostname + "::" + port
|
||||
|
||||
@volatile private[remote] var isRunning = false
|
||||
//FIXME Should these be clear:ed on shutdown?
|
||||
private val futures = new ConcurrentHashMap[Long, CompletableFuture[_]]
|
||||
private val supervisors = new ConcurrentHashMap[String, ActorRef]
|
||||
|
||||
private val channelFactory = new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,
|
||||
Executors.newCachedThreadPool)
|
||||
|
||||
private val bootstrap = new ClientBootstrap(channelFactory)
|
||||
private val timer = new HashedWheelTimer
|
||||
private val remoteAddress = new InetSocketAddress(hostname, port)
|
||||
|
||||
private[remote] var connection: ChannelFuture = _
|
||||
private[remote] val openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName);
|
||||
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||
@volatile private[remote] var isRunning = false
|
||||
@volatile private var bootstrap: ClientBootstrap = _
|
||||
@volatile private[remote] var connection: ChannelFuture = _
|
||||
@volatile private[remote] var openChannels: DefaultChannelGroup = _
|
||||
@volatile private var timer: HashedWheelTimer = _
|
||||
|
||||
private val reconnectionTimeWindow =
|
||||
Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||
private val reconnectionTimeWindow = Duration(config.getInt(
|
||||
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
|
||||
@volatile private var reconnectionTimeWindowStart = 0L
|
||||
|
||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(
|
||||
name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
def connect = synchronized {
|
||||
if (!isRunning) {
|
||||
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
|
||||
timer = new HashedWheelTimer
|
||||
bootstrap = new ClientBootstrap(
|
||||
new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool,Executors.newCachedThreadPool
|
||||
)
|
||||
)
|
||||
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
connection = bootstrap.connect(remoteAddress)
|
||||
log.info("Starting remote client connection to [%s:%s]", hostname, port)
|
||||
// Wait until the connection attempt succeeds or fails.
|
||||
val channel = connection.awaitUninterruptibly.getChannel
|
||||
openChannels.add(channel)
|
||||
if (!connection.isSuccess) {
|
||||
foreachListener(l => l ! RemoteClientError(connection.getCause, this))
|
||||
foreachListener(_ ! RemoteClientError(connection.getCause, this))
|
||||
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
|
||||
}
|
||||
foreachListener(l => l ! RemoteClientStarted(this))
|
||||
foreachListener(_ ! RemoteClientStarted(this))
|
||||
isRunning = true
|
||||
}
|
||||
}
|
||||
|
|
@ -229,10 +232,14 @@ class RemoteClient private[akka] (
|
|||
log.info("Shutting down %s", name)
|
||||
if (isRunning) {
|
||||
isRunning = false
|
||||
foreachListener(l => l ! RemoteClientShutdown(this))
|
||||
foreachListener(_ ! RemoteClientShutdown(this))
|
||||
timer.stop
|
||||
timer = null
|
||||
openChannels.close.awaitUninterruptibly
|
||||
openChannels = null
|
||||
bootstrap.releaseExternalResources
|
||||
bootstrap = null
|
||||
connection = null
|
||||
log.info("%s has been shut down", name)
|
||||
}
|
||||
}
|
||||
|
|
@ -396,12 +403,12 @@ class RemoteClientHandler(
|
|||
futures.remove(reply.getId)
|
||||
} else {
|
||||
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
|
||||
client.foreachListener(l => l ! RemoteClientError(exception, client))
|
||||
client.foreachListener(_ ! RemoteClientError(exception, client))
|
||||
throw exception
|
||||
}
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
client.foreachListener(l => l ! RemoteClientError(e, client))
|
||||
client.foreachListener(_ ! RemoteClientError(e, client))
|
||||
log.error("Unexpected exception in remote client handler: %s", e)
|
||||
throw e
|
||||
}
|
||||
|
|
@ -416,8 +423,7 @@ class RemoteClientHandler(
|
|||
client.connection = bootstrap.connect(remoteAddress)
|
||||
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
|
||||
if (!client.connection.isSuccess) {
|
||||
client.foreachListener(l =>
|
||||
l.asInstanceOf[ActorRef] ! RemoteClientError(client.connection.getCause, client))
|
||||
client.foreachListener(_ ! RemoteClientError(client.connection.getCause, client))
|
||||
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
|
||||
}
|
||||
}
|
||||
|
|
@ -427,7 +433,7 @@ class RemoteClientHandler(
|
|||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
def connect = {
|
||||
client.foreachListener(l => l ! RemoteClientConnected(client))
|
||||
client.foreachListener(_ ! RemoteClientConnected(client))
|
||||
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
|
||||
client.resetReconnectionTimeWindow
|
||||
}
|
||||
|
|
@ -444,12 +450,12 @@ class RemoteClientHandler(
|
|||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientDisconnected(client))
|
||||
client.foreachListener(_ ! RemoteClientDisconnected(client))
|
||||
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
client.foreachListener(l => l ! RemoteClientError(event.getCause, client))
|
||||
client.foreachListener(_ ! RemoteClientError(event.getCause, client))
|
||||
log.error(event.getCause, "Unexpected exception from downstream in remote client")
|
||||
event.getChannel.close
|
||||
}
|
||||
|
|
|
|||
|
|
@ -411,6 +411,8 @@ class RemoteServerHandler(
|
|||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else {
|
||||
server.foreachListener(_ ! RemoteServerClientConnected(server))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2,11 +2,12 @@ import sbt._
|
|||
|
||||
object AkkaRepositories {
|
||||
val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
|
||||
val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
|
||||
val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
|
||||
val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/")
|
||||
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
|
||||
val CodehausSnapshotRepo = MavenRepository("Codehaus Snapshots", "http://snapshots.repository.codehaus.org")
|
||||
val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
|
||||
val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
|
||||
}
|
||||
|
||||
trait AkkaBaseProject extends BasicScalaProject {
|
||||
|
|
@ -17,27 +18,30 @@ trait AkkaBaseProject extends BasicScalaProject {
|
|||
|
||||
// for development version resolve to .ivy2/local
|
||||
// val akkaModuleConfig = ModuleConfiguration("se.scalablesolutions.akka", AkkaRepo)
|
||||
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
|
||||
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
|
||||
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
|
||||
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", AkkaRepo)
|
||||
|
||||
val aspectwerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", AkkaRepo)
|
||||
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
|
||||
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
|
||||
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
|
||||
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
|
||||
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
|
||||
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
|
||||
val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo)
|
||||
val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
|
||||
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
|
||||
val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo)
|
||||
|
||||
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
|
||||
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
|
||||
val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
|
||||
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo)
|
||||
val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo)
|
||||
val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo)
|
||||
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
|
||||
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
|
||||
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausSnapshotRepo) // only while snapshot version
|
||||
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
|
||||
val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
|
||||
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
|
||||
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
}
|
||||
|
||||
trait AkkaProject extends AkkaBaseProject {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@
|
|||
<fileNamePattern>./logs/akka.log.%d{yyyy-MM-dd-HH}</fileNamePattern>
|
||||
</rollingPolicy>
|
||||
</appender>
|
||||
<logger name="se.scalablesolutions" level="DEBUG"/>
|
||||
<logger name="se.scalablesolutions" level="INFO"/>
|
||||
<root level="INFO">
|
||||
<appender-ref ref="stdout"/>
|
||||
<appender-ref ref="R"/>
|
||||
|
|
|
|||
|
|
@ -252,6 +252,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
.filter(!_.getName.contains("scala-library"))
|
||||
.map("lib_managed/scala_%s/compile/".format(buildScalaVersion) + _.getName)
|
||||
.mkString(" ") +
|
||||
" config/" +
|
||||
" scala-library.jar" +
|
||||
" dist/akka-actor_%s-%s.jar".format(buildScalaVersion, version) +
|
||||
" dist/akka-typed-actor_%s-%s.jar".format(buildScalaVersion, version) +
|
||||
|
|
@ -275,10 +276,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
}
|
||||
|
||||
override def mainResources = super.mainResources +++
|
||||
descendents(info.projectPath / "config", "*") ---
|
||||
(super.mainResources ** "logback-test.xml")
|
||||
(info.projectPath / "config").descendentsExcept("*", "logback-test.xml")
|
||||
|
||||
override def testResources = super.testResources --- (super.testResources ** "logback-test.xml")
|
||||
override def runClasspath = super.runClasspath +++ "config"
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// publishing
|
||||
|
|
@ -755,7 +755,12 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
def akkaArtifacts = descendents(info.projectPath / "dist", "*" + buildScalaVersion + "-" + version + ".jar")
|
||||
|
||||
// ------------------------------------------------------------
|
||||
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject
|
||||
class AkkaDefaultProject(info: ProjectInfo, val deployPath: Path) extends DefaultProject(info) with DeployProject with OSGiProject {
|
||||
override def runClasspath = super.runClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
|
||||
override def testClasspath = super.testClasspath +++ (AkkaParentProject.this.info.projectPath / "config")
|
||||
override def packageDocsJar = this.defaultJarPath("-docs.jar")
|
||||
override def packageSrcJar = this.defaultJarPath("-sources.jar")
|
||||
}
|
||||
}
|
||||
|
||||
trait DeployProject { self: BasicScalaProject =>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue