break out TestConductor stuff into akka-remote-tests project

This commit is contained in:
Roland 2012-05-10 10:24:05 +02:00
parent 9a33f468c0
commit d931a6e727
15 changed files with 59 additions and 33 deletions

View file

@ -0,0 +1,34 @@
#############################################
# Akka Remote Testing Reference Config File #
#############################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka {
testconductor {
# Timeout for joining a barrier: this is the maximum time any participants
# waits for everybody else to join a named barrier.
barrier-timeout = 30s
# Timeout for interrogation of TestConductors Controller actor
query-timeout = 5s
# Threshold for packet size in time unit above which the failure injector will
# split the packet and deliver in smaller portions; do not give value smaller
# than HashedWheelTimer resolution (would not make sense)
packet-split-threshold = 100ms
# Default port to start the conductor on; 0 means <auto>
port = 0
# Hostname of the TestConductor server, used by the server to bind to the IP
# and by the client to connect to it.
host = localhost
# Name of the TestConductor client (for identification on the server e.g. for
# failure injection)
name = "noname"
}
}

View file

@ -24,6 +24,7 @@ import java.net.InetSocketAddress
import akka.dispatch.Future
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy
import java.util.concurrent.ConcurrentHashMap
trait Conductor extends RunControl with FailureInject { this: TestConductorExt
@ -91,22 +92,21 @@ trait Conductor extends RunControl with FailureInject { this: TestConductorExt
class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAdapter) extends SimpleChannelUpstreamHandler {
@volatile
var clients = Map[Channel, ActorRef]()
val clients = new ConcurrentHashMap[Channel, ActorRef]()
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
log.debug("connection from {}", getAddrString(channel))
val fsm = system.actorOf(Props(new ServerFSM(controller, channel)))
clients += channel -> fsm
clients.put(channel, fsm)
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val channel = event.getChannel
log.debug("disconnect from {}", getAddrString(channel))
val fsm = clients(channel)
val fsm = clients.get(channel)
fsm ! PoisonPill
clients -= channel
clients.remove(channel)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
@ -114,7 +114,7 @@ class ConductorHandler(system: ActorSystem, controller: ActorRef, log: LoggingAd
log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
event.getMessage match {
case msg: NetworkOp
clients(channel) ! msg
clients.get(channel) ! msg
case msg
log.info("client {} sent garbage '{}', disconnecting", getAddrString(channel), msg)
channel.close()

View file

@ -155,30 +155,4 @@ akka {
type = PinnedDispatcher
}
}
testconductor {
# Timeout for joining a barrier: this is the maximum time any participants
# waits for everybody else to join a named barrier.
barrier-timeout = 30s
# Timeout for interrogation of TestConductors Controller actor
query-timeout = 5s
# Threshold for packet size in time unit above which the failure injector will
# split the packet and deliver in smaller portions; do not give value smaller
# than HashedWheelTimer resolution (would not make sense)
packet-split-threshold = 100ms
# Default port to start the conductor on; 0 means <auto>
port = 0
# Hostname of the TestConductor server, used by the server to bind to the IP
# and by the client to connect to it.
host = localhost
# Name of the TestConductor client (for identification on the server e.g. for
# failure injection)
name = "noname"
}
}

View file

@ -87,6 +87,24 @@ object AkkaBuild extends Build {
)
) configs (MultiJvm)
lazy val remoteTests = Project(
id = "akka-remote-tests",
base = file("akka-remote-tests"),
dependencies = Seq(remote % "compile;test->test;multi-jvm->multi-jvm", actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ schoirSettings ++ Seq(
// disable parallel tests
parallelExecution in Test := false,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
},
scalatestOptions in MultiJvm := Seq("-r", "org.scalatest.akka.QuietReporter"),
jvmOptions in MultiJvm := {
if (getBoolean("sbt.log.noformat")) Seq("-Dakka.test.nocolor=true") else Nil
},
test in Test <<= (test in Test) dependsOn (test in MultiJvm)
)
) configs (MultiJvm)
lazy val cluster = Project(
id = "akka-cluster",
base = file("akka-cluster"),
@ -438,7 +456,7 @@ object Dependencies {
Test.zookeeper, Test.log4j // needed for ZkBarrier in multi-jvm tests
)
val cluster = Seq(Test.junit, Test.scalatest)
val cluster = Seq(Test.junit, Test.scalatest)
val slf4j = Seq(slf4jApi, Test.logback)