2011-09-14 16:09:17 +02:00
/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.testconductor
2012-06-25 17:09:00 +02:00
import language.postfixOps
2012-10-30 15:08:41 +01:00
import java.util.concurrent.TimeoutException
2013-08-23 14:39:21 +02:00
import akka.actor._
2012-10-30 15:08:41 +01:00
import akka.remote.testconductor.RemoteConnection.getAddrString
import scala.collection.immutable
import scala.concurrent. { ExecutionContext , Await , Future }
2012-09-21 14:50:06 +02:00
import scala.concurrent.duration._
2012-10-30 15:08:41 +01:00
import scala.util.control.NoStackTrace
import scala.reflect.classTag
2012-05-02 21:56:26 +02:00
import akka.util.Timeout
2012-06-29 13:33:20 +02:00
import org.jboss.netty.channel. { Channel , SimpleChannelUpstreamHandler , ChannelHandlerContext , ChannelStateEvent , MessageEvent , WriteCompletionEvent , ExceptionEvent }
2012-06-13 13:52:58 +02:00
import akka.pattern. { ask , pipe , AskTimeoutException }
2012-06-29 13:33:20 +02:00
import akka.event. { LoggingAdapter , Logging }
import java.net. { InetSocketAddress , ConnectException }
2012-12-12 12:20:54 +01:00
import akka.remote.transport.ThrottlerTransportAdapter. { SetThrottle , TokenBucket , Blackhole , Unthrottled }
2013-04-26 12:18:01 +02:00
import akka.dispatch. { UnboundedMessageQueueSemantics , RequiresMessageQueue }
2011-09-14 16:09:17 +02:00
2012-05-10 21:08:06 +02:00
/**
 * The Player is the client component of the
 * [[akka.remote.testconductor.TestConductorExt]] extension. It registers with
 * the [[akka.remote.testconductor.Conductor]]'s [[akka.remote.testconductor.Controller]]
 * in order to participate in barriers and enable network failure injection.
 */
trait Player { this: TestConductorExt ⇒

  // Assigned by startClient; all other operations go through `client`.
  private var _client: ActorRef = _

  // Fails fast when startClient has not been called yet.
  private def client = _client match {
    case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
    case x    ⇒ x
  }

  /**
   * Connect to the conductor on the given port (the host is taken from setting
   * `akka.testconductor.host`). The connection is made asynchronously, but you
   * should await completion of the returned Future because that implies that
   * all expected participants of this test have successfully connected (i.e.
   * this is a first barrier in itself). The number of expected participants is
   * set in [[akka.remote.testconductor.Conductor]]`.startController()`.
   *
   * @param name symbolic role name this node registers under
   * @param controllerAddr socket address of the conductor's controller
   * @return a Future completed with `Done` once the client FSM reaches `Connected`;
   *         it fails on any other transition, or when the ask times out
   *         (`Settings.BarrierTimeout`)
   * @throws IllegalStateException if a client has already been started
   */
  def startClient(name: RoleName, controllerAddr: InetSocketAddress): Future[Done] = {
    import ClientFSM._
    import akka.actor.FSM._
    import Settings.BarrierTimeout // implicit Timeout for the ask below

    if (_client ne null) throw new IllegalStateException("TestConductorClient already started")
    _client = system.actorOf(Props(classOf[ClientFSM], name, controllerAddr), "TestConductorClient")

    // Short-lived watcher actor: it is asked with the client FSM's ref, subscribes to its
    // transitions and answers the asker with Done (or a Status.Failure), then stops.
    val a = system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
      var waiting: ActorRef = _
      def receive = {
        case fsm: ActorRef ⇒
          waiting = sender
          fsm ! SubscribeTransitionCallBack(self)
        case Transition(_, Connecting, AwaitDone) ⇒ // step 1, not there yet
        case Transition(_, AwaitDone, Connected) ⇒
          waiting ! Done
          context stop self
        case t: Transition[_] ⇒
          waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t))
          context stop self
        case CurrentState(_, Connected) ⇒
          waiting ! Done
          context stop self
        case _: CurrentState[_] ⇒
      }
    }))

    a ? client mapTo classTag[Done]
  }

  /**
   * Enter the named barriers, one after the other, in the order given. Will
   * throw an exception in case of timeouts or other errors.
   */
  def enter(name: String*): Unit = enter(Settings.BarrierTimeout, name.to[immutable.Seq])

  /**
   * Enter the named barriers, one after the other, in the order given. Will
   * throw an exception in case of timeouts or other errors.
   *
   * The whole sequence shares one deadline of `timeout` computed up front; each
   * barrier gets whatever time is left.
   *
   * @throws java.util.concurrent.TimeoutException if the deadline passed before a
   *         barrier could be entered ("Server timed out ...") or the reply did not
   *         arrive in time ("Client timed out ...", with the AskTimeoutException as cause)
   */
  def enter(timeout: Timeout, name: immutable.Seq[String]): Unit = {
    system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
    val stop = Deadline.now + timeout.duration
    name foreach { b ⇒
      val barrierTimeout = stop.timeLeft
      if (barrierTimeout < Duration.Zero) {
        client ! ToServer(FailBarrier(b))
        throw new TimeoutException("Server timed out while waiting for barrier " + b)
      }
      try {
        // Leave the server its full barrier time plus QueryTimeout for the round trip.
        implicit val askTimeout = Timeout(barrierTimeout + Settings.QueryTimeout.duration)
        Await.result(client ? ToServer(EnterBarrier(b, Option(barrierTimeout))), Duration.Inf)
      } catch {
        case e: AskTimeoutException ⇒
          client ! ToServer(FailBarrier(b))
          // TimeoutException has no cause-taking constructor, so attach the cause explicitly.
          val timeoutException = new TimeoutException("Client timed out while waiting for barrier " + b)
          timeoutException.initCause(e)
          throw timeoutException
      }
      system.log.debug("passed barrier {}", b)
    }
  }

  /**
   * Query remote transport address of named node.
   *
   * @return a Future of the node's address; uses `Settings.QueryTimeout` as ask timeout
   */
  def getAddressFor(name: RoleName): Future[Address] = {
    import Settings.QueryTimeout
    client ? ToServer(GetAddress(name)) mapTo classTag[Address]
  }
}
2012-05-22 15:19:45 +02:00
/**
 * INTERNAL API.
 *
 * States, state data and connection events used by
 * [[akka.remote.testconductor.ClientFSM]].
 */
private[akka] object ClientFSM {

  /** State of the client-side FSM. */
  sealed trait State

  /** Initial state: waiting for the channel to the conductor. */
  case object Connecting extends State

  /** Channel is up and `Hello` has been written; waiting for `Done`. */
  case object AwaitDone extends State

  /** `Done` received: barrier and failure-injection requests are processed. */
  case object Connected extends State

  /** Entered on connection failure, connect/handshake timeout or an unexpected reply. */
  case object Failed extends State

  /**
   * State data.
   *
   * @param channel the channel to the conductor, once connected
   * @param runningOp the single outstanding request: a token (barrier or node name)
   *                  and the actor waiting for its reply
   */
  case class Data(channel: Option[Channel], runningOp: Option[(String, ActorRef)])

  /** Sent by the connection handler once the channel to the conductor is connected. */
  case class Connected(channel: Channel) extends NoSerializationVerificationNeeded

  /** Reports a failed connection with the underlying cause's message; also thrown on disconnect. */
  case class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace

  /** When received in the `Connected` state, the FSM fails with a ConnectionFailure. */
  case object Disconnected
}
2012-05-10 21:08:06 +02:00
/**
 * This is the controlling entity on the [[akka.remote.testconductor.Player]]
 * side: in a first step it registers itself with a symbolic name and its remote
 * address at the [[akka.remote.testconductor.Controller]], then waits for the
 * `Done` message which signals that all other expected test participants have
 * done the same. After that, it will pass barrier requests to and from the
 * coordinator and react to the [[akka.remote.testconductor.Conductor]]'s
 * requests for failure injection.
 *
 * Note that you can't perform requests concurrently, e.g. enter barrier
 * from one thread and ask for node address from another thread: only one
 * request awaiting a reply is tracked at a time (`Data.runningOp`).
 *
 * INTERNAL API.
 */
private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor
  with LoggingFSM[ClientFSM.State, ClientFSM.Data] with RequiresMessageQueue[UnboundedMessageQueueSemantics] {

  import ClientFSM._

  val settings = TestConductor().Settings

  // Creating the handler starts connecting to the controller right away.
  val handler = new PlayerHandler(controllerAddr, settings.ClientReconnects, settings.ReconnectBackoff,
    settings.ClientSocketWorkerPoolSize, self, Logging(context.system, "PlayerHandler"),
    context.system.scheduler)(context.dispatcher)

  startWith(Connecting, Data(None, None))

  when(Connecting, stateTimeout = settings.ConnectTimeout) {
    case Event(msg: ClientOp, _) ⇒
      stay replying Status.Failure(new IllegalStateException("not connected yet"))
    case Event(Connected(channel), _) ⇒
      channel.write(Hello(name.name, TestConductor().address))
      goto(AwaitDone) using Data(Some(channel), None)
    case Event(ConnectionFailure(msg), _) ⇒
      log.error("connection to TestConductor failed: {}", msg)
      goto(Failed)
    case Event(StateTimeout, _) ⇒
      log.error("connect timeout to TestConductor")
      goto(Failed)
  }

  when(AwaitDone, stateTimeout = settings.BarrierTimeout.duration) {
    case Event(Done, _) ⇒
      log.debug("received Done: starting test")
      goto(Connected)
    case Event(msg: NetworkOp, _) ⇒
      log.error("received {} instead of Done", msg)
      goto(Failed)
    case Event(msg: ServerOp, _) ⇒
      stay replying Status.Failure(new IllegalStateException("not connected yet"))
    case Event(StateTimeout, _) ⇒
      log.error("connect timeout to TestConductor")
      goto(Failed)
  }

  when(Connected) {
    case Event(Disconnected, _) ⇒
      log.info("disconnected from TestConductor")
      throw new ConnectionFailure("disconnect")
    case Event(ToServer(_: Done), Data(Some(channel), _)) ⇒
      channel.write(Done)
      stay
    case Event(ToServer(msg), d @ Data(Some(channel), None)) ⇒
      channel.write(msg)
      // Only requests that expect a reply occupy the runningOp slot. Fire-and-forget
      // messages such as FailBarrier (sent by Player.enter) used to hit a MatchError here.
      val requester = sender
      val token = msg match {
        case EnterBarrier(barrier, _) ⇒ Some(barrier -> requester)
        case GetAddress(node)         ⇒ Some(node.name -> requester)
        case _                        ⇒ None
      }
      stay using d.copy(runningOp = token)
    case Event(ToServer(op), Data(channel, Some((token, _)))) ⇒
      // NOTE(review): the requester is not answered here and simply runs into its ask timeout.
      log.error("cannot write {} while waiting for {}", op, token)
      stay
    case Event(op: ClientOp, d @ Data(Some(channel), runningOp)) ⇒
      op match {
        case BarrierResult(b, success) ⇒
          runningOp match {
            case Some((barrier, requester)) ⇒
              val response =
                if (b != barrier) Status.Failure(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier))
                else if (!success) Status.Failure(new RuntimeException("barrier failed: " + b))
                else b
              requester ! response
            case None ⇒
              log.warning("did not expect {}", op)
          }
          stay using d.copy(runningOp = None)
        case AddressReply(node, addr) ⇒
          runningOp match {
            case Some((_, requester)) ⇒ requester ! addr
            case None                 ⇒ log.warning("did not expect {}", op)
          }
          stay using d.copy(runningOp = None)
        case t: ThrottleMsg ⇒
          import context.dispatcher // FIXME is this the right EC for the future below?
          val mode = if (t.rateMBit < 0.0f) Unthrottled
          else if (t.rateMBit == 0.0f) Blackhole
          // Conversion needed as the TokenBucket measures in octets: 125000 Octets/s = 1Mbit/s
          // FIXME: Initial capacity should be carefully chosen
          else TokenBucket(capacity = 1000, tokensPerSecond = t.rateMBit * 125000.0, nanoTimeOfLastSend = 0, availableTokens = 0)
          val cmdFuture = TestConductor().transport.managementCommand(SetThrottle(t.target, t.direction, mode))

          // Capture the adapter while still inside the actor; the callbacks run on another thread.
          val logger = log
          cmdFuture onSuccess {
            case true ⇒ self ! ToServer(Done)
            case _ ⇒ throw new RuntimeException("Throttle was requested from the TestConductor, but no transport " +
              "adapters available that support throttling. Specify `testTransport(on = true)` in your MultiNodeConfig")
          }
          // Previously a failed command was silently dropped and the conductor never got Done.
          cmdFuture onFailure {
            case e ⇒ logger.error(e, "throttle command {} failed", t)
          }
          stay
        case _: DisconnectMsg ⇒
          // FIXME: Currently ignoring, needs support from Remoting
          stay
        case TerminateMsg(None) ⇒
          context.system.shutdown()
          stay
        case TerminateMsg(Some(exitValue)) ⇒
          System.exit(exitValue)
          stay // needed because Java doesn't have Nothing
        case _: Done ⇒ stay //FIXME what should happen?
      }
  }

  when(Failed) {
    case Event(msg: ClientOp, _) ⇒
      stay replying Status.Failure(new RuntimeException("cannot do " + msg + " while Failed"))
    case Event(msg: NetworkOp, _) ⇒
      log.warning("ignoring network message {} while Failed", msg)
      stay
  }

  onTermination {
    case StopEvent(_, _, Data(Some(channel), _)) ⇒
      channel.close()
  }

  initialize()
}
2012-05-10 21:08:06 +02:00
/**
 * Netty handler for the Player's connection to the conductor. It forwards
 * [[akka.remote.testconductor.NetworkOp]] messages received from the conductor to the
 * [[akka.remote.testconductor.ClientFSM]], reports connection success/failure to it,
 * and retries refused connections up to `reconnects` times.
 *
 * INTERNAL API.
 *
 * @param server address of the conductor's controller
 * @param reconnects remaining retries after a `ConnectException`
 * @param backoff minimum spacing between the start of two connection attempts
 * @param poolSize worker pool size passed to [[akka.remote.testconductor.RemoteConnection]]
 * @param fsm the ClientFSM that receives connection events and messages
 * @param executor used for scheduled reconnects and for shutting channels down off the Netty IO threads
 */
private[akka] class PlayerHandler(
  server: InetSocketAddress,
  private var reconnects: Int,
  backoff: FiniteDuration,
  poolSize: Int,
  fsm: ActorRef,
  log: LoggingAdapter,
  scheduler: Scheduler)(implicit executor: ExecutionContext)
  extends SimpleChannelUpstreamHandler {

  import ClientFSM._

  // Earliest time the next reconnect attempt may start; set by reconnect().
  // Declared before the first reconnect() call so the constructor does not write a field
  // whose declaration comes later in the class body.
  var nextAttempt: Deadline = _

  // NOTE(review): this hands `this` to Netty while the handler is still being constructed.
  reconnect()

  override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = log.debug("channel {} open", event.getChannel)
  override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = log.debug("channel {} closed", event.getChannel)
  override def channelBound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = log.debug("channel {} bound", event.getChannel)
  override def channelUnbound(ctx: ChannelHandlerContext, event: ChannelStateEvent) = log.debug("channel {} unbound", event.getChannel)
  override def writeComplete(ctx: ChannelHandlerContext, event: WriteCompletionEvent) = log.debug("channel {} written {}", event.getChannel, event.getWrittenAmount)

  /** Retries refused connections while retries remain; any other error is reported to the FSM. */
  override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
    log.debug("channel {} exception {}", event.getChannel, event.getCause)
    event.getCause match {
      case c: ConnectException if reconnects > 0 ⇒
        reconnects -= 1
        // Wait out the remainder of the backoff measured from the previous attempt.
        scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect())
      case e ⇒ fsm ! ConnectionFailure(e.getMessage)
    }
  }

  /** Starts a new connection attempt and records when the following one may start. */
  private def reconnect(): Unit = {
    nextAttempt = Deadline.now + backoff
    RemoteConnection(Client, server, poolSize, this)
  }

  override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
    val ch = event.getChannel
    log.debug("connected to {}", getAddrString(ch))
    fsm ! Connected(ch)
  }

  override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
    val channel = event.getChannel
    log.debug("disconnected from {}", getAddrString(channel))
    fsm ! PoisonPill
    executor.execute(new Runnable { def run = RemoteConnection.shutdown(channel) }) // Must be shutdown outside of the Netty IO pool
  }

  /** Forwards NetworkOp messages to the FSM; anything else closes the channel. */
  override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
    val channel = event.getChannel
    log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
    event.getMessage match {
      case msg: NetworkOp ⇒
        fsm ! msg
      case msg ⇒
        log.info("server {} sent garbage '{}', disconnecting", getAddrString(channel), msg)
        channel.close()
    }
  }
}