Camel

Da sia.
Vai alla navigazione Vai alla ricerca

Camel from scratch

java

sudo apt-get install sun-java6-jdk

maven

La versione di maven di squeeze e' la 2.2. Per avere la versione 3.*, scaricarla da pagina di download di maven, scompattarlo in /usr/local, creare un link simbolico alla cartella scompattata e chiamarlo maven, modificare /etc/profile per aggiungere maven al PATH:

if [ "`id -u`" -eq 0 ]; then
 PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
else
 PATH="/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/usr/local/maven/bin"
fi

camel

Per creare un progetto camel, lanciare:

mvn archetype:generate -DarchetypeGroupId=org.apache.camel.archetypes \\
-DarchetypeArtifactId=camel-archetype-java -DarchetypeVersion=2.8.0 \\
-DgroupId=it.unimore.cesia -Dversion=1.0-SNAPSHOT -DartifactId=mio_progetto

La versione di camel (2.8.0) cambia piuttosto spesso, quindi conviene dare un'occhiata al sito camel per la versione piu' recente.

Esistono anche gli archetipi per creare un progetto scala o xml.

Configurazione

I file di configurazione di camel sono tre (almeno):

  • pom.xml: per le dipendenze maven ed i plugin maven
  • src/main/resources/META-INF/spring/camel-context.xml: sono indicate le risorse usate dalle rotte
  • src/main/java/it/unimore/cesia/MyRouteBuilder.java: le rotte (ma questo file puo' essere rinominato).

Tutorial: un progetto camel con activemq e jetty

Esportare le proprieta' nel pom.xml

<properties>
   <camel-version>2.8.0</camel-version>
   <logback-version>0.9.29</logback-version>
   <activemq-version>5.5.0</activemq-version>
   <jruby-version>1.6.0</jruby-version>
 </properties>

e sostuire tutte le occorrenze del numero di versione di camel con:

<version>${camel-version}</version>

activemq

Dopo aver installato activemq, creare il file: src/main/resources/credential.properties dal contenuto:

activemq.username=system
activemq.password=manager

Aggiungere al pom.xml la dipendenza:

<dependency>
     <groupId>org.apache.camel</groupId>
     <artifactId>camel-jms</artifactId>
     <version>${camel-version}</version>
   </dependency>

e aggiungere a src/main/resources/META-INF/spring/camel-context.xml i dati dell'activemq, ad esempio:

<bean id="credentials" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
   <property name='location'>
     <value>credential.properties</value>
   </property>
 </bean>
<bean id="local-activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
 <property name="transacted" value="true"/>     
 <property name="transactionManager" ref="txManager"/>
 <property name="userName" value="${activemq.username}"/>
 <property name="password" value="${activemq.password}"/>
</bean>
<bean id="txManager"             
     class="org.springframework.jms.connection.JmsTransactionManager">
   <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory"
     class="org.apache.activemq.ActiveMQConnectionFactory">
 <property name="brokerURL" value="failover://(tcp://localhost:61616)" />
 <property name="userName" value="${activemq.username}"/>
 <property name="password" value="${activemq.password}"/>
</bean>

In questo caso activemq e' racchiuso da un transaction manager.

Come test si puo' aggiungere una rotta che coinvolge delle code activemq, ad esempio:

from("local-activemq:queue:my_project").
 to("log:it.unimore.cesia.my_project").
 to("local-activemq:queue:mio_progetto");

Gestire i log con logback

Per cambiare il logging da log4j a logback, commentare da pom.xml tutta la sezione di dependency del logging (slf4j e log4j) e aggiungere:

   <dependency>
     <groupId>ch.qos.logback</groupId>
     <artifactId>logback-core</artifactId>
     <version>${logback-version}</version>
   </dependency>
   <dependency>
     <groupId>ch.qos.logback</groupId>
     <artifactId>logback-classic</artifactId>
     <version>${logback-version}</version>
   </dependency>

Per ${logback-version} aggiungere un

<logback-version>0.9.29</logback-version>

nella sezione properties.

Attivato logback, creare un file logback.xml in src/main/resources ad esempio:

<configuration scan="true" scanPeriod="30 seconds">
 <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
   <File>mio_progetto.log</File>
   <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
     <FileNamePattern>mio-progetto-%d{yyyy-MM-dd}.log</FileNamePattern>
     <maxHistory>30</maxHistory>
   </rollingPolicy>
   <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
     <Pattern>%date %msg%n</Pattern>
     <charset>UTF-8</charset>
   </encoder>
 </appender>
 <appender name="SYSLOG"
   class="ch.qos.logback.classic.net.SyslogAppender">
   <SyslogHost>localhost</SyslogHost>
   <Facility>DAEMON</Facility>
   <SuffixPattern>[%logger] %msg</SuffixPattern>
 </appender>
 <logger name="it.unimore.cesia.my_project" additivity="false">
   <level value="DEBUG" />
   <appender-ref ref="FILE" /> 
   <appender-ref ref="SYSLOG" />
 </logger>
 <logger name="it.unimore.cesia" additivity="false">
   <level value="INFO"/>
   <appender-ref ref="FILE" /> 
   <appender-ref ref="SYSLOG" />
 </logger>
 <root level="ERROR">
   <appender-ref ref="SYSLOG" />
 </root>
</configuration>

Si tratta delle definizione di un logging su file con rotazione giornaliera e cancellazione dopo 30 giorni ed invio dei log sul syslog.

bridging http/messaggistica con jetty

Con una route tipo:

	from("jetty://http://0.0.0.0:8009/mio_progetto").

multicast(). pipeline(). inOnly(). choice(). when(header("schifo"). isEqualTo(constant("true"))). to("local-activemq:queue:this_project"). otherwise(). to("local-activemq:queue:your_project"). end(). end(). to("log:it.unimore.cesia.my_project"). transform(constant("fatto")). end(); Si premette a un client http di mandare messaggi asincroni su activemq.

Il client si esegue con:

curl "http://localhost:8009/mio_progetto?test=1905&schifo=false"

e manda un messaggio nella coda your_project.

Per usare lo http, il Messange Exchange Pattern (MEP) deve essere InOut, mentre la messaggistica asincrona e' InOnly. Quindi bisogna costruire una risposta da mandare allo http con la direttiva transform.

Prima di mettere in funzione questa rotta aggiungere nel pom.xml la dipendenza:

<dependency>
     <groupId>org.apache.camel</groupId>
     <artifactId>camel-jetty</artifactId>
     <version>${camel-version}</version>
   </dependency>

Un processor in ruby

Per svolgere le funzioni piu' flessibili diventa necessario scrivere una classe java che impementa l'interfaccia processor. Il processor puo' accedere ai dati del messaggio e dell'exchange che sono disponibili in variabili predefinite.

I passi necessari per un processor in ruby sono:

  • modificare il namespace nel camel-context.xml per includere il name space 'script';
  • registrare il processor in camel-context.xml;
  • aggiungere la dipendenza per jruby in pom.xml;
  • scrivere il processor;
  • inserirlo in una route.

Aggiungere il namespace script e registrare il processor

Il preambolo beans nel file src/main/resource/META_INF/spring/camel-context.xml diventa:

 <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:lang="http://www.springframework.org/schema/lang"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
			   http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
			   http://www.springframework.org/schema/lang
			   http://www.springframework.org/schema/lang/spring-lang-2.5.xsd
			   http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

Questo permette di registrare un processor del tipo:

<lang:jruby id="rubyProcessor"
             script-interfaces="org.apache.camel.Processor"
             script-source="classpath:ruby/ruby_processor.rb">
</lang:jruby>

Si tratta di uno script che ha sorgente in src/main/resource/ruby/ruby_processor.rb che e' referenziato nelle rotte con il nome 'rubyProcessor' e che impementa l'interfaccia Processor.

Aggiungere la dipendenza per jruby in pom.xml

Aggiungere dentro le dependecies:

   <dependency>
      <groupId>org.jruby</groupId>
      <artifactId>jruby-complete</artifactId>
      <version>${jruby-version}</version>
   </dependency>

(jruby-version deve essere settata nelle properties, adesso e' 1.6.4)

Un processor di esempio

require 'java'
java_import "org.apache.camel.Exchange"
java_import "org.apache.camel.Processor"
java_import "org.slf4j.Logger"
java_import "org.slf4j.LoggerFactory"
java_import "ch.qos.logback.classic.LoggerContext"

java_package "it.unimore.cesia"

class RubyProcessor
  include Processor

  def initialize
    # crea un logger che si chiama come il nome file esclusa l'estersione .rb
    @logger = LoggerFactory.getLogger("it.unimore.cesia.#{File.basename(__FILE__, ".rb")}");
  end  

  java_signature "void process(Exchange exchange)"
  def process(exchange)
    schifo = exchange.in.headers['schifo']
    @logger.info("RubyProcessor: lo header schifo vale: #{schifo}")
    exchange.out.set_header("Issuer", "rubyProcessor")
  end
end

Questo file deve essere salvato come ruby_processor.rb in src/main/resource/ruby (creare la cartella ruby che non esiste).

rubyProcessor legge un header e ne setta un altro. Il metodo che viene invocato da camel e' process, quindi la logica del bean deve essere in questo metodo.

La variabile exchange contiene tutta la logica dello scambio. Exchange.in e' il messaggio in ingresso, Exchange.out quello in uscita (ma vedi anche il blog Camel and OSS. Gli header sono nell'array associativo exchange.in.headers, mentre il body e' in exchange.in.body.

Includere il processor in una rotta

Questo semplice processor che non svolge nessun compito di routing puo' essere aggiunto subito all'inizio della rotta nel file src/main/java/it/unimore/cesia/MyProject.java

from("jetty://http://0.0.0.0:8009/mio_progetto").
	    beanRef("rubyProcessor").
	    multicast().
	    pipeline().
	    inOnly().
	    choice().
	    when(header("schifo").
		 isEqualTo(constant("true"))).
	    to("local-activemq:queue:this_project").
	    otherwise().
	    to("local-activemq:queue:your_project").
	    end().
	    end().
	    to("log:it.unimore.cesia.my_project").
	    transform(constant("fatto")).
	    end();

Si puo' verificare che funziona perche' aggiunge una riga al log e nel messaggio finale compare un header aggiuntivo.

Dependency Injection

Il motivo per usare camel/spring e' la possibilita' di definire risorse nel file src/main/resource/META_INF/spring/camel-context.xml e collegarle a un certo bean (wiring) nello stesso file.

In questo modo:

  • le stesse risorse possono essere condivise da piu' bean;
  • un bean puo' cambiare una risorsa senza modifiche al codice.

Collegare una risorsa

Ad esempio per dare l'accesso ad un database, aggiungere al file src/main/resource/META_INF/spring/camel-context.xml

  <bean id="cesia" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@cia.dmz-int.unimo.it:1521:cesia"/>
    <property name="username" value="${cesia.username}"/>
    <property name="password" value="${cesia.password}"/>
  </bean>

Le credenziali devono essere scritte in src/main/resource/credential.properties

Aggiungere al pom.xml il jar per oracle. Trattandosi di un jar proprietario mvn non puo' scaricarlo da un repository, ma e' da installare mano:

procurarsi il file ojdbc14.jar e lanciare:

mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc14 -Dversion=10.2.0.3.0 -Dpackaging=jar -Dfile=/homel/francesco/ojdbc14.jar fatto questo aggiungere alle dipendenze del pom.xml

 <dependency>
      <groupId>com.oracle</groupId>
      <artifactId>ojdbc14</artifactId>
      <version>10.2.0.3.0</version>
    </dependency>
    <dependency>
      <groupId>commons-dbcp</groupId>
      <artifactId>commons-dbcp</artifactId>
      <version>1.4</version>
    </dependency>
    <dependency>
      <groupId>commons-pool</groupId>
      <artifactId>commons-pool</artifactId>
      <version>1.5.5</version>
    </dependency>
    <dependency>
      <groupId>cglib</groupId>
      <artifactId>cglib-nodep</artifactId>
      <version>2.1_3</version>
    </dependency> 

(le ultime tre dipendenze servono per il connection pooling).

Inoltre, per usare il template spring-jdbc aggiungere nelle dependecies:

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${org.springframework.version}</version>
    </dependency>

e nelle properties: <org.springframework.version>3.0.5.RELEASE</org.springframework.version>


Un bean che usa questa risorsa e' del tipo (file src/main/resource/META_INF/spring/camel-context.xml):

 <lang:jruby id="dbProcessor"
              script-interfaces="org.apache.camel.Processor"
              script-source="classpath:ruby/db_processor.rb">
    <lang:property name="cesia" ref="cesia" />
  </lang:jruby>

Il bean db_bean e' un bean che si aspetta il CF di un utente nel body, interroga la tabella pi_passwd per avere la username e la scrive in un header.

require 'java'

java_import "org.apache.camel.Exchange"
java_import "org.apache.camel.Processor"

java_import "org.slf4j.Logger"
java_import "org.slf4j.LoggerFactory"
java_import "ch.qos.logback.classic.LoggerContext"

java_import "javax.sql.DataSource"

java_import "org.springframework.jdbc.core.JdbcTemplate"

java_import "java.util.HashMap"

java_import "java.sql.ResultSet"
java_import "java.sql.SQLException"

java_package "it.unimore.cesia"

class DbProcessor
  include Processor

  def initialize
    @logger = LoggerFactory.getLogger("it.unimore.cesia.db_processor")
    @logger.info("Hello from DbProcessor")
  end  

#java_signature 'void setEpiReplica(DataSource cesia)'
  def setCesia(cesia)
    @jdbc_template = JdbcTemplate.new(cesia)
  end

#java_annotation 'Exchange'
  java_signature 'void process(Exchange exchange)'
  def process(exchange)
    cf = exchange.in.body

    sql = "select username from pi_passwd where codicefiscale='#{cf}'"
    username = query(sql)
    @logger.debug("DbProcessor: username: #{username}")
    exchange.out.setBody(cf)
    exchange.in.getHeaders.each do |key, value|
      exchange.out.setHeader(key, value)
    end
    exchange.out.setHeader('username', username)
  end


  private 

  def query(sql)
    query = @jdbc_template.queryForList(sql)
    username = ''
    query.each do |row|
      username = row['USERNAME']
    end
    username
  end

end

Questo bean e' testabile con la rotta:

from("local-activemq:queue:cf").
	    beanRef("dbProcessor").
	    to("local-activemq:queue:cf_e_username");

in src/main/resources/ruby/db_processor.rb