
Thursday, December 18, 2014

Hands on: Build a Storm analytics solution

Storm lets you create real-time analytics for every conceivable need.


Two weeks ago, we examined the two most popular real-time processing frameworks, Apache Storm and Apache Spark. Now we're going to take a much deeper look at Storm and walk through a basic Storm deployment for consuming Twitter messages and performing analytics on the Twitter stream.

To this end, we'll extract important keywords from individual tweets and calculate rolling metrics related to how actively a given keyword is being discussed. Plus, we'll do some lightweight sentiment analysis to determine the tenor of the discussion on a given topic. We'll also look at how Storm and XMPP combine nicely for extracting important "moment in time" events from a stream and for sending those events out as alerts.

All about Storm

Storm is an open source, distributed, stream-processing platform, designed to make it easy to build massively scalable systems for performing real-time computations on continuous streams of data.

People sometimes refer to Storm as the Hadoop of real-time processing, but it's important to note that Storm has no particular dependency on the MapReduce programming model. You may, if your needs so dictate, code a Storm solution to use a MapReduce model, but nothing about Storm requires it. In fact, Storm bears a slight resemblance to pre-Hadoop distributed computing systems like MPI in terms of the flexibility you have in designing your application.

To accomplish this, Storm depends on a small number of key concepts: topologies, spouts, bolts, and tuples. A spout is an interface to a stream of data; it consumes that stream, possibly doing some initial filtering or processing, and transforms it into a stream of tuples, which are then sent to bolts for processing.

Tuples are labeled collections of fields that are passed among Storm components (spouts and bolts). Bolts are the Storm components that do the heavy lifting of handling complex calculations. A Storm topology is a collection of spouts and bolts that have been wired together to create a specific solution.
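
To make these concepts concrete, here is a minimal sketch of a bolt. It is illustrative only and not part of the demo project that follows: it consumes a one-field tuple, transforms the field, and emits a new single-field tuple downstream.

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// A minimal (hypothetical) bolt: reads the "word" field from each incoming
// tuple and emits an uppercased copy as a new single-field tuple.
public class UppercaseBolt extends BaseBasicBolt
{
    @Override
    public void execute(Tuple input, BasicOutputCollector collector)
    {
        String word = input.getStringByField("word");
        collector.emit(new Values(word.toUpperCase()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("upperWord"));
    }
}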

A significant distinction between Storm and Hadoop is that Hadoop MapReduce jobs run in batch mode where they are started, run until a given set of data has been processed, and terminate. By contrast, Storm topologies default to running forever until they are explicitly killed. As long as the topology is running, the spouts will collect data and send tuples to the bolts for processing.

Storm provides considerable flexibility for designing topologies to match the problem at hand. A topology may include multiple spouts, and bolts may consume tuples from one or more spouts, then in turn emit tuples that are sent to one or more subsequent bolts for additional processing. Generally speaking, the chain of processing steps may be as long as needed, with components arranged as a directed acyclic graph of operations. Within this framework you can implement MapReduce algorithms, scatter-gather operations, pipeline-oriented algorithms, or whatever is required to perform the desired calculation.

Storm addresses parallelism by running instances of the various components (spouts and bolts) on multiple nodes in the cluster. Your topology configuration specifies the desired level of parallelism for each component. This allows you to use a large number of nodes for a computationally expensive step in the overall workflow, for example, while other steps may require only one node.

Field groupings allow the programmer to control how tuples are distributed among multiple instances of a given component. Default options include the "shuffle" grouping, where tuples are distributed randomly among instances; the "fields" grouping, where all tuples with the same value for a specified field go to the same instance (similar to how MapReduce partitions by key); and the "all" grouping, where every instance receives every tuple.
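
In code, a grouping is declared when a bolt is wired into the topology. Here is a brief sketch; the component names (SentenceSpout, SplitSentenceBolt, WordCountBolt, ReportBolt) are hypothetical, and note that the API method for the "field" grouping is fieldsGrouping():

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new SentenceSpout(), 1);

// shuffle: tuples are distributed randomly across the four splitter instances
builder.setBolt("splitter", new SplitSentenceBolt(), 4)
       .shuffleGrouping("sentences");

// fields: all tuples with the same "word" value go to the same counter instance
builder.setBolt("counter", new WordCountBolt(), 4)
       .fieldsGrouping("splitter", new Fields("word"));

// all: every reporter instance receives every tuple
builder.setBolt("reporter", new ReportBolt(), 2)
       .allGrouping("counter");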

Storm allows the use of any arbitrary library and essentially any valid Java code within components. Spouts and bolts can read from, or write to, any database, file system, queue, cache, socket, or endpoint that is required. In the following examples, we will examine a Storm topology that consumes Twitter data, reads files from a file system, and writes to an XMPP server.

Before we move on to the demo code, there is one important point to understand about statefulness in Storm components. Bolts may be stateful or stateless depending on their function: a bolt that calculates a "rolling average" of some metric over a period of time must maintain data across invocations, while other bolts may process a given tuple, perform the calculation at hand, and discard the data. In the case of a stateful component, it is up to the programmer to ensure that intermediate state is saved to an external location, and reloaded as necessary, to allow for the failure of a node.

Getting started with Storm

To run the code samples, you need a local installation of Storm. Storm includes a "local" mode for development purposes that simulates parallelism using threads, so you won't need to build a full-fledged Storm cluster to follow along. To install Storm, download the Storm 0.9.3 bundle and extract it somewhere on your file system. Once extracted, add the apache-storm-0.9.3/bin directory to your path, so you can run the storm command on the command line.

All source code for this article is available on GitHub. Clone the project and build it using the command:
$ mvn clean compile package
After the build completes, cd into the target/ directory and run the topology using the command:

$ storm jar storm-samples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    com.osintegrators.example.storm.main.TwitterAnalyticsTopology

Exploring Storm code

The basic framework for running Storm topologies here is borrowed from the tutorial in the official Storm documentation. To see how a topology is wired up and run, take a look in TwitterAnalyticsTopology.java.

Here our main() method creates an instance of our topology and submits it to run locally or on a cluster, depending on the command-line arguments provided. Since we did not pass the keyword "remote" as an argument in our launch command, we will run in local mode.
boolean runLocally = true;
if (args.length >= 2 && args[1].equalsIgnoreCase("remote"))
{
    runLocally = false;
}

// topologyName was derived from the command-line arguments earlier (not shown)
TwitterAnalyticsTopology ttt = null;
if( !topologyName.isEmpty() )
{
    ttt = new TwitterAnalyticsTopology(topologyName);
}
else
{
    ttt = new TwitterAnalyticsTopology();
}

logger.info( "Topology Name: " + ttt.topologyName );

if (runLocally)
{
    logger.info("Running in local mode");
    ttt.runLocally();
}
else
{
    logger.info("Running in remote (cluster) mode");
    ttt.runRemotely();
}
The actual wiring of the topology happens in the wireTopology() method, which is called from the constructor for TwitterAnalyticsTopology. As you can see, wiring a topology consists of creating instances of our components, passing any required setup/configuration data to those instances, assigning the spouts and bolts to the topology, and defining the connections between the components. The calls to the shuffleGrouping() method are how we declare the flow of tuples between components.

In this example, we pass our Twitter credentials into the TwitterSpout so that it can open a connection to Twitter and receive tweets. Meanwhile, mentionTargets is a list of keywords that we specifically want to look for in our Twitter stream. We pass this list into the TweetToMentionBolt, which we will examine momentarily.
private void wireTopology()
{
    logger.info( "Wiring topology...");

    String consumerKey = "your_consumer_key";
    String consumerSecret = "your_consumer_secret";
    String accessToken = "your_access_token";
    String accessTokenSecret = "your_token_secret";
    String[] keywords = {};

    List<String> mentionTargets = new ArrayList<String>();

    mentionTargets.add( "facebook" );
    mentionTargets.add( "tor" );
    mentionTargets.add( "oracle" );
    mentionTargets.add( "jive" );
    mentionTargets.add( "manufacturing" );
    mentionTargets.add( "openstack" );
    mentionTargets.add( "barrett" );
    mentionTargets.add( "hadoop" );
    mentionTargets.add( "arduino" );
    mentionTargets.add( "memristor" );
    mentionTargets.add( "sony" );
    mentionTargets.add( "scala" );

    this.builder.setSpout( "twitterSpout",
            new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keywords), 1);
    this.builder.setBolt( "tweetToMentionBolt",
            new TweetToMentionBolt(mentionTargets), 3 ).shuffleGrouping("twitterSpout");
    this.builder.setBolt( "singleMentionAlerterBolt",
            new SingleMentionAlerterBolt( "talk.google.com", "5222", "gmail.com" ), 1)
            .shuffleGrouping("tweetToMentionBolt");
    this.builder.setBolt( "mentionsThresholdBolt",
            new MentionsThresholdBolt( "talk.google.com", "5222", "gmail.com" ), 1)
            .shuffleGrouping("tweetToMentionBolt");
    this.builder.setBolt( "sentimentAnalysisBolt",
            new SentimentAnalysisBolt( "talk.google.com", "5222", "gmail.com",
                    "/home/prhodes/workspaces/storm/storm-samples/AFINN/AFINN-111.txt" ), 3)
            .shuffleGrouping( "tweetToMentionBolt" );
}
We set the parallelism for our TwitterSpout to 1 since we need only one instance of this class running. If we ran multiple instances, we would receive multiple copies of the same message, and we'd risk running afoul of Twitter's restrictions on multiple simultaneous connections from the same account or IP address.

In the TwitterSpout class we take advantage of the open() method, which is called by Storm as part of initializing the topology, to open our connection to Twitter and set up our UserStreamListener instance. Any tweets that are received are stored in the LinkedBlockingQueue instance, where they are available when Storm initiates a call to the nextTuple() method.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    logger.info( "TwitterSpout opening...");
    this.collector = collector;
    this.queue = new LinkedBlockingQueue<Status>(1000);

    UserStreamListener listener = new UserStreamListener()
    {
        @Override
        public void onStatus(Status status)
        {
            logger.info( "onStatus() called");
            logger.info( "Status: " + status.getUser().getName() + " : " + status.getText() );
            queue.offer(status);
        }

        @Override
        public void onException(Exception ex)
        {
            logger.error( "Exception: \n", ex);
        }
    };

    this.twitterStream = new TwitterStreamFactory(
            new ConfigurationBuilder().setJSONStoreEnabled(true).build()).getInstance();
    this.twitterStream.addListener(listener);  // register the listener so incoming tweets reach our queue
    this.twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
    AccessToken token = new AccessToken(accessToken, accessTokenSecret);
    this.twitterStream.setOAuthAccessToken(token);

    if (keyWords.length == 0)
    {
        logger.info( "Sample Twitter Stream");
        this.twitterStream.user();
    }
    else
    {
        FilterQuery query = new FilterQuery().track(keyWords);
        this.twitterStream.filter(query);
    }
}
When nextTuple() is called, we extract any available tweets from the queue and emit them using the SpoutOutputCollector instance. The order in which arguments are passed to the Values() constructor must match the order in which the output field names are declared in the declareOutputFields() method to allow downstream components to extract fields from the supplied tuple.
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("tweet"));
}

public void nextTuple()
{
    Status ret = queue.poll();
    if (ret == null)
    {
        // no tweet available; sleep briefly to avoid busy-waiting
        Utils.sleep(50);
    }
    else
    {
        collector.emit(new Values(ret));
    }
}
Once the TwitterSpout has emitted a tuple, it will be processed by the execute() method in the next component in the chain -- in our case, TweetToMentionBolt. This bolt is responsible for examining tweets to see if they contain one of the keywords we are monitoring for. If one of our keywords is found, this bolt emits a new tuple with the specified keyword, the tweet text, and a Date object representing when the mention occurred. Again, the ordering of fields in the emit() method must match the declaration in the declareOutputFields() method.
@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
    Status status = (Status)input.getValueByField("tweet");

    // parse the status and look for mentions of the entities we're interested in
    String statusText = status.getText();

    logger.info( "status: " + statusText );

    for( String mentionTarget : mentionTargets )
    {
        if( statusText.toLowerCase().matches( ".*\\s*" + mentionTarget + "\\s+.*" ))
        {
            logger.info( "emitting mention: " + mentionTarget );
            collector.emit( new Values( mentionTarget, new Date(), status ) );
        }
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("mentionTarget", "date", "status" ));
}
TweetToMentionBolt is a stateless bolt, so multiple instances of the class can freely operate on any tuple from upstream. Because our downstream bolts don't care which instance their tuples come from, this is a perfect example of a bolt whose parallelism we might increase if it performed a computationally expensive operation: we can run as many instances of this bolt as we need to handle the load in a timely fashion.
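
For example, if keyword matching became a bottleneck, the only change needed would be a larger parallelism hint when the bolt is wired into the topology (the figure of 8 below is purely illustrative):

// run eight executors of the stateless TweetToMentionBolt instead of three
this.builder.setBolt( "tweetToMentionBolt",
        new TweetToMentionBolt(mentionTargets), 8 ).shuffleGrouping("twitterSpout");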

After TweetToMentionBolt emits a tuple, that tuple will be delivered to three separate downstream bolts. SingleMentionAlerterBolt does nothing but send an XMPP message indicating that the specified keyword has been mentioned in a tweet. This could be useful if you were scanning for mentions of a comparatively rare term, but is otherwise here mainly to demonstrate setting up the XMPP integration in a bolt.

MentionsThresholdBolt is a stateful bolt that maintains a list of mentions along with the time the mention occurred, and sends an alert when a keyword is mentioned more than a certain number of times per hour. SentimentAnalysisBolt performs sentiment analysis on the tweets that mention our specified keywords and sends an alert only if the sentiment of the tweet is above or below a specific value.

As you can see below, we use the prepare() method, which Storm calls as part of initializing a component, to set up our XMPP connection. Once the connection is established in prepare(), we use it in the execute() method to alert on events that match our criteria (which vary depending on the bolt we are in).
public class SingleMentionAlerterBolt extends BaseBasicBolt
{
    private static final Logger logger = Logger.getLogger( SingleMentionAlerterBolt.class );
    private static final long serialVersionUID = 1L;

    private XMPPConnection xmppConnection;
    private String chatUsername = "your_gtalk_username";
    private String chatPassword = "your_gtalk_password";
    private String xmppHost;
    private String xmppPort;
    private String serviceName;

    public SingleMentionAlerterBolt( String xmppHost, String xmppPort, String serviceName )
    {
        this.xmppHost = xmppHost;
        this.xmppPort = xmppPort;
        this.serviceName = serviceName;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context)
    {
        super.prepare(stormConf, context);

        ConnectionConfiguration cc = new ConnectionConfiguration( xmppHost, Integer.parseInt( xmppPort ), serviceName );
        this.xmppConnection = new XMPPTCPConnection(cc);

        try
        {
            xmppConnection.connect();
            xmppConnection.login( chatUsername, chatPassword );
        }
        catch (SmackException | IOException | XMPPException e)
        {
            logger.error( "Failed to establish XMPP connection", e );
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector)
    {
        String mentionTarget = input.getStringByField( "mentionTarget" );
        Date mentionDate = (Date)input.getValueByField("date");

        logger.info( mentionTarget + " mentioned at: " + mentionDate );

        // send XMPP alert to user(s)
        try
        {
            Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                    "some_user_of_interest@gmail.com", new MessageListener()
            {
                @Override
                public void processMessage(Chat chat, Message message)
                {
                    // we don't expect replies, so ignore incoming messages
                }
            } );

            // Google bounces back the default message type, so you must use Message.Type.chat
            Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
            msgObj.setBody( mentionTarget + " mentioned at: " + mentionDate );
            chat.sendMessage( msgObj );
        }
        catch (SmackException e)
        {
            logger.error( "Failed to send XMPP alert", e );
        }
    }
}
MentionsThresholdBolt is a stateful bolt that maintains a rolling "mentions per hour" metric for each keyword in our list. The intermediate state is maintained in a Map instance called mentions. For the purpose of this demo, we did not implement a cache/restore operation to allow for node failure, but the basic algorithm would be to write out intermediate state (to disk, to an in-memory cache on another machine, and so on) in the execute() method, and look for/load any cached state in the prepare() method.
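
A minimal sketch of that cache/restore idea might look like the following. This is not in the demo project: the local file path is a placeholder, the helper method names are invented, and a production system would more likely write to Redis, a database, or an in-memory cache on another machine. The sketch assumes the bolt's existing mentions map and logger fields.

import java.io.*;
import java.util.*;

// Hypothetical helper: serialize the mentions map so a restarted instance
// can pick up where the failed one left off. Call periodically from execute().
private void checkpointState()
{
    try (ObjectOutputStream out = new ObjectOutputStream(
            new FileOutputStream("/tmp/mentions-state.ser")))  // placeholder path
    {
        out.writeObject(new HashMap<String, SortedSet<Date>>(mentions));
    }
    catch (IOException e)
    {
        logger.error("Failed to checkpoint state", e);
    }
}

// Hypothetical helper: load any previously checkpointed state from prepare().
@SuppressWarnings("unchecked")
private Map<String, SortedSet<Date>> restoreState()
{
    File f = new File("/tmp/mentions-state.ser");
    if (!f.exists())
    {
        return new HashMap<String, SortedSet<Date>>();
    }
    try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(f)))
    {
        return (Map<String, SortedSet<Date>>) in.readObject();
    }
    catch (Exception e)
    {
        logger.error("Failed to restore state; starting fresh", e);
        return new HashMap<String, SortedSet<Date>>();
    }
}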

In this example, we drop any mention more than an hour old and recalculate our "mentions per hour" metric each time a new mention occurs. If the new value is above our threshold, we send an XMPP message.
public void execute(Tuple input, BasicOutputCollector collector)
{
    // store mentions and calculate a running metric of "mentions / hour";
    // if the mph exceeds our threshold, send an alert.
    // we could also log it to a database as a point-in-time snapshot for historical reporting

    String mentionTarget = input.getStringByField( "mentionTarget" );
    Date mentionDate = (Date)input.getValueByField("date");
    logger.info( "MENTIONSTHRESHOLDBOLT: " + mentionTarget + " mentioned at: " + mentionDate );

    // get the current running mention list for this mentionTarget,
    // prune anything older than our threshold (one hour for this example),
    // add the current mention, and sum the mentions in the past hour.
    // if we're over our threshold, send an alert via XMPP
    SortedSet<Date> mentionsForTarget = mentions.get( mentionTarget );
    if( mentionsForTarget == null )
    {
        mentionsForTarget = new TreeSet<Date>();
        mentionsForTarget.add(mentionDate);
        mentions.put(mentionTarget, mentionsForTarget);
    }
    else
    {
        // we found an existing list; add our new entry and remove anything older than an hour
        mentionsForTarget.add(mentionDate);

        Iterator<Date> iter = mentionsForTarget.iterator();

        // set up a Date instance for one hour ago
        Calendar now = GregorianCalendar.getInstance();
        now.add(Calendar.HOUR, -1 );

        while( iter.hasNext() )
        {
            Date d = iter.next();
            if( d.before(now.getTime()))
            {
                iter.remove();
            }
            else
            {
                // this record is within the hour window; since the set is sorted,
                // we can stop looking at the others
                break;
            }
        }

        int mentionsInPastHour = mentionsForTarget.size();
        if( mentionsInPastHour > 2 )
        {
            // send XMPP alert to user(s)
            try
            {
                Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                        "some_user_of_interest@gmail.com", new MessageListener()
                {
                    @Override
                    public void processMessage(Chat chat, Message message)
                    {
                        // we don't expect replies, so ignore incoming messages
                    }
                } );

                // Google bounces back the default message type, so you must use Message.Type.chat
                Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
                msgObj.setBody( mentionTarget + " mentioned " + mentionsInPastHour + " times in past hour. " + new Date().toString() );
                chat.sendMessage( msgObj );
            }
            catch (SmackException e)
            {
                logger.error( "Failed to send XMPP alert", e );
            }
        }
    }
}
SentimentAnalysisBolt illustrates how we can incorporate static, offline data into our stream processing system. To perform a relatively naive sentiment analysis, we need to load a list of words for which the valence has been precalculated. For this demo, we use the AFINN data set. To make this data available in our bolt, we use the prepare() method to read it from a tab-delimited file on the file system and cache it in a Map instance.
public void prepare(Map stormConf, TopologyContext context)
{
    super.prepare(stormConf, context);

    // load the AFINN data
    BufferedReader reader = null;
    try
    {
        FileReader fReader = new FileReader( this.afinnPath );
        reader = new BufferedReader( fReader );

        String line = null;
        while( (line = reader.readLine()) != null )
        {
            // each line is a tab-delimited pair: word<TAB>score
            String[] parts = line.split( "\t" );
            String word = parts[0];
            Integer score = Integer.parseInt( parts[1] );

            wordScores.put( word.trim(), score );
        }
    }
    catch( Exception e )
    {
        throw new RuntimeException( e );
    }
    finally
    {
        if( reader != null )
        {
            try
            {
                reader.close();
            }
            catch (IOException e)
            {
                // nothing useful to do if close() fails
            }
        }
    }
}
With the AFINN data available, our execute() method can iterate over the words in the tweet, accumulating the sentiment for any words that are in the AFINN list. The final score is calculated by dividing the "raw" sentiment score by the total number of words. There are, of course, more elaborate sentiment analysis algorithms available, but we're trying to keep this demo focused on the core elements of building a Storm application.
public void execute(Tuple input, BasicOutputCollector collector)
{
    String mentionTarget = input.getStringByField( "mentionTarget" );
    Date mentionDate = (Date)input.getValueByField("date");
    Status status = (Status)input.getValueByField( "status" );

    logger.info( "SENTIMENTANALYSIS: " + mentionTarget + " mentioned at: " + mentionDate );

    String text = status.getText();
    String[] words = text.split( " " );

    // accumulate the AFINN valence of every scored word in the tweet
    double sentimentScore = 0.0;
    for( String word : words )
    {
        Integer tempScore = wordScores.get( word.trim() );
        if( tempScore != null )
        {
            sentimentScore += tempScore.intValue();
        }
    }

    // normalize by the total word count
    sentimentScore = sentimentScore / words.length;
    logger.info( "final sentiment score: " + sentimentScore );

    if( sentimentScore >= 0.5 || sentimentScore <= -0.5 )
    {
        try
        {
            Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                    "some_user_of_interest@gmail.com", new MessageListener()
            {
                @Override
                public void processMessage(Chat chat, Message message)
                {
                    // we don't expect replies, so ignore incoming messages
                }
            } );

            // Google bounces back the default message type, so you must use Message.Type.chat
            Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
            msgObj.setBody( mentionTarget + " mentioned at: " + mentionDate + ", with sentimentScore: "
                    + sentimentScore + " (tweet id: " + status.getId() + ")" );
            chat.sendMessage( msgObj );
        }
        catch (SmackException e)
        {
            logger.error( "Failed to send XMPP alert", e );
        }
    }
}

After the deluge

This application, while admittedly simple, uses all of the primary concepts of Storm. If you've followed along to this point, you'll have all of the basic tools needed to build an interactive Storm analytics solution. You can, of course, go much further than we had space for here. Other items you might want to include in a more elaborate production system:
  • Run on a real cluster. The demo code above is ready to run on an actual Storm cluster as-is. To do so, build a cluster following the official Storm documentation, configure your local Storm client to point at the cluster's Nimbus server, and add the "remote" keyword to the end of the launch command.
  • Use pre-existing off-the-shelf libraries for common numerical/statistical calculations. We used the most basic metric calculations here, but you are limited only by your imagination (and your requirements) when deciding what metrics you want to calculate. Also, the plethora of existing libraries for statistical/numerical calculations opens many vistas for your exploration.
  • Use standard machine learning packages for clustering, classification, and other operations. As with statistics and numerical calculations, machine learning libraries are legion, and many high-quality libraries are open source and freely available. The repository at mloss.org maintains an amazing list of the latest in open source machine learning software.
  • Front-end Storm with Apache Kafka. We didn't need Kafka for this small-scale demo, but the two are a very powerful combination. If you have a very high volume of messages to process, using Kafka for pre-processing and partitioning in front of Storm works very well; see the sketch after this list.
  • Use R with Storm. Spouts and bolts can be written in almost any language, but for complex statistical analytics R is a purpose-built language with an incredible array of available libraries. R is the de facto software tool used by statisticians and can be an important arrow to have in your own quiver.
  • Integrate with a database or with Hadoop. While a Storm topology is running, it can easily persist intermediate values to an RDBMS, Hadoop, or some other data store to support longer-term historical analysis and reporting. Storm can also easily consume data from Hadoop or from an RDBMS.
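
Picking up the Kafka suggestion above, here is a hedged sketch of what replacing TwitterSpout with a Kafka-fed spout might look like, using the storm-kafka module available for Storm 0.9.x. The ZooKeeper address, topic name, zkRoot, and consumer id are all placeholders.

import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

// point the spout at the ZooKeeper ensemble that Kafka registers its brokers with
ZkHosts zkHosts = new ZkHosts("localhost:2181");              // placeholder address
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "tweets",  // topic (placeholder)
        "/kafka-spout", "twitter-analytics");                 // zkRoot and consumer id (placeholders)
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());  // emit messages as strings

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig), 1);
// downstream bolts attach exactly as before, e.g.:
// builder.setBolt("tweetToMentionBolt", new TweetToMentionBolt(mentionTargets), 3)
//        .shuffleGrouping("kafkaSpout");
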
Using Storm and these other tools, you will be able to construct a real-time analytics system that can handle any conceivable need. Whether you're doing real-time marketing, sensor systems on the Internet of things, predictive analytics, or more, Storm provides a rock-solid foundation to build on.
