Apache Nifi – Sviluppare un processor custom

Lo sviluppo di un processor custom per Apache NiFi è molto semplice. Infatti i prerequisiti sono solo Maven e Java. È ovviamente necessario poi un ambiente su cui sia installato Apache NiFi per poter procedere ad eseguire il processors. Un ambiente completo e la Hortonworks Sandbox, nella versione per Virtualbox o per VMware, scaricabile qui; è anche possibile provare gratuitamente per un mese la versione disponibile sul Cloud Azure (al precedente link trovate anche le istruzioni).

Apache NiFi

Riportiamo nella Tabella 1 i prerequisiti per lo sviluppo con le relative versioni:

Tool Version
Java 1.8
Maven 3.3.9
Tabella 1: Prerequisiti per la compilazione del progetto

La struttura del progetto Maven può essere generata semplicemente digitando mvn archetype:generate. È sufficiente inserire Apache NiFi come filtro e quindi scegliere org.apache.nifi:nifi-processor-bundle-archetype. A questo punto, bisogna scegliere la versione di NiFi per cui sviluppare. Al momento attuale, la scelta corretta e la 0.6.1. In fine, basta completare i campi richiesti con le informazioni riguardanti il progetto Maven che si vuole creare (es. groupId e artifactId del progetto).

Il risultato e un’alberatura analoga alla seguente (nel nostro caso si e scelto come artifactId SentimentAnalyzer e come artifactBaseName sentiment, che sono da sostituire con i valori impostati nella fase precedente):

  • SentimentAnalyzer/pom.xml
  • SentimentAnalyzer/nifi-sentiment-nar
  • SentimentAnalyzer/nifi-sentiment-processors

Ciò che interessa direttamente lo sviluppatore e la cartella SentimentAnalyzer/nifi-sentiment-processors. In particolare, in questa cartella c’è un progetto Maven, che costituisce il vero e proprio processor. Di default, è creata una classe MyProcessor nel package scelto durante la fase di generazione dell’archetype. Se si vuole, come è consigliabile, dare un nome più signi ficativo alla classe, bisogna fare attenzione a una cosa: oltre a rinominare la classe Java, bisogna anche aprire il file src\main\resources\META-INF\services\org.apache.nifi.processor.Processor e aggiornare di conseguenza il nome della classe. Quest’ultimo file, infatti, contiene l’elenco delle classi processor che sono caricate da Apache NiFi. Quindi, se si vuole sviluppare più di un processor all’interno dello stesso progetto, bisogna inserire ogni classe in questo file, mettendone il full classpath name in una nuova riga.

Infi ne, non rimane altro che aprire e completare la classe MyProcessor (o con il nuovo nome assegnato) inserendo le opportune logiche custom. Nei casi semplici, ciò si traduce solo nel completamento del metodo onTrigger: questo è il metodo che viene chiamato ogni volta che il processor riceve un FlowFile. Per un approfondimento su cosa sia un FlowFile e sui concetti fondamentali di Apache NiFi si rimanda a questo articolo.

Le tipiche operazioni che si svolgono sono:

  • Lettura dei valori assegnati dall’utente alle proprietà del processor
  • Lettura del contenuto del FlowFile ricevuto
  • Lettura degli attributi associati al FlowFile
  • Modi ca del contenuto del FlowFile
  • Assegnazione di nuovi attributi al FlowFile
  • Routing del FlowFile generato come output delle operazioni eseguite verso le opportune relazioni (queue)

Lettura delle proprietà

La prima operazione e la lettura dei valori delle proprietà. Ma quali sono le proprietà a disposizione? Le proprietà sono de finite sempre all’interno di questa classe, tramite PropertyDescriptor. Un esempio è già presente nello schema autogenerato, per cui risulta semplice operare analogamente per defi nire tutte le proprietà necessarie al nostro scopo.

public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
 .Builder().name("My Property")
    .description("Example Property")
    .required(true)
    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    .build();

Esse vanno aggiunte nel metodo init ad un’apposita collection, che deve essere restituita dal metodo getSupportedPropertyDescriptors.

protected void init(final ProcessorInitializationContext context) {
    final List descriptors = new ArrayList();
    descriptors.add(MY_PROPERTY);
    this.descriptors = Collections.unmodifiableList(descriptors);

    final Set relationships = new HashSet();
    relationships.add(MY_RELATIONSHIP);
    this.relationships = Collections.unmodifiableSet(relationships);
}

Ottenere il valore impostato dall’utente a runtime e molto semplice: la classe ProcessContext – di cui si ha un’istanza come primo argomento del metodo onTrigger – espone il metodo getProperty, che restituisce un oggetto PropertyValue. Quest’ultimo espone il metodo getValue per ottenere il valore impostato.

Lettura del FlowFile di Apache NiFi

Un’altra operazione indispensabile è la lettura del FlowFile di Apache NiFi in input. A seconda del tipo di processor sviluppato, può essere necessario leggerne il contenuto o gli attributi oppure entrambi. I valori degli attributi sono molto semplici da leggere: la classe FlowFile ha un apposito metodo getAttribute che riceve la stringa contenente il nome dell’attributo da leggere e ne restituisce il valore.
Il contenuto del FlowFile, invece, può essere letto usando il metodo read dell’oggetto ProcessSession ricevuto come secondo parametro del metodo onTrigger. Il metodo riceve come parametri il FlowFile da leggere e una procedura di callback da eseguire sull’InputStream del contenuto del FlowFile.

Elaborazione

Solitamente un processor modi ca il contenuto o gli attributi del FlowFile. L’oggetto ProcessSession fornisce sia il metodo write per scrivere il contenuto sia i metodi putAttribute e putAllAttributes per aggiungere nuovi attributi ad un FlowFile. Quindi è possibile inserire la logica di elaborazione per salvarne i valori in nuovi elementi (attributi o FlowFile) o su elementi già esistenti.

Output

Infi ne, il FlowFile generato e inviato ad una relazione o queue. In questo modo, i processor connessi con questa relazione riceveranno gli opportuni FlowFile. Anche questa operazione – benchè fondamentale – è molto semplice: infatti, l’oggetto ProcessSession espone il metodo transfer che consente di speci ficare quale FlowFile inviare a quale relazione.

Add a Comment

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *