Remove akka-camel (#26768)
* Remove akka-camel This module has been deprecated since 2.5.0 and will now be removed in 2.6. If there is interest it can be moved to a separate community-maintained repo. * Add note to migration guide * Remove from allModules as well
This commit is contained in:
parent
39b344c508
commit
96c8ef4257
80 changed files with 12 additions and 6538 deletions
|
|
@ -1,73 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.camel
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
object Consumers {
|
||||
object Sample1 {
|
||||
//#Consumer1
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class Consumer1 extends Consumer {
|
||||
def endpointUri = "file:data/input/actor"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => println("received %s".format(msg.bodyAs[String]))
|
||||
}
|
||||
}
|
||||
//#Consumer1
|
||||
}
|
||||
object Sample2 {
|
||||
//#Consumer2
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class Consumer2 extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/camel/default"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => sender() ! ("Hello %s".format(msg.bodyAs[String]))
|
||||
}
|
||||
}
|
||||
//#Consumer2
|
||||
}
|
||||
object Sample3 {
|
||||
//#Consumer3
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import akka.camel.Ack
|
||||
import akka.actor.Status.Failure
|
||||
|
||||
class Consumer3 extends Consumer {
|
||||
override def autoAck = false
|
||||
|
||||
def endpointUri = "jms:queue:test"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage =>
|
||||
sender() ! Ack
|
||||
// on success
|
||||
// ..
|
||||
val someException = new Exception("e1")
|
||||
// on failure
|
||||
sender() ! Failure(someException)
|
||||
}
|
||||
}
|
||||
//#Consumer3
|
||||
}
|
||||
object Sample4 {
|
||||
//#Consumer4
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class Consumer4 extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/camel/default"
|
||||
override def replyTimeout = 500 millis
|
||||
def receive = {
|
||||
case msg: CamelMessage => sender() ! ("Hello %s".format(msg.bodyAs[String]))
|
||||
}
|
||||
}
|
||||
//#Consumer4
|
||||
}
|
||||
}
|
||||
|
|
@ -1,61 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.camel
|
||||
|
||||
import akka.camel.CamelMessage
|
||||
import akka.actor.Status.Failure
|
||||
|
||||
import language.existentials
|
||||
|
||||
object CustomRoute {
|
||||
object Sample1 {
|
||||
//#CustomRoute
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||
import akka.camel.{ CamelExtension, CamelMessage }
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
import akka.camel._
|
||||
class Responder extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage =>
|
||||
sender() ! (msg.mapBody { body: String =>
|
||||
"received %s".format(body)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
class CustomRouteBuilder(system: ActorSystem, responder: ActorRef) extends RouteBuilder {
|
||||
def configure: Unit = {
|
||||
from("jetty:http://localhost:8877/camel/custom").to(responder)
|
||||
}
|
||||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val responder = system.actorOf(Props[Responder], name = "TestResponder")
|
||||
camel.context.addRoutes(new CustomRouteBuilder(system, responder))
|
||||
//#CustomRoute
|
||||
|
||||
}
|
||||
object Sample2 {
|
||||
//#ErrorThrowingConsumer
|
||||
import akka.camel.Consumer
|
||||
|
||||
import org.apache.camel.builder.Builder
|
||||
import org.apache.camel.model.RouteDefinition
|
||||
|
||||
class ErrorThrowingConsumer(override val endpointUri: String) extends Consumer {
|
||||
def receive = {
|
||||
case msg: CamelMessage => throw new Exception("error: %s".format(msg.body))
|
||||
}
|
||||
override def onRouteDefinition =
|
||||
(rd) => rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||
|
||||
final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||
sender() ! Failure(reason)
|
||||
}
|
||||
}
|
||||
//#ErrorThrowingConsumer
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,111 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.camel
|
||||
|
||||
//#imports
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
import akka.camel.CamelExtension
|
||||
|
||||
import language.postfixOps
|
||||
import akka.util.Timeout
|
||||
//#imports
|
||||
|
||||
object Introduction {
|
||||
def foo(): Unit = {
|
||||
//#Consumer-mina
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "mina2:tcp://localhost:6200?textline=true"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => { /* ... */ }
|
||||
case _ => { /* ... */ }
|
||||
}
|
||||
}
|
||||
|
||||
// start and expose actor via tcp
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
|
||||
val system = ActorSystem("some-system")
|
||||
val mina = system.actorOf(Props[MyEndpoint])
|
||||
//#Consumer-mina
|
||||
}
|
||||
def bar(): Unit = {
|
||||
//#Consumer
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "jetty:http://localhost:8877/example"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => { /* ... */ }
|
||||
case _ => { /* ... */ }
|
||||
}
|
||||
}
|
||||
//#Consumer
|
||||
}
|
||||
def baz(): Unit = {
|
||||
//#Producer
|
||||
import akka.actor.Actor
|
||||
import akka.camel.{ Oneway, Producer }
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
|
||||
class Orders extends Actor with Producer with Oneway {
|
||||
def endpointUri = "jms:queue:Orders"
|
||||
}
|
||||
|
||||
val sys = ActorSystem("some-system")
|
||||
val orders = sys.actorOf(Props[Orders])
|
||||
|
||||
orders ! <order amount="100" currency="PLN" itemId="12345"/>
|
||||
//#Producer
|
||||
}
|
||||
{
|
||||
//#CamelExtension
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val camelContext = camel.context
|
||||
val producerTemplate = camel.template
|
||||
|
||||
//#CamelExtension
|
||||
}
|
||||
{
|
||||
//#CamelExtensionAddComponent
|
||||
// import org.apache.activemq.camel.component.ActiveMQComponent
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val camelContext = camel.context
|
||||
// camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(
|
||||
// "vm://localhost?broker.persistent=false"))
|
||||
//#CamelExtensionAddComponent
|
||||
}
|
||||
{
|
||||
//#CamelActivation
|
||||
import akka.camel.{ CamelMessage, Consumer }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class MyEndpoint extends Consumer {
|
||||
def endpointUri = "mina2:tcp://localhost:6200?textline=true"
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => { /* ... */ }
|
||||
case _ => { /* ... */ }
|
||||
}
|
||||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val camel = CamelExtension(system)
|
||||
val actorRef = system.actorOf(Props[MyEndpoint])
|
||||
// get a future reference to the activation of the endpoint of the Consumer Actor
|
||||
val activationFuture = camel.activationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
|
||||
//#CamelActivation
|
||||
//#CamelDeactivation
|
||||
system.stop(actorRef)
|
||||
// get a future reference to the deactivation of the endpoint of the Consumer Actor
|
||||
val deactivationFuture = camel.deactivationFutureFor(actorRef)(timeout = 10 seconds, executor = system.dispatcher)
|
||||
//#CamelDeactivation
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,132 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.camel
|
||||
|
||||
import akka.camel.CamelExtension
|
||||
import language.postfixOps
|
||||
|
||||
object Producers {
|
||||
object Sample1 {
|
||||
//#Producer1
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
import akka.camel.{ CamelMessage, Producer }
|
||||
import akka.util.Timeout
|
||||
|
||||
class Producer1 extends Actor with Producer {
|
||||
def endpointUri = "http://localhost:8080/news"
|
||||
}
|
||||
//#Producer1
|
||||
//#AskProducer
|
||||
import akka.pattern.ask
|
||||
import scala.concurrent.duration._
|
||||
implicit val timeout = Timeout(10 seconds)
|
||||
|
||||
val system = ActorSystem("some-system")
|
||||
val producer = system.actorOf(Props[Producer1])
|
||||
val future = producer.ask("some request").mapTo[CamelMessage]
|
||||
//#AskProducer
|
||||
}
|
||||
object Sample2 {
|
||||
//#RouteResponse
|
||||
import akka.actor.{ Actor, ActorRef }
|
||||
import akka.camel.{ CamelMessage, Producer }
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
|
||||
class ResponseReceiver extends Actor {
|
||||
def receive = {
|
||||
case msg: CamelMessage =>
|
||||
// do something with the forwarded response
|
||||
}
|
||||
}
|
||||
|
||||
class Forwarder(uri: String, target: ActorRef) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
override def routeResponse(msg: Any): Unit = { target.forward(msg) }
|
||||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val receiver = system.actorOf(Props[ResponseReceiver])
|
||||
val forwardResponse = system.actorOf(Props(classOf[Forwarder], this, "http://localhost:8080/news/akka", receiver))
|
||||
// the Forwarder sends out a request to the web page and forwards the response to
|
||||
// the ResponseReceiver
|
||||
forwardResponse ! "some request"
|
||||
//#RouteResponse
|
||||
}
|
||||
object Sample3 {
|
||||
//#TransformOutgoingMessage
|
||||
import akka.actor.Actor
|
||||
import akka.camel.{ CamelMessage, Producer }
|
||||
|
||||
class Transformer(uri: String) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
|
||||
def upperCase(msg: CamelMessage) = msg.mapBody { body: String =>
|
||||
body.toUpperCase
|
||||
}
|
||||
|
||||
override def transformOutgoingMessage(msg: Any) = msg match {
|
||||
case msg: CamelMessage => upperCase(msg)
|
||||
}
|
||||
}
|
||||
//#TransformOutgoingMessage
|
||||
}
|
||||
object Sample4 {
|
||||
//#Oneway
|
||||
import akka.actor.{ Actor, ActorSystem, Props }
|
||||
import akka.camel.Producer
|
||||
|
||||
class OnewaySender(uri: String) extends Actor with Producer {
|
||||
def endpointUri = uri
|
||||
override def oneway: Boolean = true
|
||||
}
|
||||
|
||||
val system = ActorSystem("some-system")
|
||||
val producer = system.actorOf(Props(classOf[OnewaySender], this, "activemq:FOO.BAR"))
|
||||
producer ! "Some message"
|
||||
//#Oneway
|
||||
|
||||
}
|
||||
object Sample5 {
|
||||
//#Correlate
|
||||
import akka.camel.{ CamelMessage, Producer }
|
||||
import akka.actor.Actor
|
||||
import akka.actor.{ ActorSystem, Props }
|
||||
|
||||
class Producer2 extends Actor with Producer {
|
||||
def endpointUri = "activemq:FOO.BAR"
|
||||
}
|
||||
val system = ActorSystem("some-system")
|
||||
val producer = system.actorOf(Props[Producer2])
|
||||
|
||||
producer ! CamelMessage("bar", Map(CamelMessage.MessageExchangeId -> "123"))
|
||||
//#Correlate
|
||||
}
|
||||
object Sample6 {
|
||||
//#ProducerTemplate
|
||||
import akka.actor.Actor
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case msg =>
|
||||
val template = CamelExtension(context.system).template
|
||||
template.sendBody("direct:news", msg)
|
||||
}
|
||||
}
|
||||
//#ProducerTemplate
|
||||
}
|
||||
object Sample7 {
|
||||
//#RequestProducerTemplate
|
||||
import akka.actor.Actor
|
||||
class MyActor extends Actor {
|
||||
def receive = {
|
||||
case msg =>
|
||||
val template = CamelExtension(context.system).template
|
||||
sender() ! template.requestBody("direct:news", msg)
|
||||
}
|
||||
}
|
||||
//#RequestProducerTemplate
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,49 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.camel
|
||||
|
||||
object PublishSubscribe {
|
||||
//#PubSub
|
||||
import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
|
||||
import akka.camel.{ CamelMessage, Consumer, Producer }
|
||||
|
||||
class Subscriber(name: String, uri: String) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => println("%s received: %s".format(name, msg.body))
|
||||
}
|
||||
}
|
||||
|
||||
class Publisher(name: String, uri: String) extends Actor with Producer {
|
||||
|
||||
def endpointUri = uri
|
||||
|
||||
// one-way communication with JMS
|
||||
override def oneway = true
|
||||
}
|
||||
|
||||
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
|
||||
def receive = {
|
||||
case msg: CamelMessage => {
|
||||
publisher ! msg.bodyAs[String]
|
||||
sender() ! ("message published")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add below to a Boot class
|
||||
// Setup publish/subscribe example
|
||||
val system = ActorSystem("some-system")
|
||||
val jmsUri = "jms:topic:test"
|
||||
val jmsSubscriber1 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-1", jmsUri))
|
||||
val jmsSubscriber2 = system.actorOf(Props(classOf[Subscriber], "jms-subscriber-2", jmsUri))
|
||||
val jmsPublisher = system.actorOf(Props(classOf[Publisher], "jms-publisher", jmsUri))
|
||||
val jmsPublisherBridge =
|
||||
system.actorOf(Props(classOf[PublisherBridge], "jetty:http://0.0.0.0:8877/camel/pub/jms", jmsPublisher))
|
||||
//#PubSub
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue