Camel
Nie chodzi bynajmniej o moją druga połówkę lecz Camela, projekt typu lightweight ESB. W Camelu możemy zdefiniować ścieżki określające przepływ danych. Mało tego, możemy je po drodze agregować, skierować do różnych źródeł, skonwertować, procesować w dowolny sposób etc. Danymi mogą być pliki, encje, informacje z wielu źródeł typu kolejki JMS, bazy danych itd. Dokumentacja do Camela jest wyjątkowo rozbudowana i przejrzysta więc poprzestanę na tym prostym opisie a weźmiemy się za konkretny sample...
Załaduje proste dane z pliku CSV do bazy MySQL. Od początku wiedziałem, że Camel nic specjalnego tu nie pokaże, ale chodziło by sprawdzić że działa. Działa, ale z bólami. Wiem, że obecnie są problemy z alokacją pamięci przy ładowaniu naprawdę dużych plików, konfiguracja i przechwytywanie błedów nie jest tak elastyczne jak np. w MS Integration Services (ale IS to zupełnie inna bajka choćby ze względu na bulk inserty w Sql Server), ale nie takich błędów oczekiwałem.
Zdefiniowałem kontekst Camela wraz z ścieżką jak poniżej...
<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output method="xml" />
<xsl:template match="@* | node()">
<xsl:copy>
<xsl:apply-templates select="@* | node()"/>
</xsl:copy>
</xsl:template>
</xsl:stylesheet>
Flow ścieżki jest prosty, mam źródło w postaci folderu w którym znajdują się pliki csv, skanowanie folderu odbywa się co 5 sekund. Każdy plik będzie po przetworzeniu usunięty (tak naprawdę przeniesiony). W trakcie przetwarzania każda linia będzie czytana i komponentem 'bindy' konwertowana do obiektu CSV, a konkretniej encji oznaczonej odpowiednimi annotacjami w moim wypadku jest to klasa tomekkup.camelcsv.model.FooCsvBean. Komponent zwróci nam obiekt typu List<Map<FooCsvBean,FooCsvBean>>. Magiczne i to w tym miejscu jest właśnie issue. Kolejne komponenty nie rozumieją takiego formatu danych i nie mają żadnego konwertera który by sobie z tym stanem poradził. Zatem komponentem 'split' będę iterował elementy i procesował procesorem entryResolvingProcessor, który jest zarejestrowany w kontekście jako obiekt klasy jak poniżej...
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class CsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<Object, Object> body = (Map<Object, Object>) exchange.getIn().getBody();
exchange.getIn().setBody(body.values().iterator().next());
}
}
Procesor wyciąga z mapy elementy i podmienia jest we flow. W ten sposób kolejny komponent otrzyma obiekt który rozumie... kolejny czyli komponent JPA, który zapisze dane do bazy. No tak, brakuje jeszcze definicji klasy FooCsvBean ze wszystkimi mapowaniami !
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;
@CsvRecord(skipFirstLine = true, separator = ";", crlf="UNIX")
@Entity(name = "csvSourceBean")
public class CsvSourceBean {
@Id()
@GeneratedValue(strategy = GenerationType.AUTO)
private long id;
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
@Column(name = "code")
@DataField(pos = 1)
private String code;
@Column(name = "account")
@DataField(pos = 2)
private String account;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
}