/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.stream.impl.fusing

import akka.NotUsed
import akka.stream.{ Attributes, Shape, Supervision }
import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.testkit.AkkaSpec

/**
 * Stress tests for the one-bounded stream interpreter: builds extremely long
 * chains of simple stages (map, take, drop, conflate), pushes elements through
 * them and verifies the emitted events. The reported timings are a sanity
 * check only, not a real benchmark.
 */
class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit {

  import Supervision.stoppingDecider

  // One million stages chained end to end; `repetition` elements are pushed through.
  val chainLength = 1000 * 1000
  val halfLength = chainLength / 2
  val repetition = 100

  // Increment function shared by all map stages below.
  val f = (x: Int) ⇒ x + 1

  // A reusable map(+1) stage, erased to the common stage supertype so it can
  // fill a homogeneous Array of stages.
  val map: GraphStageWithMaterializedValue[Shape, Any] =
    new PushPullGraphStage[Int, Int, NotUsed]((_) ⇒ Map(f, stoppingDecider), Attributes.none)
      .asInstanceOf[GraphStageWithMaterializedValue[Shape, Any]]

  "Interpreter" must {

    "work with a massive chain of maps" in new OneBoundedSetup[Int](Array.fill(chainLength)(map).asInstanceOf[Array[GraphStageWithMaterializedValue[Shape, Any]]]) {
      lastEvents() should be(Set.empty)
      val tstamp = System.nanoTime()

      var i = 0
      while (i < repetition) {
        downstream.requestOne()
        lastEvents() should be(Set(RequestOne))

        upstream.onNext(i)
        // Every one of the chainLength map stages adds one to the element.
        lastEvents() should be(Set(OnNext(i + chainLength)))
        i += 1
      }

      upstream.onComplete()
      lastEvents() should be(Set(OnComplete))
      val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0)

      // Not a real benchmark, just for sanity check
      info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
    }

    "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int](Iterable.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++
      Seq(Take(repetition / 2)) ++
      Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) {

      lastEvents() should be(Set.empty)
      val tstamp = System.nanoTime()

      var i = 0
      while (i < (repetition / 2) - 1) {
        downstream.requestOne()
        lastEvents() should be(Set(RequestOne))

        upstream.onNext(i)
        lastEvents() should be(Set(OnNext(i + chainLength)))
        i += 1
      }
      // The final element exhausts the Take stage in the middle of the chain:
      // it cancels upstream and completes downstream together with the element.
      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      upstream.onNext(0)
      lastEvents() should be(Set(Cancel, OnComplete, OnNext(0 + chainLength)))

      val time = (System.nanoTime() - tstamp) / (1000.0 * 1000.0 * 1000.0)

      // Not a real benchmark, just for sanity check
      info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s")
    }

    "work with a massive chain of takes" in new OneBoundedSetup[Int](Iterable.fill(chainLength)(Take(1))) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // The very first element satisfies every Take(1) at once: the element is
      // emitted and the whole chain cancels/completes immediately.
      upstream.onNext(0)
      lastEvents() should be(Set(Cancel, OnNext(0), OnComplete))
    }

    "work with a massive chain of drops" in new OneBoundedSetup[Int](Iterable.fill(chainLength / 1000)(Drop(1))) {
      lastEvents() should be(Set.empty)

      downstream.requestOne()
      lastEvents() should be(Set(RequestOne))

      // Each of the chainLength / 1000 Drop(1) stages swallows exactly one
      // element, so the first chainLength / 1000 pushes only trigger re-requests…
      var i = 0
      while (i < (chainLength / 1000)) {
        upstream.onNext(0)
        lastEvents() should be(Set(RequestOne))
        i += 1
      }
      // …and only the next element makes it all the way downstream.
      upstream.onNext(0)
      lastEvents() should be(Set(OnNext(0)))
    }

    "work with a massive chain of conflates by overflowing to the heap" in new OneBoundedSetup[Int](Iterable.fill(100000)(Conflate(
      (in: Int) ⇒ in,
      (agg: Int, in: Int) ⇒ agg + in,
      Supervision.stoppingDecider))) {

      // Conflate stages are always ready to receive, even without demand.
      lastEvents() should be(Set(RequestOne))

      var i = 0
      while (i < repetition) {
        upstream.onNext(1)
        lastEvents() should be(Set(RequestOne))
        i += 1
      }
      // All the buffered ones have been summed by the aggregation function.
      downstream.requestOne()
      lastEvents() should be(Set(OnNext(repetition)))
    }
  }
}