Tweet demo – Visualizzazione

Dopo aver indicizzato milioni di tweet relativi al mondo BigData sul motore Apache Solr, abbiamo necessità di analizzarli, aggregarli e visualizzarli su una web dashboard che dovrà essere integrata nel sito istituzionale.

I dati sono stati analizzati in prima istanza con Banana – lo strumento visuale di Solr – con il quale sono stati validati i tweet raccolti e aggregati secondo diverse tipologie, hashtags e altre caratteristiche rilevanti.  Questo tipo di analisi ci ha permesso, oltre ad ottimizzare i risultati, di individuare le query da eseguire sul motore per estrarre le seguenti informazioni:

  • Utenti più attivi
  • Hashtags più rilevanti
  • I clients e le piattaforme più utilizzate
  • Il paese di origine del tweet (o dell’utente)
  • La lingua dell’utente
  • Gli ultimi tweet (geolocalizzati e non)
  • Gli ultimi retweet
  • Il numero totale dei tweet e retweet indicizzati in Solr

Questi rappresenteranno i dati da presentare all’utente finale sulla web dashboard.

Tecnologie utilizzate

Apache Storm è un sistema open source di calcolo distribuito che consente di elaborare velocemente dati in real-time. Fa parte dell’ecosistema Hadoop ed è quindi scalabile; può interagire con tutte le altre componenti del sistema nonché con altri RPC, ETL, database, etc…

All’interno della nostra PoC, Storm è l’anello di congiunzione tra Solr e la Web Dashboard. Esso si occuperà di eseguire periodicamente le query sul motore di indicizzazione ed inviare il risultato json ad un socket UDP:

Flow Diagram

Il socket UDP è stato sviluppato come applicazione console. Considerato che il socket sarà eseguito sul server Windows che ospita il sito istituzionale, è stato scelto .NET come linguaggio di sviluppo. Il ruolo del socket sarà di recepire in input i diversi stream json inviati da Storm, applicare un parser per ogni tipologia (query) di dato e generare un file json persistente sul file system.

Dato che non abbiamo bisogno di controllare e gestire eventuali errori nella trasmissione dei dati, abbiamo scelto di utilizzare il protocollo UDP. Questa scelta permette anche di occupare meno banda di rete e di garantire una maggiore velocità di invio, e quindi generazione del file json.

La web dashboard è realizzata con HTML5/CSS3 e tecnologie javascript. La scelta di queste tecnologie ci ha permesso di suddividere il lavoro della UI con quello dell’application layer e di integrarlo semplicemente al sito istituzionale già esistente.

Storm

Storm è un sistema di computazione real-time tipicamente utilizzato per parallelizzare operazioni di stream processing, interrogazioni continue (continuos computation) oppure per interrogare e analizzare dati on the fly attraverso procedure distribuite (distributed RPC). In tutti i casi, la possibilità di lavorare con i sistemi Hadoop permette di utilizzarlo per una grande varietà di casi d’uso semplicemente utilizzando un insieme di primitive tipiche dei linguaggi di programmazione. Per sviluppare il nostro PoC abbiamo scelto Java.

A supporto di Storm esistono altri strumenti e componenti dell’ecosistema Hadoop come il “database” sql-like Hive o il file system distribuito HDFS. Nel nostro caso non abbiamo necessità di persistere i dati, già residenti su piattaforma Solr ma nulla toglie, con eventuali implementazioni, di salvare determinati dati su database o su file system durante l’elaborazione.

Da un punto di vista logico e architetturale, la computazione parallela e distribuita è svolta da diverse componenti, ognuna responsabile di un determinato task:

  • Lo stream di input viene gestito da uno SPOUT
  • Uno stream è un infinita sequenza di TUPLE, ossia una lista ordinata di oggetti. Lo spout legge lo stream in ingresso e incapsula i dati in tuple.
  • Le tuple vengono poi inviate ad uno o più BOLT che li processa ed elabora secondo quanto definito dallo sviluppatore. A sua volta ogni bolt può inviare i dati ad altri bolts oppure interagire con sistemi interni o esterni ad Hadoop.

L’insieme di tutti i componenti e delle loro connessioni è chiamato TOPOLOGY. Ogni topologia può essere eseguita localmente (LocalCluster) durante la fase di sviluppo oppure sottomessa in modalità distribuita.

File Structure

Dal punto di vista architetturale, un cluster Storm è organizzato in nodi master e worker: i primi eseguono un demone chiamato NIMBUS, mentre i worker eseguono un demone chiamato SUPERVISON. Una topologia viene eseguita attraverso i nodi worker su differenti macchine. L’interfaccia tra Nimbus e i vari supervisor, nonché il servizio che mantiene lo stato di tutte le informazioni, è invece ZOOKEEPER.

La topologia sviluppata per il caso d’uso specifico è caratterizzata da 11 spout e da un solo bolt.

Ogni spout è responsabile di eseguire una query verso Solr ogni 30 secondi (dato parametrizzato) ed inviare il risultato al bolt. A sua volta quest’ultimo apre una connessione verso il socket ed invia i dati ricevuti.

La nostra topologia può essere schematizzata nel seguente modo:

 

Storm Topology

Nei prossimi paragrafi analizziamo il codice sviluppato per gli spouts, bolt e la topologia.

ActiveUsersSpout.java

Consideriamo questa classe come esempio da analizzare, gli altri spout sono stati sviluppati in modo quasi identico.

Potevamo anche creare un unico spout per tutte le query ma questo ci avrebbe impedito di trattarli singolarmente. Se ad esempio decidiamo di non eseguire una query lo possiamo fare semplicemente non inizializzando lo spout relativo; se vogliamo assegnare un parallelismo maggiore ad alcune query rispetto ad altre perché richiedono maggiori prestazioni, in questo modo lo possiamo fare, con un solo spout no.

Dividere la topologia in tanti piccoli blocchi di esecuzioni (task) ci consente maggiore flessibilità per future implementazioni nonché una maggiore gestione ed interazione tra i task stessi.

Lo spout implementa la classe IRichSpout che mette a disposizione i metodi per aprire e chiudere una connessione verso la sorgente dati, gestire i tuple generati, gli eventi ack() e fail(), dichiarare e mappare lo stream che si sta gestendo.

Come prima cosa, dichiariamo il collector ossia l’oggetto principale che permette di emettere i dati e collegare i vari task.

A livello globale dichiariamo anche variabili che ci permetteranno di creare un collegamento verso Solr, la query da eseguire, il logger e la cache.

private SpoutOutputCollector collector;
private static Logger logger;
private String solrHostname;
private Long limit;
private Long sleepMs;
private Cache<Object, List<Object>> cache; // Map from Id to Values
private Boolean ResendFailMsg = false;

static String QUERY_API_URL = "/solr/twitterAll/select?q=*%3A*&df=id&wt=json&rows=0&facet=true&facet.field=screenName_s&facet.missing=false&indent=true&facet.limit=";

Soffermiamoci sul discorso cache. Storm implementa un sistema di fault tollerance con il quale è in grado di capire quando un messaggio emesso da un task è arrivato correttamente a destinazione (acked) oppure è fallito (failed). Possiamo sfruttare questa caratteristica per decidere cosa fare con i messaggi falliti, se provare a reinviarli oppure eliminarli dalla coda. Storm è comunque un sistema stateless, abbiamo quindi la necessità di implementare un sistema di caching che ci permetta di creare una coda interna dei messaggi da inviare e gestirli correttamente. E’ stata utilizzata la libreria di Google Core, Guava.

Nel metodo open() inizializziamo i nostri oggetti, tra cui la cache dove abbiamo assegnato un valore alto di numero di elementi e timeout di 60 secondi dall’ultimo accesso:

@Override

public void open(Map conf, TopologyContext tc, SpoutOutputCollector soc) {
logger = (Logger) LoggerFactory.getLogger(HashtagUsedSpout.class);
this.solrHostname = (String)conf.get("solrHostname");
this.limit = (Long)conf.get("platformUsedLimit");
this.sleepMs = (Long)conf.get("sleepInMs");
this.collector = soc;
cache = CacheBuilder.newBuilder()
.maximumSize(1000) // set maximum number of elements in the cache
.expireAfterAccess(60, TimeUnit.SECONDS) // set expiration strategy
.build();
}

La cache verrà ripulita quando lo spout verrà chiuso, ossia quando la topologia verrà “killata”:

@Override

public void close() {
// Clean cache
cache.cleanUp();
}

Il fulcro dello spout è rappresentato dal metodo nextTuple() che gestisce lo stream in input e lo invia al bolt tramite il connector.

In questo metodo creiamo una connessione http verso Solr attraverso la libreria OkHttpClient:

OkHttpClient client = new OkHttpClient();
// Set request
Request request = new Request.Builder()
.url(solrHostname+QUERY_API_URL+limit)
.build();

Leggiamo la risposta json:

// Get response
Response response = client.newCall(request).execute();
String json = response.body().string();

Salviamo in cache il messaggio da inviare:

// Cache handling
String id = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
// - cache the tuple and its id
Values values = new Values(json);
cache.put(id, values);

Inviamo il messaggio al bolt e chiudiamo la connessione:

collector.emit("streamActiveUsers", new Values(json), id);
// Close connection
response.body().close();

Dato che stiamo utilizzando un solo bolt per gestire diversi flussi, assegniamo ad ogni flusso un nome (in questo caso streamActiveUsers): questo ci permetterà di distinguere quale flusso stiamo gestendo all’interno del bolt e in futuro elaborarli in modo differente se necessario.

Nel metodo ack() andiamo ad eliminare dalla coda in cache il messaggio recepito dal bolt:

@Override

public void ack(Object msgId) {
// Message asked
System.out.println(msgId + "acked, so remove it...");
// Remove item from cache
cache.invalidate(msgId);
}

Nel metodo fail() invece possiamo decidere come gestire i messaggi non recapitati. Nel nostro caso li rigettiamo dato che 30 secondi dopo viene effettuata una nuova query. Se vogliamo reinviarli (o gestirli diversamente) basterà inizializzare a true la variabile ResendFailMsg:

@Override

public void fail(Object msgId) {
// Message failed
System.out.print(msgId + " failed!");
// Remove item from cache
if(cache.getIfPresent(msgId) == null){
System.out.println("\tBut id not found in cache..?");
}else{
if (ResendFailMsg) {
System.out.println("\tReplay: "+cache.getIfPresent(msgId));
collector.emit(cache.getIfPresent(msgId), msgId);
}
else
{
// Remove item from cache
System.out.println("Remove it from cache...");
cache.invalidate(msgId);
}
}
}

Come ultimo step, dobbiamo definire il formato del nostro output, ossia della sequenza di tuple, e il nome da assegnare ad esso nel metodo declareOutputFields(). Nel nostro caso si tratta di un json quindi si tratterà di una sequenza di stringhe:

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("streamActiveUsers", new Fields("jsonMessage"));
}

SocketBolt.java

La classe del bolt legge lo stream inviato dallo spout ed invia i dati ad un socket UDP. Implementa la classe di Storm IRichBolt della quale estenderemo i metodi prepare() ed execute().

Con il metodo prepare() andremo semplicemente ad inizializzare il collector e il logger:

@Override

public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
this.collector = oc;
logger = LoggerFactory.getLogger(SocketBolt.class);
}

Quando lo spout inviamo un messaggio, e quindi una sequenza di tuple, viene scatenato il metodo execute() che elabora i dati ricevuti. Leggiamo prima di tutto lo streamId per identificare lo spout che ci ha inviato il tuple:

String streamId = tuple.getSourceGlobalStreamid().get_streamId();

Questo valore al momento non viene utilizzato, servirà per successive implementazioni. Leggiamo il blocco json ricevuto da Solr come risposta alla query:

String jsonBlock = tuple.getString(0);

Creiamo quindi una connessione verso il socket UDP, attraverso la classe DatagramSocket, ed inviamo i dati json:

// Get bytes to send over socket

byte[] sendData = jsonBlock.getBytes();
// Init socket
DatagramSocket clientSocket = new DatagramSocket();
InetAddress IPAddress = InetAddress.getByName(socketIp);
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length, IPAddress, socketPort);
// Send data
clientSocket.send(sendPacket);
// Close socket
clientSocket.close();

Infine inviamo l’ack allo spout che ha generato il messaggio per informarlo dell’avvenuta ricezione:

collector.ack(tuple);

Nel caso vengano sollevate delle eccezioni, viene inviato un messaggio di failure e lo spout agirà come spiegato precedentemente.

StormTopology.java

Rappresenta la classe principale che inizializza ed esegue la topologia.

Questa estende la classe BaseConfig che si occupa di leggere il file di configurazione topology.properties la cui locazione fisica deve essere specificata come argomento del comando di esecuzione (vedere capitolo Run e deploy Storm).

Il metodo principale è buildAndSubmit()dove sono impostati i dati di configurazione che verranno poi letti dai singoli spout e bolt:

Config conf = new Config();
conf.put("solrHostname", solrConf.Protocol + "://" + solrConf.Host + ":" + solrConf.Port);
conf.put("sleepInMs", solrConf.SleepInMs);
conf.put("activeUsersLimit", solrConf.ActiveUsersLimit);
conf.put("hashtagUsedLimit", solrConf.HashtagUsedLimit);
conf.put("platformUsedLimit", solrConf.PlatformUsedLimit);
conf.put("clientUsedLimit", solrConf.ClientUsedLimit);
conf.put("latestTweetRows", solrConf.LatestTweetRows);
conf.put("languageUsedLimit", solrConf.LanguageUsedLimit);
conf.put("countryCodeLimit", solrConf.CountryCodeLimit);

Segue l’inizializzazione degli spout e del bolt, dove è stato applicato un parallelismo pari a 1 e un identificativo univoco:

// Build topology
TopologyBuilder builder = new TopologyBuilder();
// Init spouts
// - Active users
builder.setSpout("activeUsersSpout", new ActiveUsersSpout(),1);
// - Hashtag used
builder.setSpout("hashtagUsedSpout", new HashtagUsedSpout(),1);
// - Platform used
builder.setSpout("platformUsedSpout", new PlatformUsedSpout(),1);
// - Client used
builder.setSpout("clientUsedSpout", new ClientUsedSpout(),1);
// - Latest tweet
builder.setSpout("latestTweetSpout", new LatestTweetSpout(),1);
// - Latest geo tweet
builder.setSpout("latestGeoTweetSpout", new LatestGeoTweetSpout(),1);
// - Count tweet
builder.setSpout("countTweetSpout", new CountTweetSpout(),1);
// - Latest retweet
builder.setSpout("latestRetweetSpout", new LatestRetweetSpout(),1);
// - Count retweet
builder.setSpout("countRetweetSpout", new CountRetweetSpout(),1);
// - Country code
builder.setSpout("languageUsedSpout", new LanguageUsedSpout(),1);
// - Country code
builder.setSpout("countryCodeSpout", new CountryCodeSpout(),1);
// Init bolts
SocketBolt socketBolt = new SocketBolt();
socketBolt.socketIp = socketConfig.SocketIp;
socketBolt.socketPort = socketConfig.SocketPort;
builder.setBolt("socketBolt",socketBolt,1)
.shuffleGrouping("activeUsersSpout","streamActiveUsers")
.shuffleGrouping("hashtagUsedSpout", "streamHashtagUsed")
.shuffleGrouping("platformUsedSpout", "streamPlatformUsed")
.shuffleGrouping("clientUsedSpout", "streamClientUsed")
.shuffleGrouping("latestTweetSpout", "streamLatestTweet")
.shuffleGrouping("latestGeoTweetSpout", "streamLatestGeoTweet")
.shuffleGrouping("countTweetSpout", "streamCountTweet")
.shuffleGrouping("latestRetweetSpout", "streamLatestReTweet")
.shuffleGrouping("countRetweetSpout", "streamCountRetweet")
.shuffleGrouping("languageUsedSpout", "streamLanguageUsed")
.shuffleGrouping("countryCodeSpout", "streamCountryCode");

Come si può notare, il collegamento tra bolt e spouts avviene attraverso un grouping di tipo shuffle ossia ogni tupla emessa viene inviata al bolt in modalità casuale garantendo però che ogni consumer riceverà lo stesso numero di tuple.

Una volta creata la relazione tra bolt e spouts dobbiamo sottomettere la topologia al sistema locale, per debug:

conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("solr-topology", conf, builder.createTopology());

o per la modalità distribuita:

conf.put(Config.TOPOLOGY_DEBUG, false);
conf.setDebug(topologyConf.SetDebug);
conf.put(Config.TOPOLOGY_WORKERS, topologyConf.NumOfWorker);
conf.put(Config.NIMBUS_HOST, topologyConf.NimbusHost);
conf.put(Config.NIMBUS_THRIFT_PORT, topologyConf.NimbusPort);
StormSubmitter.submitTopology(topologyConf.TopologyId, conf, builder.createTopology());

Run e deploy Storm

L’applicazione Storm è stata realizzata con l’IDE Netbeans come progetto Maven.

Come prima operazione, editare il file src/main/resources/topology.properties e configurare i dati del proprio sistema e applicativi. Di seguito l’elenco dei parametri configurabili:

CLUSTER CONFIGURATION
nimbus.hostname Nimbus hostname
nimbus.port Nimbus port
STORM
storm.topologyId Id della topologia. Assegnare un diverso id per ogni topologia che viene distribuita.
conf.setDebug Se impostato a true, abilita la modalità debug nei log. Impostare a false in modalità distribuita.
storm.workers.num Numero di workers che devono eseguire la topologia
storm.isProduction Impostare a true in caso di build in modalità distribuita, a false se deve essere eseguita in local mode.
SOLR
solr.protocol Protocollo di comunicazione verso solr, http o https
solr.hostname Solr hostname
solr.port Solr port
solr.sleep.ms Timeout  in millisecondi. Indica ogni quanto tempo deve essere eseguita una nuova query su Solr
solr.*Limit Per ogni spout, indica il limite di dati da estrarre con la query Solr
SOCKET
socket.server Socket IP
socket.port Socket port

 

Oltre al parametro storm.isProduction, è necessario compilare due build distinte per eseguire l’applicazione in modalità locale o distribuita. In particolare, per la modalità distribuita, è necessario decommentare nel file pom.xml la riga <scope>provided</scope> per la dipendenza storm-core.

Di seguito i comandi di esecuzione delle build.

Local mode:

java -cp StormTopology-1.0.jar com.ecube.solrpoc.stormtopology.StormTopology topology.properties

Distribuited mode:

storm jar StormTopology-1.0.jar com.ecube.solrpoc.stormtopology.StormTopology topology.properties

Controllare su Storm UI se la topologia è stata sottomessa e se funziona correttamente monitorando i messaggi emessi, acked o failed.

Socket UDP

Il socket UDP è rappresentato da una semplice applicazione consolle .NET 4.0. Il suo ruolo è quello di restare in ascolto sulla porta configurata, leggere i dati che vengono inviati da Storm, eseguire un semplice parser e salvare i file json in una destinazione predefinita.

Configurazione

Tutti i parametri sono configurabili nel file app.config, in dettaglio:

UDP CONFIGURATION
IpPort Porta da abilitare sul firewall di rete per la ricezione UDP in ingresso, abilitando le macchine dove girerà Storm
FILE SETTINGS
JsonFilePath Path fisico dove saranno salvati i file json
ActiveUserFileName Nome del file json per streamActiveUsers
HashtagUsedFileName Nome del file json per streamHashtagUsed
PlatformUsedFileName Nome del file json per streamPlatformUsed
ClientUsedFileName Nome del file json per streamClientUsed
LatestTweetFileName Nome del file json per streamLatestTweet
LatestGeoTweetFileName Nome del file json per streamLatestGeoTweet
NumberOfTweetFileName Nome del file json per streamCountTweet
NumberOfRetweetFileName Nome del file json per streamCountRetweet
CountryCodeFileName Nome del file json per streamCountryCode
LanguageUsedFileName Nome del file json per streamLanguageUsed
LatestRetweetFileName Nome del file json per streamLatestReTweet

Udp Client

Il client è stato creato attraverso la classe .NET System.Net.Socket.UdpClient:

// Create UDP client
var receiverPort = Convert.ToInt32(Ecubecenter.Util.ConfigUtil.AppSettings["IpPort"]);
var receiver = new UdpClient(receiverPort);
// Start async receiving
receiver.BeginReceive(DataReceived, receiver);

Il metodo DataReceived() riceverà, in modalità asincrona tutti i dati:

var c = (UdpClient)ar.AsyncState;
var receivedIpEndPoint = new IPEndPoint(IPAddress.Any, 0);
var receivedBytes = c.EndReceive(ar, ref receivedIpEndPoint);

// Convert data to ASCII and print in console
var receivedText = Encoding.ASCII.GetString(receivedBytes);

Parser

Il risultato json viene letto e decodificato attraverso la classe Json.NET. Possiamo distinguere gli stream attraverso il parametro facet.field che abbiamo inizializzato nella query:

// Parse response in json
dynamic json = JObject.Parse(receivedText);
// Call parser according type of stream received
JToken typeStream = json["responseHeader"]["params"]["facet.field"];
switch (typeStream.Value&amp;lt;String&amp;gt;())
{
case "screenName_s":
ParseActiveUser.Parse(json);
break;
case "hashtag":
ParseHashtagUsed.Parse(json);
break;
case "piattaforma":
ParsePlatformUsed.Parse(json);
break;
case "client":
ParseClientUsed.Parse(json);
break;
case "latestTweet":
ParseLatestTweet.Parse(json);
break;
case "geoTweet":
ParseLatestGeoTweet.Parse(json);
break;
case "countTweet":
ParseCountTweet.Parse(json);
break;
case "countRetweet":
ParseCountRetweet.Parse(json);
break;
case "countryCode_s":
ParseCountryCode.Parse(json);
break;
case "language":
ParseLanguageUsed.Parse(json);
break;
case "retweet":
ParseLatestRetweet.Parse(json);
break;
default:
break;
}

Ogni classe statica Parse<xxx> leggerà i dati in input e creerà un file json di semplice utilizzo per la web dashboard, ossia con le sole informazioni da mostrare all’utente finale. Il file viene sovrascritto ad ogni nuovo aggiornamento.

Si potrebbe anche utilizzare direttamente il codice json risultante dalle query ma risulterebbe più complesso applicare un parser con javascript nella dashboard. Inoltre, la creazione di un file json ad hoc ci permette di integrare il dato con altre informazioni, come ad esempio la data di aggiornamento, e formattare il tutto già a monte in modo da rendere più veloce e performante la visualizzazione.

Run e deploy socket UDP

Il deploy del socket non richiede particolari configurazioni o settaggi, oltre a quelli già definiti nel paragrafo precedente.

Si procederà semplicemente con una build in modalità release attraverso gli strumenti di Visual Studio e si copierà tutto sul server dove verrà eseguita, lo stesso della web dashboard.

Come già scritto, si tratta semplicemente di un applicazione consolle che in esecuzione resterà attiva. Per una soluzione ideale bisognerebbe convertirla in servizio ed eseguirla in background.

Web dashboard

La web dashboard rappresenta la user interface dove visualizzare i dati raccolti ed elaborati da Solr e Storm.

I dati dei tweet geolocalizzati vengono visualizzati su una mappa che mostra ogni 10 secondi il tweet e la sua posizione; in alto a destra ci sono i contatori (conteggio totale dei tweet e retweet memorizzati su Solr).

Subito sotto, in specifici riquadri, vengono visualizzate tutte le altre informazioni aggregate in tabelle o grafici.

La dashboard è stata realizzata con tecnologie web HTML5, CSS3 e javascript. In particolare, è stato utilizzato il framework Jquery (1.10.1) e i plugin:

Per la mappa invece sono stati utilizzati le API di Google Map v3.

Al caricamento della pagina, sull’evento onload, viene inizializzata la mappa con la funzione initialize():

function initialize() {
var mapOptions = {
center: new google.maps.LatLng(25, 0),
zoom: 2,
mapTypeId: google.maps.MapTypeId.ROADMAP
};
map = new google.maps.Map(document.getElementById("map_canvas"),
mapOptions);
}

Subito dopo viene richiamata la funzione loadMap(map) che leggerà e mostrerà i tweet sulla mappa e nel box sottostante. Ogni tweet deve essere mostrato ogni 10 secondi e per far ciò andremo prima a leggere il json (creato dal web socket) creando un array di oggetti:

$.each(jsonData.Tweets, function(key, data) {

var date = data.Data;
var countryCode = data.Paese;
var tweet = data.Tweet;
var user = data.User;
var lang = data.Lingua;
var latLng = new google.maps.LatLng(data.Latitudine, data.Longitudine);

// Creating a marker and putting it on the map
var marker = new google.maps.Marker({
position: latLng,
title: data.User,
icon: 'img/map.png'
});
var infoText = "&amp;lt;h1 class='titleInfoWindow'&amp;gt;" + data.Data + " - " + data.User + "&amp;lt;/h1&amp;gt;&amp;lt;div class='contentInfoWindow'&amp;gt;" + cutString(data.Tweet,100) + "&amp;lt;/div&amp;gt;";
arrGeoTweet.push({
date: data.Data,
countryCode: data.Paese,
tweet: data.Tweet,
user: data.User,
lang: data.Lingua,
latLng: latLng,
marker: marker,
infoText: infoText
});
});

Poi mostriamo il primo tweet nel box e il relativo marker sulla mappa:

</pre>
var indexArr = 0; &amp;nbsp; // Show first marker var marker = arrGeoTweet[indexArr].marker; marker.setMap(map); infowindow = showInfoMarker(infowindow, map, marker, arrGeoTweet[indexArr].infoText); &amp;nbsp; // Show first tweet showTweet(arrGeoTweet[indexArr]); &amp;nbsp; indexArr++; &amp;nbsp; Ed infine leggiamo l’array di oggetti ogni 10 secondi per mostrare i successivi dati; dopo l’ultimo dato disponibile, rileggiamo il file json che nel frattempo sarà stato aggiornato dal socket: &amp;nbsp; // Add other market every 10000 seconds var refreshMarker = setInterval(function(data) { &amp;nbsp; if (indexArr&gt;=arrGeoTweet.length) { clearInterval(refreshMarker); // Reset index indexArr = 0; &amp;nbsp; // Clear markers for(var i=0; i &lt; arrGeoTweet.length; i++){ arrGeoTweet[i].marker.setMap(null); } &amp;nbsp; // Read again json file loadMap(map); } else { var marker = arrGeoTweet[indexArr].marker; &amp;nbsp; marker.setMap(map); infowindow = showInfoMarker(infowindow, map, marker, arrGeoTweet[indexArr].infoText); &amp;nbsp; showTweet(arrGeoTweet[indexArr]); indexArr++; } &amp;nbsp; }, 10000); &lt;pre&gt;

Per quanto riguarda i box testuali, l’approccio è abbastanza semplice e simile per tutti: viene richiamata una funzione che legge il file json – attraverso il metodo $.getJSON – , passa i dati al template creato con JsRender e aggiorna la data di aggiornamento; questa operazione viene eseguita ad intervalli di 30 secondi:

 

HTML:

</pre>
<div class="box list activeUsers">

<div class="title">

Users

</div>

<div class="content">

<ul id="activeUsersBlock" class="newsticker"></ul>

<script id="activeUsersTmpl" type="text/x-jsrender">

<li><span>{{:User}}</span> {{:Count}}</li>

</script>

<div id="luActiveUsersBlock" class="lastUpdate"></div>

</div>

</div>
<pre>

Javascript:

function getActiveUsers() {
clearInterval(timerActiveUsers);
// Utenti attivi
loadActiveUsers();
// Set up an interval on which the data is to be updated
timerActiveUsers = setInterval(function () { getActiveUsers(); }, 30000);
}
function loadActiveUsers() {
$.getJSON( activeUsersJson, function( data ) {
var template = $("#activeUsersTmpl");
var html = template.render(data.Users);
var ulDiv = $("#activeUsersBlock");
ulDiv.empty();
ulDiv.html(html);
var luDiv = $("#luActiveUsersBlock");
luDiv.empty();
luDiv.html("Updated to: " + data.DateUpdate);
});
}

Web Dashboard

 

Al box Latest retweet è stato invece applicato il plugin NewsTicker che crea un effetto scrolling verso l’alto, in modo descrescente in ordine di data di invio del tweet.

Abbiamo voluto infine rappresentare le informazioni più omogenee come clients, piattaforme, paesi e lingue attraverso dei grafici intuitivi, a barre e a torte, che forniscono all’utente una visione immediata dei dati. Il plugin JqPlot è di semplice configurazione ed utilizzo; ad esempio per realizzare il grafico a barre per i clients:

function loadClientUsed() {
$.getJSON( clientUsedJson, function( data ) {
var arr = [];
$.each(data.Clients, function(key, data) {
arr.push([data.Client,data.Count]);
});
$("#barClientUsed").empty();
$('#barClientUsed').jqplot([arr], {
animate: !$.jqplot.use_excanvas,
seriesDefaults:{
renderer:$.jqplot.BarRenderer
},
axes:{
xaxis:{
renderer: $.jqplot.CategoryAxisRenderer
},
yaxis : {
tickOptions: {formatString: "%'i" },
min : 0
}
}
});
var luDiv = $("#luClientUsedBlock");
luDiv.empty();
luDiv.html("Updated to: " + data.DateUpdate);
});
}

Deploy web dashboard

Trattandosi di una semplice pagina web, la dashboard non richiede particolari configurazioni per il deploy, è necessario solo copiare i file sul server web configurato opportunamente.

Bisogna solo accertarsi che i file json generati dal socket siano salvati nella cartella ”json” dell’applicazione web. Nel caso in cui si volessimo cambiare i nomi dei files o il path, bisogna ricordarsi di modificare i riferimenti sia nel file action.js che nel file di configurazione del socket.

Conclusioni

Il presente PoC ha dimostrato come integrare un sistema big data basato su piattaforma Hadoop ad una web dashboard semplice e sicuramente intuitiva per l’utente finale. Il nostro esempio utilizza Twitter come sorgenti dati ma nulla toglie di utilizzare altre tipologie di dati eterogenei da aggregare secondo le proprie esigenze.

Essendo appunto un proof of concept,  il progetto può peccare in tanti punti che potevano essere realizzati meglio o diversamente ma l’obiettivo era sicuramente un altro. Volendo ampliare questo progetto, potremmo salvare i dati, direttamente da Storm, su database sql-like hive oppure nosql hbase per analizzarli con altri strumenti di business intelligence oppure integrare sistemi di alert con le windowing per individuare il topic che sta diventando “di tendenza”. Insomma… spazio alla fantasia!

Add a Comment

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *