2012-01-19 14:38:44 +00:00
/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.camel
2012-09-03 12:08:46 +02:00
import internal.ActorActivationException
2012-06-25 18:28:38 +02:00
import language.postfixOps
import language.existentials
2012-01-19 14:38:44 +00:00
import akka.actor._
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
2012-07-25 20:11:18 +02:00
import akka.camel.TestSupport._
2012-09-03 12:08:46 +02:00
import org.apache.camel.model. { ProcessorDefinition , RouteDefinition }
2012-01-19 14:38:44 +00:00
import org.apache.camel.builder.Builder
import org.apache.camel. { FailedToCreateRouteException , CamelExecutionException }
2012-03-01 17:32:10 +01:00
import java.util.concurrent. { ExecutionException , TimeUnit , TimeoutException }
2012-05-11 09:46:49 +02:00
import akka.actor.Status.Failure
2012-09-21 14:50:06 +02:00
import scala.concurrent.duration._
2013-01-31 02:16:39 +01:00
import scala.concurrent. { ExecutionContext , Await }
2012-08-14 16:29:38 +02:00
import akka.testkit._
2012-09-03 12:08:46 +02:00
import akka.util.Timeout
2012-01-19 14:38:44 +00:00
/**
 * Integration spec for Camel `Consumer` actors.
 *
 * The cases cover activation failure on an invalid endpoint, in-out messaging,
 * reply time-outs, message processing after a restart, route registration and
 * removal, route customisation via `onRouteDefinition`, and manual
 * acknowledgement (`Ack`, `Failure`, and a missing acknowledgement).
 *
 * `system`, `camel`, `start` and `stop` are not defined in this class;
 * presumably they come from `NonSharedCamelSystem` / `TestSupport` — confirm there.
 */
class ConsumerIntegrationTest extends WordSpec with MustMatchers with NonSharedCamelSystem {

  "ConsumerIntegrationTest" must {
    // Upper bound used for every blocking wait in this spec.
    val defaultTimeoutDuration = 10.seconds
    implicit val defaultTimeout: Timeout = Timeout(defaultTimeoutDuration)

    // A def, so the dispatcher is looked up on `system` at each use.
    implicit def ec: ExecutionContext = system.dispatcher

    "Consumer must throw FailedToCreateRouteException, while awaiting activation, if endpoint is invalid" in {
      val activationWarning = EventFilter.warning(pattern = "failed to activate.*", occurrences = 1)
      filterEvents(activationWarning) {
        val invalidConsumer = system.actorOf(Props(new TestActor(uri = "some invalid uri")), "invalidActor")
        intercept[FailedToCreateRouteException] {
          val activation = camel.activationFutureFor(invalidConsumer)
          Await.result(activation, defaultTimeoutDuration)
        }
      }
    }

    "Consumer must support in-out messaging" in {
      start(new Consumer {
        def endpointUri = "direct:a1"
        def receive = {
          case m: CamelMessage ⇒ sender ! ("received " + m.bodyAs[String])
        }
      }, name = "direct-a1")

      val reply = camel.sendTo("direct:a1", msg = "some message")
      reply must be("received some message")
    }

    "Consumer must time-out if consumer is slow" in {
      val shortTimeout = 10.millis
      val longWait = 200.millis

      // The consumer sleeps far longer than its own reply timeout before answering.
      val slowConsumer = start(new Consumer {
        override def replyTimeout = shortTimeout
        def endpointUri = "direct:a3"
        def receive = {
          case _ ⇒
            Thread.sleep(longWait.toMillis)
            sender ! "done"
        }
      }, name = "ignore-this-deadletter-timeout-consumer-reply")

      val exception = intercept[CamelExecutionException] {
        camel.sendTo("direct:a3", msg = "some msg 3")
      }
      val cause = exception.getCause
      cause.getClass must be(classOf[TimeoutException])
      stop(slowConsumer)
    }

    "Consumer must process messages even after actor restart" in {
      // Counted down from postRestart so the test knows the restart has happened.
      val restarted = TestLatch()
      val consumer = start(new Consumer {
        def endpointUri = "direct:a2"
        def receive = {
          case "throw"          ⇒ throw new TestException("")
          case m: CamelMessage ⇒ sender ! ("received " + m.bodyAs[String])
        }
        override def postRestart(reason: Throwable): Unit = {
          restarted.countDown()
        }
      }, name = "direct-a2")

      filterEvents(EventFilter[TestException](occurrences = 1)) {
        consumer ! "throw"
        Await.ready(restarted, defaultTimeoutDuration)

        val reply = camel.sendTo("direct:a2", msg = "xyz")
        reply must be("received xyz")
      }
      stop(consumer)
    }

    "Consumer must unregister itself when stopped" in {
      val consumer = start(new TestActor(), name = "test-actor-unregister")
      Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount must be > (0)

      system.stop(consumer)
      Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount must be(0)
    }

    "Consumer must register on uri passed in through constructor" in {
      val consumer = start(new TestActor("direct://test"), name = "direct-test")
      Await.result(camel.activationFutureFor(consumer), defaultTimeoutDuration)

      camel.routeCount must be > (0)
      val firstRouteUri = camel.routes.get(0).getEndpoint.getEndpointUri
      firstRouteUri must be("direct://test")

      system.stop(consumer)
      Await.result(camel.deactivationFutureFor(consumer), defaultTimeoutDuration)
      camel.routeCount must be(0)
      stop(consumer)
    }

    "Error passing consumer supports error handling through route modification" in {
      val ref = start(new ErrorThrowingConsumer("direct:error-handler-test") {
        // Mark TestException as handled and answer with the exception's message.
        override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
          rd.onException(classOf[TestException])
            .handled(true)
            .transform(Builder.exceptionMessage)
            .end
        }
      }, name = "direct-error-handler-test")

      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val reply = camel.sendTo("direct:error-handler-test", msg = "hello")
        reply must be("error: hello")
      }
      stop(ref)
    }

    "Error passing consumer supports redelivery through route modification" in {
      val ref = start(new FailingOnceConsumer("direct:failing-once-concumer") {
        // Allow a single redelivery after a TestException.
        override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
          rd.onException(classOf[TestException])
            .maximumRedeliveries(1)
            .end
        }
      }, name = "direct-failing-once-consumer")

      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val reply = camel.sendTo("direct:failing-once-concumer", msg = "hello")
        reply must be("accepted: hello")
      }
      stop(ref)
    }

    "Consumer supports manual Ack" in {
      val ref = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender ! Ack }
      }, name = "direct-manual-ack-1")

      // should not timeout
      val result = camel.template
        .asyncSendBody("direct:manual-ack", "some message")
        .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      result must be(null)
      stop(ref)
    }

    "Consumer handles manual Ack failure" in {
      val someException = new Exception("e1")
      val ref = start(new ManualAckConsumer() {
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ sender ! Failure(someException) }
      }, name = "direct-manual-ack-2")

      val thrown = intercept[ExecutionException] {
        camel.template
          .asyncSendBody("direct:manual-ack", "some message")
          .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }
      thrown.getCause.getCause must be(someException)
      stop(ref)
    }

    "Consumer should time-out, if manual Ack not received within replyTimeout and should give a human readable error message" in {
      // This consumer never acknowledges, so the short replyTimeout has to expire.
      val ref = start(new ManualAckConsumer() {
        override def replyTimeout = 10.millis
        def endpointUri = "direct:manual-ack"
        def receive = { case _ ⇒ }
      }, name = "direct-manual-ack-3")

      val thrown = intercept[ExecutionException] {
        camel.template
          .asyncSendBody("direct:manual-ack", "some message")
          .get(defaultTimeoutDuration.toSeconds, TimeUnit.SECONDS)
      }
      thrown.getCause.getCause.getMessage must include("Failed to get Ack")
      stop(ref)
    }

    "respond to onRouteDefinition" in {
      val ref = start(new ErrorRespondingConsumer("direct:error-responding-consumer-1"), name = "error-responding-consumer")

      filterEvents(EventFilter[TestException](occurrences = 1)) {
        val response = camel.sendTo("direct:error-responding-consumer-1", msg = "some body")
        response must be("some body has an error")
      }
      stop(ref)
    }
  }
}
/**
 * Consumer that answers every [[CamelMessage]] by throwing a [[TestException]]
 * whose message is `"error: "` followed by the message body.
 *
 * @param endpointUri the Camel endpoint URI this consumer is bound to
 */
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {

  def receive: Receive = {
    case camelMessage: CamelMessage ⇒
      val description = "error: %s".format(camelMessage.body)
      throw new TestException(description)
  }

  /** Runs the inherited hook, then sends `Failure(reason)` to the current sender. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    sender ! Failure(reason)
  }
}
/**
 * Consumer that throws a [[TestException]] for every [[CamelMessage]] and
 * customises its route so that the exception is handled by appending
 * `" has an error"` to the message body.
 *
 * @param endpointUri the Camel endpoint URI this consumer is bound to
 */
class ErrorRespondingConsumer(override val endpointUri: String) extends Consumer {

  def receive: Receive = {
    case _: CamelMessage ⇒ throw new TestException("Error!")
  }

  override def onRouteDefinition = (rd: RouteDefinition) ⇒ {
    // Mark TestException as handled and respond with the body plus a suffix.
    rd.onException(classOf[TestException])
      .handled(true)
      .transform(Builder.body.append(" has an error"))
      .end
  }

  /** Runs the inherited hook, then sends `Failure(reason)` to the current sender. */
  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    sender ! Failure(reason)
  }
}
/**
 * Consumer that rejects a [[CamelMessage]] with a [[TestException]] unless its
 * `CamelRedelivered` header is `true`, in which case it replies with
 * `"accepted: "` followed by the body.
 *
 * @param endpointUri the Camel endpoint URI this consumer is bound to
 */
class FailingOnceConsumer(override val endpointUri: String) extends Consumer {

  def receive: Receive = {
    case msg: CamelMessage ⇒
      // A missing or unreadable header counts as "not redelivered".
      val redelivered = msg.headerAs[Boolean]("CamelRedelivered").getOrElse(false)
      if (!redelivered)
        throw new TestException("rejected: %s".format(msg.body))
      sender ! "accepted: %s".format(msg.body)
  }

  /** Runs the inherited hook, then sends `Failure(reason)` to the current sender. */
  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    sender ! Failure(reason)
  }
}
/**
 * Minimal consumer that ignores every message it receives.
 *
 * @param uri the Camel endpoint URI to consume from; defaults to `file://target/abcde`
 */
class TestActor(uri: String = "file://target/abcde") extends Consumer {
  def endpointUri: String = uri

  def receive: Receive = {
    case _ ⇒ () // do nothing
  }
}
/**
 * Consumer mix-in that turns `autoAck` off. The tests in this file using it
 * reply with `Ack`, reply with `Failure`, or send no reply at all.
 */
trait ManualAckConsumer extends Consumer {
  override def autoAck: Boolean = false
}
2012-08-14 14:07:52 +02:00
/**
 * Exception type thrown by the test consumers in this file.
 *
 * @param msg the exception message, passed through to [[java.lang.Exception]]
 */
class TestException(msg: String) extends Exception(msg)