Camel

20 czerwiec, 2011

Nie chodzi bynajmniej o moją druga połówkę lecz Camela, projekt typu lightweight ESB. W Camelu możemy zdefiniować ścieżki określające przepływ danych. Mało tego, możemy je po drodze agregować, skierować do różnych źródeł, skonwertować, procesować w dowolny sposób etc. Danymi mogą być pliki, encje, informacje z wielu źródeł typu kolejki JMS, bazy danych itd. Dokumentacja do Camela jest wyjątkowo rozbudowana i przejrzysta więc poprzestanę na tym prostym opisie a weźmiemy się za konkretny sample...


Załaduje proste dane z pliku CSV do bazy MySQL. Od początku wiedziałem, że Camel nic specjalnego tu nie pokaże, ale chodziło by sprawdzić że działa. Działa, ale z bólami. Wiem, że obecnie są problemy z alokacją pamięci przy ładowaniu naprawdę dużych plików, konfiguracja i przechwytywanie błedów nie jest tak elastyczne jak np. w MS Integration Services (ale IS to zupełnie inna bajka choćby ze względu na bulk inserty w Sql Server), ale nie takich błędów oczekiwałem.


Zdefiniowałem kontekst Camela wraz z ścieżką jak poniżej...

<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
 <xsl:output method="xml" />
 <xsl:template match="@* | node()">
   <xsl:copy>
     <xsl:apply-templates select="@* | node()"/>
   </xsl:copy>
 </xsl:template>
</xsl:stylesheet>

Flow ścieżki jest prosty, mam źródło w postaci folderu w którym znajdują się pliki csv, skanowanie folderu odbywa się co 5 sekund. Każdy plik będzie po przetworzeniu usunięty (tak naprawdę przeniesiony). W trakcie przetwarzania każda linia będzie czytana i komponentem 'bindy' konwertowana do obiektu CSV, a konkretniej encji oznaczonej odpowiednimi annotacjami w moim wypadku jest to klasa tomekkup.camelcsv.model.FooCsvBean. Komponent zwróci nam obiekt typu List<Map<FooCsvBean,FooCsvBean>>. Magiczne i to w tym miejscu jest właśnie issue. Kolejne komponenty nie rozumieją takiego formatu danych i nie mają żadnego konwertera który by sobie z tym stanem poradził. Zatem komponentem 'split' będę iterował elementy i procesował procesorem entryResolvingProcessor, który jest zarejestrowany w kontekście jako obiekt klasy jak poniżej...

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class CsvProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<Object, Object> body = (Map<Object, Object>) exchange.getIn().getBody();
exchange.getIn().setBody(body.values().iterator().next());
}
}

Procesor wyciąga z mapy elementy i podmienia jest we flow. W ten sposób kolejny komponent otrzyma obiekt który rozumie... kolejny czyli komponent JPA, który zapisze dane do bazy. No tak, brakuje jeszcze definicji klasy FooCsvBean ze wszystkimi mapowaniami !

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@CsvRecord(skipFirstLine = true, separator = ";", crlf="UNIX")
@Entity(name = "csvSourceBean")
public class CsvSourceBean {

@Id()
@GeneratedValue(strategy = GenerationType.AUTO)
private long id;

public long getId() {
return id;
}

public void setId(long id) {
this.id = id;
}

@Column(name = "code")
@DataField(pos = 1)
private String code;

@Column(name = "account")
@DataField(pos = 2)
private String account;

public String getCode() {
return code;
}

public void setCode(String code) {
this.code = code;
}

public String getAccount() {
return account;
}

public void setAccount(String account) {
this.account = account;
}

}

Encje 'plikowe' i Hibernate

20 październik, 2010

Jakiś czas temu, chciałem sprawić by zapis uploadowanych plików był procesem automatycznym. Czyli jak zwykle... zrób więcej, pisząc mniej kodu.. ahhh yeah. Udało się. Wszystko odbywa się za pomocą odpowiedniej encji, która przechowuje MultipartFile, oraz EventListener-ów w Hibernate. Definiujemy więc interfejs, który będzie identyfikował encje plikowe, musi być on dostępny na poziomie backingObject formularzy uploadujących pliki.

import java.io.IOException;
import java.io.InputStream;

import org.springframework.web.multipart.MultipartFile;

public interface FileResourceBean {
  InputStream getInputStream() throws IOException;

  MultipartFile getMultipartFile();

  void setMultipartFile(MultipartFile multipartfile);
}

Teraz skonfigurujemy eventListener, który będzie reagował na odpowiednie zdarzenia. Jak widać chcę także obsłużyć usuwanie encji z bazy, więc cała funkcjonalność działa w obie strony :)

<bean id="sessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean" p:schemaUpdate="true" p:dataSource-ref="dataSource">
  <!-- ... -->
  <property name="eventListeners">
    <map>
      <entry key="post-insert" value-ref="fileStateSyncInterceptor" />
      <entry key="save-update" value-ref="fileStateSyncInterceptor" />
      <entry key="post-delete" value-ref="fileStateSyncInterceptor" />
    </map>
  </property>
</bean>

Pora na najważniejsze. Implementacja EventListener-a, obsługuje zdarzenia post-insert, save-update oraz post-delete. Korzysta z ResourceManager-a (który nie zostanie tutaj opisany). Problemem niewątpliwie bolesnym, jest brak obsługi transakcji plikowej.

package com.gigacube.storage.event;

import java.io.IOException;

import org.apache.log4j.Logger;
import org.hibernate.event.PostDeleteEvent;
import org.hibernate.event.PostDeleteEventListener;
import org.hibernate.event.PostInsertEvent;
import org.hibernate.event.PostInsertEventListener;
import org.hibernate.event.SaveOrUpdateEvent;
import org.hibernate.event.def.DefaultSaveOrUpdateEventListener;
import org.springframework.beans.factory.InitializingBean;

import com.gigacube.storage.IdentifiedFileResourceBean;
import com.gigacube.storage.manager.ResourceManagerFactory;
import com.gigacube.util.Assertions;

public class FileStateSyncInterceptor extends DefaultSaveOrUpdateEventListener implements
PostDeleteEventListener, PostInsertEventListener, InitializingBean {
 private static final Logger logger = Logger.getLogger(FileStateSyncInterceptor.class);
 private static final long serialVersionUID = -4354252351354429127 L;

 private ResourceManagerFactory resourceManagerFactory;

 public void afterPropertiesSet() throws Exception {
  Assertions.notNull(resourceManagerFactory, "resourceManagerFactory");
 }

 public void onPostDelete(PostDeleteEvent event) {
  if (IdentifiedFileResourceBean.class.isAssignableFrom(event.getEntity().getClass())) {
   if (logger.isDebugEnabled()) {
    logger.debug("onPostDelete " + event.getEntity().getClass() + " with id " + event.getId());
   }
   resourceManagerFactory.delete((IdentifiedFileResourceBean) event.getEntity());
  }
 }

 public void onPostInsert(PostInsertEvent event) {
  if (IdentifiedFileResourceBean.class.isAssignableFrom(event.getEntity().getClass())) {
   if (logger.isDebugEnabled()) {
    logger.debug("onPostInsert " + event.getEntity().getClass() + " with id " + event.getId());
   }
   try {
    resourceManagerFactory.save((IdentifiedFileResourceBean) event.getEntity());
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
 }

 @Override
 public void onSaveOrUpdate(SaveOrUpdateEvent event) {
  if (IdentifiedFileResourceBean.class.isAssignableFrom(event.getObject().getClass())) {
   resourceManagerFactory.assignExtraIdentifier((IdentifiedFileResourceBean) event.getObject());
  }
  super.onSaveOrUpdate(event);
 }

 public void setResourceManagerFactory(ResourceManagerFactory resourceManagerFactory) {
  this.resourceManagerFactory = resourceManagerFactory;
 }
}


Łatwo zauważyć, że gdyby zamiast zdarzenia PostInsert obsłużyć PreInsert, wówczas mógłbym wycofać transakcję na bazie gdyby nie powiódł się zapis pliku na dysku, jednak ze względu na mapowanie plików do nazw, często potrzebuje w nazwie pliku umieścić identyfikator encji, a jest on dostępny dopiero w zdarzeniu PostInsert :( Inna kwestia to usability takiego rozwiązania, bowiem cały kod nadaje się tylko do śmietnika, gdy zastosujemy inne podejście i zastosujemy rozwiązania klasy BigTable lub usługi dostępne w Amazon AWS.

Xuggle + H.264

12 październik, 2010

Ci którzy próbowali lub będą próbować za pomocą biblioteki Xuggle enkodować video do formatu H.264 napotkają raczej na wyjątkowy wyjątek w stylu:

java.lang.RuntimeException: could not open output encoder for stream: 0

Debugując natrafią po drodze na komunikaty serwowane z JNI o tym, że należy zrobić jakiś tajemniczy preset. Jasne ! Otóż FFMPEG wymaga by korzystając z kodeka H.264 podać dodatkowe parametry. Parametry te znajdują się w zasadzie gotowych plikach, które wystarczy podać jako pewien parametr i voila. Ale konkretnie... wielu z nas posiłkowało się dostępnym w repozytorium programem Converter. Jako, że jest to jedno wielkie skupisko antywzorów, zrobiłem 'drobna zmiana' (jak ktoś kiedyś mawiał :D ) i finalnie mam sobie klase inkubatora ustawień codeca wideo etc:

package com.gigaessentials.video.provider;
import com.gigacube.util.Assertions; import com.xuggle.xuggler.ICodec;
public final class H264OptionsProvider extends AbstractVideoOptionsProvider { public H264OptionsProvider(String presetFile) { this.codec = ICodec.findEncodingCodec(ICodec.ID.CODEC_ID_H264); setBitrateTolerance(12000000); setPreset(presetFile); }
public void afterPropertiesSet() { super.afterPropertiesSet(); Assertions.hasText(preset, "preset"); }
public String getOutputContainerFormat() { return "mov"; } }

Ustawiam property preset podając ścieżkę do pliku konfiguracji codeca. Przy standardowej konfiguracji Xuggle-a, pliki te są w folderze C:\Program Files\Xuggle\shared\ffmpeg . Ja wybrałem po prostu plik libx264-default.ffpreset.

coder=1
flags=+loop
cmp=+chroma
partitions=+parti8x8+parti4x4+partp8x8+partb8x8
me_method=hex
subq=7
me_range=16
g=250
keyint_min=25
sc_threshold=40
i_qfactor=0.71
b_strategy=1
qcomp=0.6
qmin=10
qmax=51
qdiff=4
bf=3
refs=3
directpred=1
trellis=1
flags2=+mixed_refs+wpred+dct8x8+fastpskip+mbtree
wpredp=2
Nowsze wpisy → Home ← Starsze wpisy