Pipeline
pipeline
E' il raccoglitore scritto in plain ruby che interroga le basi di dati per creare il file yaml che serve al correlatore.
Sostituisce 'stomp' un progetto in elixir con molto parallelismo.
come fare a valutare se l'output è lo stesso del sistema precedente?
Bisogna fare un percorso camel che prende il CF in ingresso, lo invia sia all'attuale raccoglitore (stomp) che a pipeline; l'attuale raccoglitore dopo la raccolta manda al correlatore che invia l'output sia al delta che al filesystem. Invece lo output di pipeline da al correlatore che manda l'esito solo al filesystem.
Deve esistere un delta che paragona i due output sul filesystem.
Dovremo avere una persistenza almeno temporanea degli yaml dei raccoglitori per le verifiche.
Bisogna che un servizio avvoltoio faccia pulizia dei file sul filesystem.
I punti sono:
- ciò che viene inviato alla destinazione queue/elixir deve essere duplicato su queue/pipeline; Questo si fa
modificando il elixir-stomp@.service di systemd
Environment=SUBSCRIBE=/queue/elixir.in #Environment=SUBSCRIBE=/queue/elixir
e aggiungendo una rotta camel:
<route id="main_entrance_cf.flow"> <from uri="hub:elixir"/> <multicast> <to uri="hub:elixir.in"/> <to uri="hub:pipeline.in"/> </multicast> </route>
- l'output di elixir equivalente allo output di pipeline deve venire salvato per reference sulla coda funnel.log,
oltre che inviato al correlatore:
<route id="funnel.flow"> <from uri="hub:funnel.out"/> <multicast> <to uri="hub:correla.to"/> <to uri="hub:funnel.log?timeToLive=3600000"/> </multicast> </route>
- l'output di pipeline deve essere ugualmente salvato per reference ed inoltre inviato ad un altro correlatore, 'dryrun':
<route id="pipeline.flow"> <from uri="hub:pipeline.out"/> <multicast> <to uri="hub:dryrun.correlable"/> <to uri="hub:pipeline.log?timeToLive=3600000"/> </multicast> </route>
- l'output di dryrun (dryrun.correlated) deve persistere su filesystem:
<route id="pipeline.output"> <from uri="hub:dryrun.correlated"/> <log message="pipeline has correlated: ${headers['JMSCorrelationID']}"/> <to uri="file://outputdir?fileName=/opt/compliance/pipeline-${in.header.JMSCorrelationID}-${date:now:yyyyMMdd-HHmm}.ldif" /> </route>
- inserire un passaggio tra la uscita di rcorrela (rcorrela.done) e delta per permettere la persistenza dello ldif su file:
<route id="rcorrela_to_delta"> <from uri="hub:correla.correlated"/> <log message="rcorrela has correlated: ${headers['JMSCorrelationID']}"/> <multicast> <to uri="file://opt/compliance?fileName=rcorrela-${in.header.JMSCorrelationID}-${date:now:yyyyMMdd-HHmm}.ldif" /> <to uri="hub:delta.in"/> </multicast> </route>
passare in produzione
- eliminare i demoni stomp like incluso funnel.
- guardare monit
- guardare la coda usernames.to
- rivedere gli script in timers che fanni ripassare i messaggi in DLQ (shovel) o li cancellano.
camel primer
Avere il log dei messaggi in passaggio:
<from uri="activemq:ping"/> <log message="Test route processing ${id}"/> <log message="Test route processing ${body}"/> <log message="Test route CorrelationID ${headers['JMSCorrelationID']}"/> <log message="Test route processing ${headers}"/> <to uri="activemq:pong"/>
Scrivere un file:
<multicast> <to uri="file://outputdir?fileName=ping-${in.header.JMSCorrelationID}-${date:now:yyyyMMdd-HHmm}.txt" /> <to uri="activemq:pong"/> </multicast>
Per il nome del file, guarda: https://camel.apache.org/components/3.20.x/languages/file-language.html
aggiungere il CorrelationID se manca
<route id="test"> <from uri="wutki64:ping"/> <log message="correlation ID found: ${header.JMSCorrelationID}"/> <choice> <when> <simple>${header.JMSCorrelationID.trim()} == ''</simple> <log message="correlation ID not found"/> <setHeader headerName="JMSCorrelationID"> <simple>${exchangeId}${random(9)}${random(9)}${random(9)$random(9)}</simple> </setHeader> <to uri="wutki64:pong"/> </when> <otherwise> <log message="correlation ID found: ${header.JMSCorrelationID}"/> <to uri="wutki64:pong"/> </otherwise> </choice> </route>
una rotta per spostare periodicamente i messaggi ad esempio da un DLQ alla coda base
<route> <from uri="gimcrack:ugov_retry"/> <setHeader headerName="AMQ_SCHEDULED_DELAY"> <constant>1200000</constant> </setHeader> <to uri="gimcrack:ugov_sync"/> </route>