Nie chodzi bynajmniej o moją druga połówkę lecz Camela, projekt typu lightweight ESB. W Camelu możemy zdefiniować ścieżki określające przepływ danych. Mało tego, możemy je po drodze agregować, skierować do różnych źródeł, skonwertować, procesować w dowolny sposób etc. Danymi mogą być pliki, encje, informacje z wielu źródeł typu kolejki JMS, bazy danych itd. Dokumentacja do Camela jest wyjątkowo rozbudowana i przejrzysta więc poprzestanę na tym prostym opisie a weźmiemy się za konkretny sample...


Załaduje proste dane z pliku CSV do bazy MySQL. Od początku wiedziałem, że Camel nic specjalnego tu nie pokaże, ale chodziło by sprawdzić że działa. Działa, ale z bólami. Wiem, że obecnie są problemy z alokacją pamięci przy ładowaniu naprawdę dużych plików, konfiguracja i przechwytywanie błedów nie jest tak elastyczne jak np. w MS Integration Services (ale IS to zupełnie inna bajka choćby ze względu na bulk inserty w Sql Server), ale nie takich błędów oczekiwałem.


Zdefiniowałem kontekst Camela wraz z ścieżką jak poniżej...

<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
 <xsl:output method="xml" />
 <xsl:template match="@* | node()">
   <xsl:copy>
     <xsl:apply-templates select="@* | node()"/>
   </xsl:copy>
 </xsl:template>
</xsl:stylesheet>

Flow ścieżki jest prosty, mam źródło w postaci folderu w którym znajdują się pliki csv, skanowanie folderu odbywa się co 5 sekund. Każdy plik będzie po przetworzeniu usunięty (tak naprawdę przeniesiony). W trakcie przetwarzania każda linia będzie czytana i komponentem 'bindy' konwertowana do obiektu CSV, a konkretniej encji oznaczonej odpowiednimi annotacjami w moim wypadku jest to klasa tomekkup.camelcsv.model.FooCsvBean. Komponent zwróci nam obiekt typu List<Map<FooCsvBean,FooCsvBean>>. Magiczne i to w tym miejscu jest właśnie issue. Kolejne komponenty nie rozumieją takiego formatu danych i nie mają żadnego konwertera który by sobie z tym stanem poradził. Zatem komponentem 'split' będę iterował elementy i procesował procesorem entryResolvingProcessor, który jest zarejestrowany w kontekście jako obiekt klasy jak poniżej...

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class CsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<Object, Object> body = (Map<Object, Object>) exchange.getIn().getBody();
exchange.getIn().setBody(body.values().iterator().next());
}
}

Procesor wyciąga z mapy elementy i podmienia jest we flow. W ten sposób kolejny komponent otrzyma obiekt który rozumie... kolejny czyli komponent JPA, który zapisze dane do bazy. No tak, brakuje jeszcze definicji klasy FooCsvBean ze wszystkimi mapowaniami !

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@CsvRecord(skipFirstLine = true, separator = ";", crlf="UNIX")
@Entity(name = "csvSourceBean")
public class CsvSourceBean {

@Id()
@GeneratedValue(strategy = GenerationType.AUTO)
private long id;

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

@Column(name = "code")
@DataField(pos = 1)
private String code;

@Column(name = "account")
@DataField(pos = 2)
private String account;

public String getCode() {
return code;
}

public void setCode(String code) {
this.code = code;
}

public String getAccount() {
return account;
}

public void setAccount(String account) {
this.account = account;
}

}