=str #18133 Sink.actorSubscriber() should verify the props

This commit is contained in:
Alexander Golubev 2015-10-09 15:11:01 -04:00
parent 58e59d6943
commit 53d0627675
2 changed files with 10 additions and 4 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.{ ActorRef, Props } import akka.actor.{ ActorRef, Props }
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._ import akka.stream.impl._
@ -198,11 +199,13 @@ object Sink extends SinkApply {
/** /**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorSubscriber]]. * be [[akka.stream.actor.ActorSubscriber]].
*/ */
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
}
/** /**
* Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]].

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.{ ActorRef, Cancellable, Props } import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule } import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.TickSource import akka.stream.impl.fusing.GraphStages.TickSource
@ -285,11 +286,13 @@ object Source extends SourceApply {
/** /**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorPublisher]]. * be [[akka.stream.actor.ActorPublisher]].
*/ */
def actorPublisher[T](props: Props): Source[T, ActorRef] = def actorPublisher[T](props: Props): Source[T, ActorRef] = {
require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
}
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].