Merge branch 'master' into wip-failure-detector-puppet-jboner

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-06-11 16:53:55 +02:00
commit c0e2362daa
17 changed files with 141 additions and 87 deletions

View file

@ -6,7 +6,7 @@ package akka.actor
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.{ Await, MessageQueueAppendFailedException, BoundedDequeBasedMailbox }
import akka.dispatch.{ Await, BoundedDequeBasedMailbox }
import akka.pattern.ask
import akka.util.duration._
import akka.actor.ActorSystem.Settings
@ -17,16 +17,8 @@ object ActorWithBoundedStashSpec {
class StashingActor(implicit sys: ActorSystem) extends Actor with Stash {
def receive = {
case "hello"
stash()
sender ! "OK"
case "world"
try {
unstashAll()
} catch {
case e: MessageQueueAppendFailedException
expectedException.open()
}
case "hello" stash()
case "world" unstashAll()
}
}
@ -36,18 +28,10 @@ object ActorWithBoundedStashSpec {
def receive = {
case "hello"
numStashed += 1
try {
stash()
} catch {
case e: StashOverflowException
if (numStashed == 21) stashOverflow.open()
}
try stash() catch { case e: StashOverflowException if (numStashed == 21) sender ! "STASHOVERFLOW" }
}
}
@volatile var expectedException: TestLatch = null
@volatile var stashOverflow: TestLatch = null
val testConf: Config = ConfigFactory.parseString("""
my-dispatcher {
mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded"
@ -56,47 +40,42 @@ object ActorWithBoundedStashSpec {
""")
// bounded deque-based mailbox with capacity 10
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 5 seconds)
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 1 seconds)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach {
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach with ImplicitSender {
import ActorWithBoundedStashSpec._
implicit val sys = system
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) }
def myProps(creator: Actor): Props = Props(creator).withDispatcher("my-dispatcher")
"An Actor with Stash and BoundedDequeBasedMailbox" must {
"throw a MessageQueueAppendFailedException in case of a capacity violation" in {
ActorWithBoundedStashSpec.expectedException = new TestLatch
"end up in DeadLetters in case of a capacity violation" in {
system.eventStream.subscribe(testActor, classOf[DeadLetter])
val stasher = system.actorOf(myProps(new StashingActor))
// fill up stash
val futures = for (_ 1 to 11) yield { stasher ? "hello" }
futures foreach { Await.ready(_, 10 seconds) }
(1 to 11) foreach { _ stasher ! "hello" }
// cause unstashAll with capacity violation
stasher ! "world"
Await.ready(ActorWithBoundedStashSpec.expectedException, 10 seconds)
expectMsg(DeadLetter("hello", testActor, stasher))
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
}
}
"An Actor with bounded Stash" must {
"throw a StashOverflowException in case of a stash capacity violation" in {
ActorWithBoundedStashSpec.stashOverflow = new TestLatch
val stasher = system.actorOf(myProps(new StashingActorWithOverflow))
// fill up stash
for (_ 1 to 21) { stasher ! "hello" }
Await.ready(ActorWithBoundedStashSpec.stashOverflow, 10 seconds)
(1 to 21) foreach { _ stasher ! "hello" }
expectMsg("STASHOVERFLOW")
}
}
}

View file

@ -364,5 +364,39 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
system.stop(supervisor)
}
"must not lose system messages when a NonFatal exception occurs when processing a system message" in {
val parent = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy()({
case e: IllegalStateException if e.getMessage == "OHNOES" throw e
case _ SupervisorStrategy.Restart
})
val child = context.watch(context.actorOf(Props(new Actor {
override def postRestart(reason: Throwable): Unit = testActor ! "child restarted"
def receive = {
case "die" throw new IllegalStateException("OHNOES")
case "test" sender ! "child green"
}
}), "child"))
override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted"
def receive = {
case t @ Terminated(`child`) testActor ! "child terminated"
case "die" child ! "die"
case "test" sender ! "green"
case "testchild" child forward "test"
}
}))
parent ! "die"
parent ! "testchild"
expectMsg("parent restarted")
expectMsg("child terminated")
parent ! "test"
expectMsg("green")
parent ! "testchild"
expectMsg("child green")
}
}
}

View file

@ -6,9 +6,8 @@ import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
import com.typesafe.config.Config
import akka.actor.ActorSystem
import akka.actor._
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -39,9 +38,10 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
q.numberOfMessages must be === config.capacity
q.hasMessages must be === true
intercept[MessageQueueAppendFailedException] {
q.enqueue(null, exampleMessage)
}
system.eventStream.subscribe(testActor, classOf[DeadLetter])
q.enqueue(testActor, exampleMessage)
expectMsg(DeadLetter(exampleMessage.message, system.deadLetters, testActor))
system.eventStream.unsubscribe(testActor, classOf[DeadLetter])
q.dequeue must be === exampleMessage
q.numberOfMessages must be(config.capacity - 1)

View file

@ -6,18 +6,11 @@ package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
import akka.actor.ActorContext
import com.typesafe.config.Config
import akka.actor.ActorSystem
/**
* This exception normally is thrown when a bounded mailbox is over capacity
*/
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
import akka.actor._
/**
* INTERNAL API
@ -401,13 +394,11 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingQueue[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope) {
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) {
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue put handle
}
def dequeue(): Envelope = queue.poll()
}
@ -439,18 +430,16 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
override def queue: BlockingDeque[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue put handle
if (pushTimeOut.length > 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue putFirst handle
if (pushTimeOut.length > 0) {
if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
} else queue putFirst handle
def dequeue(): Envelope = queue.poll()
}

View file

@ -17,7 +17,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
val c1 = role("c1")
val c2 = role("c2")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy

View file

@ -36,7 +36,8 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
"A node that is LEAVING a non-singleton cluster" must {
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in {
// FIXME make it work and remove ignore
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest ignore {
awaitClusterUp(first, second, third)

View file

@ -42,7 +42,8 @@ abstract class NodeLeavingAndExitingSpec
"A node that is LEAVING a non-singleton cluster" must {
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
// FIXME make it work and remove ignore
"be moved to EXITING by the leader" taggedAs LongRunningTest ignore {
awaitClusterUp(first, second, third)

View file

@ -36,7 +36,8 @@ abstract class NodeLeavingSpec
"A node that is LEAVING a non-singleton cluster" must {
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
// FIXME make it work and remove ignore
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest ignore {
awaitClusterUp(first, second, third)

View file

@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object NodeShutdownMultiJvmSpec extends MultiNodeConfig {
object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
@ -24,17 +24,17 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig {
}
class NodeShutdownWithFailureDetectorPuppetMultiJvmNode1 extends NodeShutdownSpec with FailureDetectorPuppetStrategy
class NodeShutdownWithFailureDetectorPuppetMultiJvmNode2 extends NodeShutdownSpec with FailureDetectorPuppetStrategy
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
class NodeShutdownWithAccrualFailureDetectorMultiJvmNode1 extends NodeShutdownSpec with AccrualFailureDetectorStrategy
class NodeShutdownWithAccrualFailureDetectorMultiJvmNode2 extends NodeShutdownSpec with AccrualFailureDetectorStrategy
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
abstract class NodeShutdownSpec
extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
abstract class SingletonClusterSpec
extends MultiNodeSpec(SingletonClusterMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeShutdownMultiJvmSpec._
import SingletonClusterMultiJvmSpec._
"A cluster of 2 nodes" must {

View file

@ -24,7 +24,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
gossip-interval = 400 ms
nr-of-deputy-nodes = 0
}
akka.loglevel = INFO
akka.loglevel = DEBUG
"""))
}

View file

@ -90,6 +90,42 @@ strong {color: #1d3c52; }
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25);
}
.warning {
background-image: none;
background-color: #fdf5d9;
filter: progid:DXImageTransform.Microsoft.gradient(enabled = false);
padding: 14px;
border-color: #ffffc4;
-webkit-box-shadow: none;
-moz-box-shadow: none;
box-shadow: none;
margin-bottom: 18px;
position: relative;
padding: 7px 15px;
color: #404040;
background-repeat: repeat-x;
background-image: -khtml-gradient(linear, left top, left bottom, from(#ffffc4), to(#ffff00));
background-image: -moz-linear-gradient(top, #ffffc4, #ffff00);
background-image: -ms-linear-gradient(top, #ffffc4, #ffff00);
background-image: -webkit-gradient(linear, left top, left bottom, color-stop(0%, #ffffc4), color-stop(100%, #ffff00));
background-image: -webkit-linear-gradient(top, #ffffc4, #ffff00);
background-image: -o-linear-gradient(top, #ffffc4, #ffff00);
background-image: linear-gradient(top, #ffffc4, #ffff00);
filter: progid:DXImageTransform.Microsoft.gradient(startColorstr='#ffffc4', endColorstr='#ffff00', GradientType=0);
text-shadow: 0 -1px 0 rgba(0, 0, 0, 0.25);
border-color: #dff69a #ffff00 #E4C652;
border-color: rgba(0, 0, 0, 0.1) rgba(0, 0, 0, 0.1) rgba(0, 0, 0, 0.25);
text-shadow: 0 1px 0 rgba(255, 255, 255, 0.5);
border-width: 1px;
border-style: solid;
-webkit-border-radius: 4px;
-moz-border-radius: 4px;
border-radius: 4px;
-webkit-box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25);
-moz-box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25);
box-shadow: inset 0 1px 0 rgba(255, 255, 255, 0.25);
}
.admonition p.admonition-title {
color: rgba(0, 0, 0, 0.6);
text-shadow: 0 1px 0 rgba(255, 255, 255, .7);

View file

@ -3,7 +3,7 @@
*/
package akka.remote.testconductor
import org.jboss.netty.channel.{ Channel, ChannelPipeline, ChannelPipelineFactory, ChannelUpstreamHandler, SimpleChannelUpstreamHandler, StaticChannelPipeline }
import org.jboss.netty.channel.{ Channel, ChannelPipeline, ChannelPipelineFactory, ChannelUpstreamHandler, SimpleChannelUpstreamHandler, DefaultChannelPipeline }
import org.jboss.netty.channel.socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory }
import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
@ -12,6 +12,7 @@ import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import akka.event.Logging
/**
* INTERNAL API.
@ -21,7 +22,9 @@ private[akka] class TestConductorPipelineFactory(handler: ChannelUpstreamHandler
val encap = List(new LengthFieldPrepender(4), new LengthFieldBasedFrameDecoder(10000, 0, 4, 0, 4))
val proto = List(new ProtobufEncoder, new ProtobufDecoder(TestConductorProtocol.Wrapper.getDefaultInstance))
val msg = List(new MsgEncoder, new MsgDecoder)
new StaticChannelPipeline(encap ::: proto ::: msg ::: handler :: Nil: _*)
(encap ::: proto ::: msg ::: handler :: Nil).foldLeft(new DefaultChannelPipeline) {
(pipe, handler) pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe
}
}
}

View file

@ -90,6 +90,7 @@ class LogRoleReplace {
private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started""".r
private val RemoteServerStarted = """\[([\w\-]+)\].*RemoteServerStarted@akka://.*@([\w\-\.]+):([0-9]+)""".r
private val ColorCode = """\[[0-9]+m"""
private var replacements: Map[String, String] = Map.empty
private var jvmToAddress: Map[String, String] = Map.empty
@ -106,12 +107,16 @@ class LogRoleReplace {
}
def processLine(line: String): String = {
if (updateReplacements(line))
replaceLine(line)
val cleanLine = removeColorCodes(line)
if (updateReplacements(cleanLine))
replaceLine(cleanLine)
else
line
cleanLine
}
private def removeColorCodes(line: String): String =
line.replaceAll(ColorCode, "")
private def updateReplacements(line: String): Boolean = {
if (line.startsWith("[info] * ")) {
// reset when new test begins

View file

@ -8,7 +8,7 @@ import java.net.{ InetAddress, InetSocketAddress }
import org.jboss.netty.util.{ Timeout, TimerTask, HashedWheelTimer }
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.group.DefaultChannelGroup
import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, StaticChannelPipeline, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel }
import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, DefaultChannelPipeline, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel }
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.execution.ExecutionHandler
import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler }

View file

@ -12,7 +12,7 @@ import java.util.concurrent.Executors
import scala.collection.mutable.HashMap
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture }
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, StaticChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal }
import org.jboss.netty.channel.{ ChannelHandlerContext, Channel, DefaultChannelPipeline, ChannelHandler, ChannelPipelineFactory, ChannelLocal }
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
@ -50,10 +50,13 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
*/
object PipelineFactory {
/**
* Construct a StaticChannelPipeline from a sequence of handlers; to be used
* Construct a DefaultChannelPipeline from a sequence of handlers; to be used
* in implementations of ChannelPipelineFactory.
*/
def apply(handlers: Seq[ChannelHandler]): StaticChannelPipeline = new StaticChannelPipeline(handlers: _*)
def apply(handlers: Seq[ChannelHandler]): DefaultChannelPipeline =
handlers.foldLeft(new DefaultChannelPipeline) {
(pipe, handler) pipe.addLast(Logging.simpleName(handler.getClass), handler); pipe
}
/**
* Constructs the NettyRemoteTransport default pipeline with the give head handler, which

View file

@ -485,7 +485,7 @@ object Dependency {
object V {
val Camel = "2.8.0"
val Logback = "1.0.4"
val Netty = "3.3.0.Final"
val Netty = "3.5.0.Final"
val Protobuf = "2.4.1"
val ScalaStm = "0.5"
val Scalatest = "1.6.1"

View file

@ -93,6 +93,8 @@ fi
declare -r version=$1
declare -r publish_path="${release_server}:${release_path}"
[[ `java -version 2>&1 | grep "java version" | awk '{print $3}' | tr -d \" | awk '{split($0, array, ".")} END{print array[2]}'` -eq 6 ]] || fail "Java version is not 1.6"
# check for a git command
type -P git &> /dev/null || fail "git command not found"