2015-05-11 00:09:59 +02:00
/* *
* Copyright ( C ) 2009 - 2014 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.stream.impl.fusing
2015-10-31 14:46:10 +01:00
import akka.stream.stage._
import akka.stream.testkit.AkkaSpec
2015-05-11 00:09:59 +02:00
import akka.stream.testkit.Utils.TE
import scala.concurrent.duration._
2015-10-31 14:46:10 +01:00
class LifecycleInterpreterSpec extends GraphInterpreterSpecKit {
2015-05-11 00:09:59 +02:00
import akka.stream.Supervision._
"Interpreter" must {
2015-10-31 14:46:10 +01:00
"call preStart in order on stages" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartAndPostStopIdentity ( onStart = _ ⇒ testActor ! "start-a" ) ,
PreStartAndPostStopIdentity ( onStart = _ ⇒ testActor ! "start-b" ) ,
PreStartAndPostStopIdentity ( onStart = _ ⇒ testActor ! "start-c" ) ) ) {
expectMsg ( "start-a" )
expectMsg ( "start-b" )
expectMsg ( "start-c" )
expectNoMsg ( 300. millis )
upstream . onComplete ( )
}
2015-10-31 14:46:10 +01:00
"call postStop in order on stages - when upstream completes" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartAndPostStopIdentity ( onUpstreamCompleted = ( ) ⇒ testActor ! "complete-a" , onStop = ( ) ⇒ testActor ! "stop-a" ) ,
PreStartAndPostStopIdentity ( onUpstreamCompleted = ( ) ⇒ testActor ! "complete-b" , onStop = ( ) ⇒ testActor ! "stop-b" ) ,
PreStartAndPostStopIdentity ( onUpstreamCompleted = ( ) ⇒ testActor ! "complete-c" , onStop = ( ) ⇒ testActor ! "stop-c" ) ) ) {
upstream . onComplete ( )
expectMsg ( "complete-a" )
expectMsg ( "stop-a" )
expectMsg ( "complete-b" )
expectMsg ( "stop-b" )
expectMsg ( "complete-c" )
expectMsg ( "stop-c" )
expectNoMsg ( 300. millis )
}
2015-10-31 14:46:10 +01:00
"call postStop in order on stages - when upstream onErrors" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartAndPostStopIdentity (
onUpstreamFailed = ex ⇒ testActor ! ex . getMessage ,
onStop = ( ) ⇒ testActor ! "stop-c" ) ) ) {
val msg = "Boom! Boom! Boom!"
upstream . onError ( TE ( msg ) )
expectMsg ( msg )
expectMsg ( "stop-c" )
expectNoMsg ( 300. millis )
}
2015-10-31 14:46:10 +01:00
"call postStop in order on stages - when downstream cancels" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartAndPostStopIdentity ( onStop = ( ) ⇒ testActor ! "stop-a" ) ,
PreStartAndPostStopIdentity ( onStop = ( ) ⇒ testActor ! "stop-b" ) ,
PreStartAndPostStopIdentity ( onStop = ( ) ⇒ testActor ! "stop-c" ) ) ) {
downstream . cancel ( )
expectMsg ( "stop-c" )
expectMsg ( "stop-b" )
expectMsg ( "stop-a" )
expectNoMsg ( 300. millis )
}
2015-10-31 14:46:10 +01:00
"call preStart before postStop" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartAndPostStopIdentity ( onStart = _ ⇒ testActor ! "start-a" , onStop = ( ) ⇒ testActor ! "stop-a" ) ) ) {
expectMsg ( "start-a" )
expectNoMsg ( 300. millis )
upstream . onComplete ( )
expectMsg ( "stop-a" )
expectNoMsg ( 300. millis )
}
2015-10-31 14:46:10 +01:00
"onError when preStart fails" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PreStartFailer ( ( ) ⇒ throw TE ( "Boom!" ) ) ) ) {
lastEvents ( ) should === ( Set ( Cancel , OnError ( TE ( "Boom!" ) ) ) )
}
2015-10-31 14:46:10 +01:00
"not blow up when postStop fails" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PostStopFailer ( ( ) ⇒ throw TE ( "Boom!" ) ) ) ) {
upstream . onComplete ( )
lastEvents ( ) should === ( Set ( OnComplete ) )
}
2015-10-31 14:46:10 +01:00
"onError when preStart fails with stages after" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
Map ( ( x : Int ) ⇒ x , stoppingDecider ) ,
PreStartFailer ( ( ) ⇒ throw TE ( "Boom!" ) ) ,
Map ( ( x : Int ) ⇒ x , stoppingDecider ) ) ) {
lastEvents ( ) should === ( Set ( Cancel , OnError ( TE ( "Boom!" ) ) ) )
}
2015-10-31 14:46:10 +01:00
"continue with stream shutdown when postStop fails" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
PostStopFailer ( ( ) ⇒ throw TE ( "Boom!" ) ) ) ) {
lastEvents ( ) should === ( Set ( ) )
upstream . onComplete ( )
lastEvents should === ( Set ( OnComplete ) )
}
2015-10-31 14:46:10 +01:00
"postStop when pushAndFinish called if upstream completes with pushAndFinish" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
new PushFinishStage ( onPostStop = ( ) ⇒ testActor ! "stop" ) ) ) {
lastEvents ( ) should be ( Set . empty )
downstream . requestOne ( )
lastEvents ( ) should be ( Set ( RequestOne ) )
upstream . onNextAndComplete ( "foo" )
lastEvents ( ) should be ( Set ( OnNext ( "foo" ) , OnComplete ) )
expectMsg ( "stop" )
}
2015-10-31 14:46:10 +01:00
"postStop when pushAndFinish called with pushAndFinish if indirect upstream completes with pushAndFinish" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
Map ( ( x : Any ) ⇒ x , stoppingDecider ) ,
new PushFinishStage ( onPostStop = ( ) ⇒ testActor ! "stop" ) ,
Map ( ( x : Any ) ⇒ x , stoppingDecider ) ) ) {
lastEvents ( ) should be ( Set . empty )
downstream . requestOne ( )
lastEvents ( ) should be ( Set ( RequestOne ) )
upstream . onNextAndComplete ( "foo" )
lastEvents ( ) should be ( Set ( OnNext ( "foo" ) , OnComplete ) )
expectMsg ( "stop" )
}
2015-10-31 14:46:10 +01:00
"postStop when pushAndFinish called with pushAndFinish if upstream completes with pushAndFinish and downstream immediately pulls" in new OneBoundedSetup [ String ] ( Seq (
2015-05-11 00:09:59 +02:00
new PushFinishStage ( onPostStop = ( ) ⇒ testActor ! "stop" ) ,
Fold ( "" , ( x : String , y : String ) ⇒ x + y , stoppingDecider ) ) ) {
lastEvents ( ) should be ( Set . empty )
downstream . requestOne ( )
lastEvents ( ) should be ( Set ( RequestOne ) )
upstream . onNextAndComplete ( "foo" )
lastEvents ( ) should be ( Set ( OnNext ( "foo" ) , OnComplete ) )
expectMsg ( "stop" )
}
}
2015-10-31 14:46:10 +01:00
private [ akka ] case class PreStartAndPostStopIdentity [ T ] (
onStart : LifecycleContext ⇒ Unit = _ ⇒ ( ) ,
onStop : ( ) ⇒ Unit = ( ) ⇒ ( ) ,
onUpstreamCompleted : ( ) ⇒ Unit = ( ) ⇒ ( ) ,
onUpstreamFailed : Throwable ⇒ Unit = ex ⇒ ( ) )
extends PushStage [ T , T ] {
override def preStart ( ctx : LifecycleContext ) = onStart ( ctx )
override def onPush ( elem : T , ctx : Context [ T ] ) = ctx . push ( elem )
override def onUpstreamFinish ( ctx : Context [ T ] ) : TerminationDirective = {
onUpstreamCompleted ( )
super . onUpstreamFinish ( ctx )
}
override def onUpstreamFailure ( cause : Throwable , ctx : Context [ T ] ) : TerminationDirective = {
onUpstreamFailed ( cause )
super . onUpstreamFailure ( cause , ctx )
}
override def postStop ( ) = onStop ( )
}
private [ akka ] case class PreStartFailer [ T ] ( pleaseThrow : ( ) ⇒ Unit ) extends PushStage [ T , T ] {
override def preStart ( ctx : LifecycleContext ) =
pleaseThrow ( )
override def onPush ( elem : T , ctx : Context [ T ] ) = ctx . push ( elem )
}
private [ akka ] case class PostStopFailer [ T ] ( ex : ( ) ⇒ Throwable ) extends PushStage [ T , T ] {
override def onUpstreamFinish ( ctx : Context [ T ] ) = ctx . finish ( )
override def onPush ( elem : T , ctx : Context [ T ] ) = ctx . push ( elem )
override def postStop ( ) : Unit = throw ex ( )
}
// This test is related to issue #17351
private [ akka ] class PushFinishStage ( onPostStop : ( ) ⇒ Unit = ( ) ⇒ ( ) ) extends PushStage [ Any , Any ] {
override def onPush ( elem : Any , ctx : Context [ Any ] ) : SyncDirective =
ctx . pushAndFinish ( elem )
override def onUpstreamFinish ( ctx : Context [ Any ] ) : TerminationDirective =
ctx . fail ( akka . stream . testkit . Utils . TE ( "Cannot happen" ) )
override def postStop ( ) : Unit =
onPostStop ( )
}
2015-05-11 00:09:59 +02:00
}