/**
 *  Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote.testconductor
2012-05-02 21:56:26 +02:00
import akka.actor. { Actor , ActorRef , ActorSystem , LoggingFSM , Props }
2011-09-14 16:09:17 +02:00
import RemoteConnection.getAddrString
import akka.util.duration._
import org.jboss.netty.channel. { Channel , SimpleChannelUpstreamHandler , ChannelHandlerContext , ChannelStateEvent , MessageEvent }
import com.eaio.uuid.UUID
2012-05-02 21:56:26 +02:00
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
2012-05-04 22:33:08 +02:00
import akka.pattern. { ask , pipe }
2012-05-02 21:56:26 +02:00
import akka.dispatch.Await
import scala.util.control.NoStackTrace
import akka.actor.Status
import akka.event.LoggingAdapter
import akka.actor.PoisonPill
import akka.event.Logging
2012-05-04 22:33:08 +02:00
import akka.dispatch.Future
2012-05-18 15:55:04 +02:00
import java.net.InetSocketAddress
import akka.actor.Address
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.WriteCompletionEvent
import java.net.ConnectException
import akka.util.Deadline
import akka.actor.Scheduler

/**
 * The Player is the client component of the
 * [[akka.remote.testconductor.TestConductorExt]] extension. It registers with
 * the [[akka.remote.testconductor.Conductor]]’s [[akka.remote.testconductor.Controller]]
 * in order to participate in barriers and enable network failure injection.
 */
trait Player { this: TestConductorExt ⇒

  // The ClientFSM actor; stays null until startClient() has created it.
  private var _client: ActorRef = _

  // Accessor which fails fast when the client is used before startClient().
  private def client = _client match {
    case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
    case x    ⇒ x
  }

  /**
   * Connect to the conductor at the given address. The connection is made
   * asynchronously, but you should await completion of the returned Future
   * because that implies that all expected participants of this test have
   * successfully connected (i.e. this is a first barrier in itself). The
   * number of expected participants is set in
   * [[akka.remote.testconductor.Conductor]]`.startController()`.
   *
   * @param name the name passed to the [[akka.remote.testconductor.ClientFSM]]
   * @param controllerAddr the socket address passed to the [[akka.remote.testconductor.ClientFSM]]
   * @return a Future which is completed with `Done` once the client FSM has
   *         reached the `Connected` state, and which is failed if an
   *         unexpected transition is observed or the FSM is already `Failed`
   * @throws IllegalStateException if the client has already been started
   */
  def startClient(name: String, controllerAddr: InetSocketAddress): Future[Done] = {
    import ClientFSM._
    import akka.actor.FSM._
    import Settings.BarrierTimeout // brought into scope for the ask below (presumably the implicit Timeout)

    if (_client ne null) throw new IllegalStateException("TestConductorClient already started")
    _client = system.actorOf(Props(new ClientFSM(name, controllerAddr)), "TestConductorClient")

    // Short-lived helper: subscribes to the FSM's transitions and answers the
    // ask below as soon as the outcome of the connection attempt is known.
    val watcher = system.actorOf(Props(new Actor {
      var waiting: ActorRef = _
      def receive = {
        case fsm: ActorRef ⇒
          waiting = sender
          fsm ! SubscribeTransitionCallBack(self)
        case Transition(_, Connecting, AwaitDone) ⇒ // step 1, not there yet
        case Transition(_, AwaitDone, Connected) ⇒
          waiting ! Done
          context stop self
        case t: Transition[_] ⇒
          waiting ! Status.Failure(new RuntimeException("unexpected transition: " + t))
          context stop self
        case CurrentState(_, Connected) ⇒
          waiting ! Done
          context stop self
        case CurrentState(_, Failed) ⇒
          // The FSM never leaves Failed, so no further transition would arrive:
          // report the failure now instead of letting the ask time out and
          // leaving this helper actor running forever.
          waiting ! Status.Failure(new RuntimeException("TestConductor client already Failed"))
          context stop self
        case _: CurrentState[_] ⇒ // still in progress, wait for the transitions
      }
    }))

    (watcher ? client).mapTo[Done]
  }

  /**
   * Enter the named barriers, one after the other, in the order given. Will
   * throw an exception in case of timeouts or other errors.
   *
   * @param name the barrier names, entered sequentially
   */
  def enter(name: String*): Unit = {
    import Settings.BarrierTimeout // brought into scope for the ask below (presumably the implicit Timeout)

    system.log.debug("entering barriers " + name.mkString("(", ", ", ")"))
    name foreach { b ⇒
      // Await.result itself waits without limit here; any bound on the wait
      // has to come from the ask.
      Await.result(client ? ToServer(EnterBarrier(b)), Duration.Inf)
      system.log.debug("passed barrier {}", b)
    }
  }

  /**
   * Query remote transport address of named node.
   *
   * @param name the name of the node whose address is requested
   * @return a Future holding the reply to the `GetAddress` request, cast to `Address`
   */
  def getAddressFor(name: String): Future[Address] = {
    import Settings.BarrierTimeout // brought into scope for the ask below (presumably the implicit Timeout)

    (client ? ToServer(GetAddress(name))).mapTo[Address]
  }
}
object ClientFSM {

  /** The states of the client state machine. */
  sealed trait State
  case object Connecting extends State
  case object AwaitDone extends State
  case object Connected extends State
  case object Failed extends State

  /**
   * State data of the client state machine.
   *
   * @param channel the channel to the controller, once one has been established
   * @param runningOp the operation currently awaiting a reply, if any, as a
   *                  pair of its token (barrier or node name) and the requester
   */
  case class Data(channel: Option[Channel], runningOp: Option[(String, ActorRef)])

  /** Message carrying the channel on which a connection has been established. */
  case class Connected(channel: Channel)

  /** Signals a failed or lost connection; carries no stack trace. */
  case class ConnectionFailure(msg: String) extends RuntimeException(msg) with NoStackTrace

  /** Message signalling that the connection to the controller was lost. */
  case object Disconnected
}

/**
 * This is the controlling entity on the [[akka.remote.testconductor.Player]]
 * side: in a first step it registers itself with a symbolic name and its remote
 * address at the [[akka.remote.testconductor.Controller]], then waits for the
 * `Done` message which signals that all other expected test participants have
 * done the same. After that, it will pass barrier requests to and from the
 * coordinator and react to the [[akka.remote.testconductor.Conductor]]’s
 * requests for failure injection.
 */
class ClientFSM(name: String, controllerAddr: InetSocketAddress) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] {
  import ClientFSM._

  val settings = TestConductor().Settings

  // The handler reports back to this actor: it sends Connected(channel),
  // ConnectionFailure and the NetworkOp messages received from the controller.
  val handler = new PlayerHandler(controllerAddr, settings.ClientReconnects, settings.ReconnectBackoff,
    self, Logging(context.system, "PlayerHandler"), context.system.scheduler)

  startWith(Connecting, Data(None, None))

  // Waiting for the channel to the controller to be established.
  when(Connecting, stateTimeout = settings.ConnectTimeout) {
    case Event(msg: ClientOp, _) ⇒
      stay replying Status.Failure(new IllegalStateException("not connected yet"))
    case Event(Connected(channel), _) ⇒
      // register this node with the controller
      channel.write(Hello(name, TestConductor().address))
      goto(AwaitDone) using Data(Some(channel), None)
    case Event(_: ConnectionFailure, _) ⇒
      goto(Failed)
    case Event(StateTimeout, _) ⇒
      log.error("connect timeout to TestConductor")
      goto(Failed)
  }

  // Registered; waiting for the Done which starts the test.
  when(AwaitDone, stateTimeout = settings.BarrierTimeout.duration) {
    case Event(Done, _) ⇒
      log.debug("received Done: starting test")
      goto(Connected)
    case Event(msg: NetworkOp, _) ⇒
      log.error("received {} instead of Done", msg)
      goto(Failed)
    case Event(msg: ServerOp, _) ⇒
      stay replying Status.Failure(new IllegalStateException("not connected yet"))
    case Event(StateTimeout, _) ⇒
      log.error("connect timeout to TestConductor")
      goto(Failed)
  }

  when(Connected) {
    case Event(Disconnected, _) ⇒
      log.info("disconnected from TestConductor")
      throw new ConnectionFailure("disconnect")

    // Done is an acknowledgement and may be written at any time, even while
    // another operation is still pending.
    case Event(ToServer(Done), Data(Some(channel), _)) ⇒
      channel.write(Done)
      stay

    // No operation pending: forward the request and remember who is waiting
    // for the reply.
    case Event(ToServer(msg), d @ Data(Some(channel), None)) ⇒
      channel.write(msg)
      val pending: Option[(String, ActorRef)] = msg match {
        case EnterBarrier(barrier) ⇒ Some((barrier, sender))
        case GetAddress(node)      ⇒ Some((node, sender))
        // Any other message has already been written above; nothing is
        // recorded as pending for it. Without this case the match would throw
        // a MatchError and crash the FSM after the write.
        case _                     ⇒ None
      }
      stay using d.copy(runningOp = pending)

    // Only one operation may be in flight at a time.
    case Event(ToServer(op), Data(_, Some((token, _)))) ⇒
      log.error("cannot write {} while waiting for {}", op, token)
      stay

    case Event(op: ClientOp, d @ Data(Some(_), runningOp)) ⇒
      op match {
        case BarrierResult(b, success) ⇒
          runningOp match {
            case Some((barrier, requester)) ⇒
              if (b != barrier) {
                requester ! Status.Failure(new RuntimeException("wrong barrier " + b + " received while waiting for " + barrier))
              } else if (!success) {
                requester ! Status.Failure(new RuntimeException("barrier failed: " + b))
              } else {
                requester ! b
              }
            case None ⇒
              log.warning("did not expect {}", op)
          }
          stay using d.copy(runningOp = None)

        case AddressReply(node, addr) ⇒
          runningOp match {
            case Some((_, requester)) ⇒
              requester ! addr
            case None ⇒
              log.warning("did not expect {}", op)
          }
          stay using d.copy(runningOp = None)

        case ThrottleMsg(target, dir, rate) ⇒
          import settings.QueryTimeout
          import context.dispatcher
          // the lookup is done with the system name blanked out; get() yields
          // null for an unknown address
          TestConductor().failureInjectors.get(target.copy(system = "")) match {
            case null ⇒ log.warning("cannot throttle unknown address {}", target)
            case inj ⇒
              // once all injectors have replied, send ToServer(Done) to self,
              // which the ToServer(Done) case above writes to the channel
              Future.sequence(inj.refs(dir) map (_ ? NetworkFailureInjector.SetRate(rate))) map (_ ⇒ ToServer(Done)) pipeTo self
          }
          stay

        case DisconnectMsg(target, abort) ⇒
          import settings.QueryTimeout
          TestConductor().failureInjectors.get(target.copy(system = "")) match {
            case null ⇒ log.warning("cannot disconnect unknown address {}", target)
            case inj  ⇒ inj.sender ? NetworkFailureInjector.Disconnect(abort) map (_ ⇒ ToServer(Done)) pipeTo self
          }
          stay

        case TerminateMsg(exit) ⇒
          System.exit(exit)
          stay // needed because Java doesn't have Nothing
      }
  }

  // Terminal state: there is no transition out of Failed.
  when(Failed) {
    case Event(msg: ClientOp, _) ⇒
      stay replying Status.Failure(new RuntimeException("cannot do " + msg + " while Failed"))
    case Event(msg: NetworkOp, _) ⇒
      log.warning("ignoring network message {} while Failed", msg)
      stay
  }

  // Close the channel (if one was established) when this actor terminates.
  onTermination {
    case StopEvent(_, _, Data(Some(channel), _)) ⇒
      channel.close()
  }

  initialize
}

/**
 * This handler forwards messages received from the conductor to the
 * [[akka.remote.testconductor.ClientFSM]]; it also initiates the connection
 * and retries failed connection attempts.
 */
class PlayerHandler(
  server: InetSocketAddress,
  private var reconnects: Int,
  backoff: Duration,
  fsm: ActorRef,
  log: LoggingAdapter,
  scheduler: Scheduler)
  extends SimpleChannelUpstreamHandler {

  import ClientFSM._

  // Earliest time for the next connection attempt. Declared before the call to
  // reconnect() below, which assigns it, so that the initialization order is
  // explicit instead of relying on `= _` not emitting an initializer.
  var nextAttempt: Deadline = _

  // start the first connection attempt as part of construction
  reconnect()

  override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit =
    log.debug("channel {} open", event.getChannel)

  override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit =
    log.debug("channel {} closed", event.getChannel)

  override def channelBound(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit =
    log.debug("channel {} bound", event.getChannel)

  override def channelUnbound(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit =
    log.debug("channel {} unbound", event.getChannel)

  override def writeComplete(ctx: ChannelHandlerContext, event: WriteCompletionEvent): Unit =
    log.debug("channel {} written {}", event.getChannel, event.getWrittenAmount)

  /**
   * A `ConnectException` is retried after the back-off period for as long as
   * reconnect attempts remain; every other exception (and a `ConnectException`
   * once the attempts are used up) is reported to the FSM as `ConnectionFailure`.
   */
  override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent): Unit = {
    log.debug("channel {} exception {}", event.getChannel, event.getCause)
    event.getCause match {
      case c: ConnectException if reconnects > 0 ⇒
        reconnects -= 1
        scheduler.scheduleOnce(nextAttempt.timeLeft)(reconnect())
      case e ⇒
        // getMessage may be null; fall back to the exception's string form so
        // that the failure reported to the FSM is never anonymous
        fsm ! ConnectionFailure(Option(e.getMessage).getOrElse(e.toString))
    }
  }

  // Records when the following attempt may happen, then opens a new client
  // connection to the server with this instance as handler.
  private def reconnect(): Unit = {
    nextAttempt = Deadline.now + backoff
    RemoteConnection(Client, server, this)
  }

  /** Hands the freshly connected channel to the FSM. */
  override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit = {
    val ch = event.getChannel
    log.debug("connected to {}", getAddrString(ch))
    fsm ! Connected(ch)
  }

  /** Sends a `PoisonPill` to the FSM when the channel is disconnected. */
  override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent): Unit = {
    val channel = event.getChannel
    log.debug("disconnected from {}", getAddrString(channel))
    fsm ! PoisonPill
  }

  /** Forwards `NetworkOp` messages to the FSM; anything else closes the channel. */
  override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent): Unit = {
    val channel = event.getChannel
    log.debug("message from {}: {}", getAddrString(channel), event.getMessage)
    event.getMessage match {
      case msg: NetworkOp ⇒
        fsm ! msg
      case msg ⇒
        log.info("server {} sent garbage '{}', disconnecting", getAddrString(channel), msg)
        channel.close()
    }
  }
}