=str #15892 Fix ClassCastException in ThunkSource

* and add test
This commit is contained in:
Patrik Nordwall 2014-09-29 12:19:28 +02:00
parent 386c3ba815
commit 66f86b6e9b
4 changed files with 84 additions and 22 deletions

View file

@ -105,15 +105,15 @@ class FlowIterableSpec extends AkkaSpec {
c.expectComplete()
}
"produce elements with two transformation steps" ignore {
// val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher()
// val c = StreamTestKit.SubscriberProbe[Int]()
// p.subscribe(c)
// val sub = c.expectSubscription()
// sub.request(10)
// c.expectNext(4)
// c.expectNext(8)
// c.expectComplete()
"produce elements with two transformation steps" in {
val p = FlowFrom(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(10)
c.expectNext(4)
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {

View file

@ -106,16 +106,15 @@ class FlowIteratorSpec extends AkkaSpec {
c.expectComplete()
}
// FIXME enable test when filter is implemented
"produce elements with two transformation steps" ignore {
// val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher()
// val c = StreamTestKit.SubscriberProbe[Int]()
// p.subscribe(c)
// val sub = c.expectSubscription()
// sub.request(10)
// c.expectNext(4)
// c.expectNext(8)
// c.expectComplete()
"produce elements with two transformation steps" in {
val p = FlowFrom(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(10)
c.expectNext(4)
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {

View file

@ -0,0 +1,63 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.OnComplete
import akka.stream.testkit.StreamTestKit.OnError
import akka.stream.testkit.StreamTestKit.OnNext
class FlowThunkSpec extends AkkaSpec {
implicit val materializer = FlowMaterializer()
"A Flow based on a thunk generator" must {
"produce elements" in {
val iter = List(1, 2, 3).iterator
val p = FlowFrom(() if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNext(11)
c.expectNoMsg(100.millis)
sub.request(3)
c.expectNext(12)
c.expectNext(13)
c.expectComplete()
}
"complete empty" in {
val p = FlowFrom(() None).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectComplete()
c.expectNoMsg(100.millis)
}
"allow cancel before receiving all elements" in {
val count = 100000
val iter = (1 to count).iterator
val p = FlowFrom(() if (iter.hasNext) Some(iter.next()) else None).toPublisher()
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -11,7 +11,7 @@ import scala.util.{ Failure, Success }
import org.reactivestreams.{ Publisher, Subscriber }
import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher }
import akka.stream.impl.{ ActorPublisher, EmptyPublisher, ErrorPublisher, FuturePublisher, IterablePublisher, IteratorPublisher, SimpleCallbackPublisher, TickPublisher, Stop }
import akka.stream.impl2.ActorBasedFlowMaterializer
object FlowFrom {
@ -230,8 +230,8 @@ final case class ThunkSource[In](f: () ⇒ Option[In]) extends SimpleSource[In]
create(materializer, flowName).subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[In] =
ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
name = s"$flowName-0-thunk"))
ActorPublisher[In](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings,
() f().getOrElse(throw Stop)), name = s"$flowName-0-thunk"))
}
/**