diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 3613b68b09..16341edbdf 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.actor.{ ActorRef, Props } +import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ @@ -198,11 +199,13 @@ object Sink extends SinkApply { /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must * be [[akka.stream.actor.ActorSubscriber]]. */ - def actorSubscriber[T](props: Props): Sink[T, ActorRef] = + def actorSubscriber[T](props: Props): Sink[T, ActorRef] = { + require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber") new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) + } /** * Creates a `Sink` that is materialized as an [[akka.stream.SinkQueue]]. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index f50964f817..fb034cbc40 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.actor.{ ActorRef, Cancellable, Props } +import akka.stream.actor.ActorPublisher import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.GraphStages.TickSource @@ -285,11 +286,13 @@ object Source extends SourceApply { /** * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor - * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should + * created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must * be [[akka.stream.actor.ActorPublisher]]. */ - def actorPublisher[T](props: Props): Source[T, ActorRef] = + def actorPublisher[T](props: Props): Source[T, ActorRef] = { + require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher") new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) + } /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].