Storm lets you create real-time analytics for every conceivable need.
Two weeks ago, we examined the two most popular real-time processing frameworks,
Apache Storm and Apache Spark. Now we're going to take a much deeper
look at Storm and walk through a basic Storm deployment for consuming
Twitter messages and performing analytics on the Twitter stream.
To this end, we'll extract important keywords from individual tweets and calculate rolling metrics related to how actively a given keyword is being discussed. Plus, we'll do some lightweight sentiment analysis to determine the tenor of the discussion on a given topic. We'll also look at how Storm and XMPP combine nicely for extracting important "moment in time" events from a stream and for sending those events out as alerts.
All about Storm
Storm is an open source, distributed, stream-processing platform, designed to make it easy to build massively scalable systems for performing real-time computations on continuous streams of data.

People sometimes refer to Storm as the Hadoop of real-time processing, but it's important to note that Storm has no particular dependency on the MapReduce programming model. You may, if your needs so dictate, code a Storm solution to use a MapReduce model, but nothing about Storm requires it. In fact, Storm bears a slight resemblance to pre-Hadoop distributed computing systems like MPI in terms of the flexibility you have in designing your application.
Storm accomplishes all of this with a small number of key concepts: topologies, spouts, bolts, and tuples. A spout is an interface into a stream of data; it consumes that stream, optionally performs some initial filtering or transformation, and emits a stream of tuples, which are then sent to bolts for processing.
Tuples are labeled collections of fields that are passed among Storm components (spouts and bolts). Bolts are the Storm components that do the heavy lifting of handling complex calculations. A Storm topology is a collection of spouts and bolts that have been wired together to create a specific solution.
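Conceptually, a tuple pairs a list of declared field names with a list of positional values, so downstream components can look values up by name. The following toy stand-in (plain Java, not Storm's actual Tuple class) makes the idea concrete:

```java
import java.util.Arrays;
import java.util.List;

// A toy stand-in for a Storm tuple: field names are declared once
// (as in declareOutputFields), values are supplied positionally (as in emit).
public class SimpleTuple {
    private final List<String> fields;
    private final List<Object> values;

    public SimpleTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("field/value count mismatch");
        }
        this.fields = fields;
        this.values = values;
    }

    // Look up a value by its declared field name.
    public Object getValueByField(String field) {
        return values.get(fields.indexOf(field));
    }

    public static void main(String[] args) {
        SimpleTuple t = new SimpleTuple(
                Arrays.asList("mentionTarget", "text"),
                Arrays.asList("hadoop", "hadoop is trending"));
        System.out.println(t.getValueByField("mentionTarget")); // hadoop
    }
}
```

This is also why, as we'll see below, the order of values passed to emit() must match the order of names in declareOutputFields().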
A significant distinction between Storm and Hadoop is that Hadoop MapReduce jobs run in batch mode where they are started, run until a given set of data has been processed, and terminate. By contrast, Storm topologies default to running forever until they are explicitly killed. As long as the topology is running, the spouts will collect data and send tuples to the bolts for processing.
Storm provides considerable flexibility for designing topologies to match the problem at hand. A topology may include multiple spouts, and bolts may consume tuples from one or more spouts, then in turn emit tuples that are sent to one or more subsequent bolts for additional processing. Generally speaking, the chain of processing steps may be as long as needed, with components represented as a directed acyclic graph of operations. Within this framework you can implement MapReduce algorithms, scatter/gather operations, pipeline-oriented algorithms, or whatever is required to perform the desired calculation.
Storm addresses parallelism by running instances of the various components (spouts and bolts) on multiple nodes in the cluster. Your topology configuration specifies the desired level of parallelism for each component. This allows you to use a large number of nodes for a computationally expensive step in the overall workflow, for example, while other steps may require only one node.
Field groupings allow the programmer to control the distribution of tuples between multiple instances of a given component. Default options include the "shuffle" grouping, where tuples are distributed randomly among instances, "field" grouping, where all tuples with the same value for a specified field go to the same instance (similar to how MapReduce works), and "all" grouping where each instance receives all tuples.
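The routing guarantee behind these groupings can be illustrated with a simple hash-modulo sketch. This is an illustration of the contract only, not Storm's internal task-assignment code:

```java
import java.util.Random;

public class GroupingSketch {
    // "field" grouping: tuples with the same field value always land on the
    // same task index, so any per-key state stays on a single instance.
    public static int fieldsGrouping(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // "shuffle" grouping: any task is fair game; modeled here as random choice.
    public static int shuffleGrouping(Random rng, int numTasks) {
        return rng.nextInt(numTasks);
    }

    public static void main(String[] args) {
        // Same keyword, same task -- every time.
        System.out.println(fieldsGrouping("hadoop", 4) == fieldsGrouping("hadoop", 4)); // true
    }
}
```

The practical consequence: a stateful bolt like the MentionsThresholdBolt below could safely be parallelized with a field grouping on the keyword, while stateless bolts can use shuffle grouping.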
Storm allows the use of any arbitrary library and essentially any valid Java code within components. Spouts and bolts can read from, or write to, any database, file system, queue, cache, socket, or endpoint that is required. In the following examples, we will examine a Storm topology that consumes Twitter data, reads files from a file system, and writes to an XMPP server.
Before we move on to the demo code, there is one important point to understand about statefulness in Storm components: Bolts may be stateful or stateless depending on their function; a bolt that is calculating a "rolling average" of some metric over a period of time must maintain data across invocations. Other bolts may process a given tuple, perform the calculation at hand, and discard the data. In the case of a stateful component, it is up to the programmer to ensure that intermediate state is saved to an external location, and reloaded as necessary, to allow for the failure of a node.
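The save-and-reload idea can be sketched with plain Java serialization. This is an illustration of the pattern only; a production topology would more likely checkpoint to an external store such as a database or distributed cache, and the class and file names here are hypothetical:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class StateCheckpoint {
    // Write intermediate state out, as a stateful bolt might do in execute().
    public static void save(HashMap<String, Integer> state, File f) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(f))) {
            out.writeObject(state);
        }
    }

    // Reload cached state if present, as a bolt might do in prepare().
    @SuppressWarnings("unchecked")
    public static HashMap<String, Integer> load(File f) throws IOException, ClassNotFoundException {
        if (!f.exists()) {
            return new HashMap<>(); // no checkpoint yet: start fresh
        }
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(f))) {
            return (HashMap<String, Integer>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Integer> counts = new HashMap<>();
        counts.put("hadoop", 3);
        File f = File.createTempFile("bolt-state", ".bin");
        save(counts, f);
        System.out.println(load(f)); // {hadoop=3}
    }
}
```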
Getting started with Storm
To run the code samples, you need a local installation of Storm. Storm includes a "local" mode for development purposes that simulates parallelism using threads, so you won't need to build a full-fledged Storm cluster to follow along. To install Storm, download the Storm 0.9.3 bundle and extract it somewhere on your file system. Once extracted, add the apache-storm-0.9.3/bin directory to your path, so you can run the storm command on the command line.

All source code for this article is available on GitHub:
Clone this project, and build it using the command:
$ mvn clean compile package

After the build completes, cd into the target/ directory, and run the topology using the command:

$ storm jar storm-samples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    com.osintegrators.example.storm.main.TwitterAnalyticsTopology

Exploring Storm code
The basic framework for running Storm topologies here is borrowed from the tutorial in the official Storm documentation. To see how a topology is wired up and run, take a look in TwitterAnalyticsTopology.java.

Here our main() method instantiates an instance of our topology and submits it to run locally or on a cluster, depending on the provided command-line arguments. Since we did not pass the keyword "remote" as an argument in our launch command, we will run in local mode.

boolean runLocally = true;
if (args.length >= 2 && args[1].equalsIgnoreCase("remote"))
{
    runLocally = false;
}

TwitterAnalyticsTopology ttt = null;
if( !topologyName.isEmpty() )
{
    ttt = new TwitterAnalyticsTopology(topologyName);
}
else
{
    ttt = new TwitterAnalyticsTopology();
}

logger.info( "Topology Name: " + ttt.topologyName );
if (runLocally)
{
    logger.info("Running in local mode");
    ttt.runLocally();
}
else
{
    logger.info("Running in remote (cluster) mode");
    ttt.runRemotely();
}

The actual wiring of the topology happens in the wireTopology() method, which is called from the constructor for TwitterAnalyticsTopology.
As you can see, wiring a topology consists of creating instances of our components, passing any required setup/configuration data to those instances, assigning the spouts and bolts to the topology, and defining the connections between the components. The calls to the shuffleGrouping() method are how we declare the flow of tuples between components.

In this example, we pass our Twitter credentials into the TwitterSpout, so it can open a connection to Twitter and receive tweets. Likewise, mentionTargets is a list of keywords that we specifically want to look for in our Twitter stream. We pass this list into the TweetToMentionBolt, which we will examine momentarily.

private void wireTopology()
{
    logger.info( "Wiring topology...");
    String consumerKey = "your_consumer_key";
    String consumerSecret = "your_consumer_secret";
    String accessToken = "your_access_token";
    String accessTokenSecret = "your_token_secret";
    String[] keywords = {};

    List<String> mentionTargets = new ArrayList<String>();
    mentionTargets.add( "facebook" );
    mentionTargets.add( "tor" );
    mentionTargets.add( "oracle" );
    mentionTargets.add( "jive" );
    mentionTargets.add( "manufacturing" );
    mentionTargets.add( "openstack" );
    mentionTargets.add( "barrett" );
    mentionTargets.add( "hadoop" );
    mentionTargets.add( "arduino" );
    mentionTargets.add( "memristor" );
    mentionTargets.add( "sony" );
    mentionTargets.add( "scala" );

    this.builder.setSpout( "twitterSpout",
        new TwitterSpout(consumerKey, consumerSecret, accessToken, accessTokenSecret, keywords ), 1);

    this.builder.setBolt( "tweetToMentionBolt",
        new TweetToMentionBolt(mentionTargets), 3 ).shuffleGrouping("twitterSpout");

    this.builder.setBolt( "singleMentionAlerterBolt",
        new SingleMentionAlerterBolt( "talk.google.com", "5222", "gmail.com" ),
        1).shuffleGrouping("tweetToMentionBolt");

    this.builder.setBolt( "mentionsThresholdBolt",
        new MentionsThresholdBolt("talk.google.com", "5222", "gmail.com"),
        1).shuffleGrouping("tweetToMentionBolt");

    this.builder.setBolt( "sentimentAnalysisBolt",
        new SentimentAnalysisBolt( "talk.google.com", "5222", "gmail.com",
            "/home/prhodes/workspaces/storm/storm-samples/AFINN/AFINN-111.txt" ),
        3).shuffleGrouping( "tweetToMentionBolt" );
}

We set the parallelism for our TwitterSpout to 1 since we need only one instance of this class running. If we ran multiple instances, we would receive multiple copies of the same message, and we'd risk running afoul of Twitter's restrictions on multiple simultaneous connections from the same account/IP address.

In the TwitterSpout class we take advantage of the
open()
method, which is called by Storm as part of initializing the topology,
to open our connection to Twitter and set up our UserStreamListener
instance. Any tweets that are received are stored in the LinkedBlockingQueue instance, where they are available when Storm initiates a call to the nextTuple() method.

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    logger.info( "TwitterSpout opening...");
    this.collector = collector;
    this.queue = new LinkedBlockingQueue<Status>(1000);

    UserStreamListener listener = new UserStreamListener()
    {
        @Override
        public void onStatus(Status status)
        {
            logger.info( "onStatus() called");
            logger.info( "Status: " + status.getUser().getName() + " : " + status.getText() );
            queue.offer(status);
        }

        @Override
        public void onException(Exception ex)
        {
            logger.error( "Exception: \n", ex);
        }
    };

    this.twitterStream = new TwitterStreamFactory(
        new ConfigurationBuilder().setJSONStoreEnabled(true).build()).getInstance();
    this.twitterStream.addListener(listener);
    this.twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
    AccessToken token = new AccessToken(accessToken, accessTokenSecret);
    this.twitterStream.setOAuthAccessToken(token);

    if (keyWords.length == 0)
    {
        logger.info( "Sample Twitter Stream");
        this.twitterStream.user();
    }
    else
    {
        FilterQuery query = new FilterQuery().track(keyWords);
        this.twitterStream.filter(query);
    }
}

When nextTuple() is called, we extract any available tweets from the queue and emit them using the SpoutOutputCollector instance. The order in which arguments are passed to the Values() constructor must match the order in which the output field names are declared in the declareOutputFields() method to allow downstream components to extract fields from the supplied tuple.

public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("tweet"));
}

public void nextTuple()
{
    Status ret = queue.poll();
    if (ret == null)
    {
        Utils.sleep(50);
    }
    else
    {
        collector.emit(new Values(ret));
    }
}

Once the TwitterSpout has emitted a tuple, it will be processed by the execute() method in the next component in the chain -- in our case, TweetToMentionBolt. This bolt is responsible for examining tweets to see if they contain one of the keywords we are monitoring for. If one of our keywords is found, this bolt emits a new tuple with the specified keyword, the tweet text, and a Date object representing when the mention occurred. Again, the ordering of fields in the emit() method must match the declaration in the declareOutputFields() method.

@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
    Status status = (Status)input.getValueByField("tweet");

    // parse the status and look for mentions of the entity we're interested in...
    String statusText = status.getText();
    logger.info( "status: " + statusText );

    for( String mentionTarget : mentionTargets )
    {
        if( statusText.toLowerCase().matches( ".*\\s*" + mentionTarget + "\\s+.*" ))
        {
            logger.info( "emitting mention: " + mentionTarget );
            collector.emit( new Values( mentionTarget, new Date(), status ) );
        }
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    declarer.declare(new Fields("mentionTarget", "date", "status" ));
}

TweetToMentionBolt is a stateless bolt, and multiple instances of this class could freely operate on any random tuple from upstream. Since our downstream bolts don't care where their tuples come from, this is a perfect example of a bolt where we might increase the parallelism if this were a computationally expensive operation. We can use as many instances of this bolt as we need in order to handle the load in a timely fashion.

After TweetToMentionBolt emits a tuple, that tuple will be delivered to three separate downstream bolts. SingleMentionAlerterBolt does nothing but send an XMPP message indicating that the specified keyword has been mentioned in a tweet. This could be useful if you were scanning for mentions of a comparatively rare term, but is otherwise here mainly to demonstrate setting up the XMPP integration in a bolt.
MentionsThresholdBolt is a stateful bolt that maintains a list of mentions along with the time the mention occurred, and sends an alert when a keyword is mentioned more than a certain number of times per hour. SentimentAnalysisBolt performs sentiment analysis on the tweets that mention our specified keywords and sends an alert only if the sentiment of the tweet is above or below a specific value.
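The sentiment calculation that SentimentAnalysisBolt performs boils down to a per-word valence lookup and an average over the word count. That core can be exercised in isolation, independent of Storm and Twitter; the two word scores below are hypothetical AFINN-style entries, not the real data set:

```java
import java.util.HashMap;
import java.util.Map;

public class SentimentSketch {
    // Average the valence of known words over ALL words in the text,
    // mirroring the bolt's "raw score divided by word count" approach.
    public static double score(String text, Map<String, Integer> valence) {
        String[] words = text.split(" ");
        double raw = 0.0;
        for (String word : words) {
            Integer v = valence.get(word.trim().toLowerCase());
            if (v != null) {
                raw += v; // accumulate valence for words we know about
            }
        }
        return raw / words.length;
    }

    public static void main(String[] args) {
        Map<String, Integer> valence = new HashMap<>();
        valence.put("love", 3);   // hypothetical AFINN-style entries
        valence.put("awful", -3);
        System.out.println(score("i love hadoop", valence)); // 1.0
    }
}
```

Note that dividing by the total word count means long tweets dilute the score, which is one reason the bolt below only alerts past a fixed threshold.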
As you can see below, we use the prepare() method, which is called by Storm as part of initializing a component, to set up our XMPP connection. Once established by prepare(), we use this object in the execute() method to alert on events that match our criteria (depending on which bolt we are in).

public class SingleMentionAlerterBolt extends BaseBasicBolt
{
    private static final Logger logger = Logger.getLogger( SingleMentionAlerterBolt.class );
    private static final long serialVersionUID = 1L;

    private XMPPConnection xmppConnection;
    private String chatUsername = "your_gtalk_username";
    private String chatPassword = "your_gtalk_password";
    private String xmppHost;
    private String xmppPort;
    private String serviceName;

    public SingleMentionAlerterBolt( String xmppHost, String xmppPort, String serviceName )
    {
        this.xmppHost = xmppHost;
        this.xmppPort = xmppPort;
        this.serviceName = serviceName;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context)
    {
        super.prepare(stormConf, context);
        ConnectionConfiguration cc =
            new ConnectionConfiguration( xmppHost, Integer.parseInt( xmppPort ), serviceName );
        this.xmppConnection = new XMPPTCPConnection(cc);
        try
        {
            xmppConnection.connect();
            xmppConnection.login( chatUsername, chatPassword);
        }
        catch (SmackException e)
        {
            logger.error( "", e );
        }
        catch (IOException e)
        {
            logger.error( "", e );
        }
        catch (XMPPException e)
        {
            logger.error( "", e );
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector)
    {
        String mentionTarget = input.getStringByField( "mentionTarget" );
        Date mentionDate = (Date)input.getValueByField("date");
        logger.info( mentionTarget + " mentioned at: " + mentionDate );

        // send XMPP alert to user(s)
        try
        {
            Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                "some_user_of_interest@gmail.com", new MessageListener()
            {
                @Override
                public void processMessage(Chat chat, Message message)
                {
                }
            } );

            // Google bounces back the default message type; you must use Message.Type.chat
            Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
            msgObj.setBody(mentionTarget + " mentioned at: " + mentionDate );
            chat.sendMessage( msgObj );
        }
        catch (SmackException e)
        {
            logger.error( "", e );
        }
    }
}

MentionsThresholdBolt is a stateful bolt that maintains a rolling "mentions per hour" metric for each keyword in our list. The intermediate state is maintained in a Map instance called mentions. For the purpose of this demo, we did not implement a cache/restore operation to allow for node failure, but the basic algorithm would be to write out intermediate state (to disk, to an in-memory cache on another machine, and so on) in the execute() method, and look for/load any cached state in the prepare() method.

In this example, we drop any mention more than an hour old and recalculate our "mentions per hour" metric each time a new mention occurs. If the new value is above our threshold, we send an XMPP message.
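The hourly pruning at the heart of this bolt can be exercised without Storm or XMPP. Here is a standalone sketch of the same sorted-set technique (the MentionWindow class name is ours, introduced for illustration):

```java
import java.util.Date;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

public class MentionWindow {
    private final SortedSet<Date> mentions = new TreeSet<>();

    // Record a mention, drop anything older than one hour relative to "now",
    // and return the number of mentions remaining in the window.
    public int record(Date mention, Date now) {
        mentions.add(mention);
        Date cutoff = new Date(now.getTime() - 60L * 60L * 1000L);
        Iterator<Date> iter = mentions.iterator();
        while (iter.hasNext()) {
            if (iter.next().before(cutoff)) {
                iter.remove();
            } else {
                break; // sorted set: everything after this is newer, stop here
            }
        }
        return mentions.size();
    }
}
```

Because the set is sorted, pruning stops at the first in-window timestamp, which is the same early-exit the bolt's iterator loop relies on.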
public void execute(Tuple input, BasicOutputCollector collector)
{
    // store mentions and calculate a running metric of "mentions / hour";
    // if the mph exceeds our threshold, send an alert
    // we could also log it to a database as a point-in-time snapshot for historical reporting
    String mentionTarget = input.getStringByField( "mentionTarget" );
    Date mentionDate = (Date)input.getValueByField("date");
    logger.info( "MENTIONSTHRESHOLDBOLT: " + mentionTarget + " mentioned at: " + mentionDate );

    // get the current running mention list for this mentionTarget,
    // prune anything older than our threshold (one hour for this example),
    // add the current mention, and sum the mentions in the past hour;
    // if we're over our threshold, send an alert via XMPP
    SortedSet<Date> mentionsForTarget = mentions.get( mentionTarget );
    if( mentionsForTarget == null )
    {
        mentionsForTarget = new TreeSet<Date>();
        mentionsForTarget.add(mentionDate);
        mentions.put(mentionTarget, mentionsForTarget);
    }
    else
    {
        // we found an existing list; add our new entry and remove anything older than an hour
        mentionsForTarget.add(mentionDate);
        Iterator<Date> iter = mentionsForTarget.iterator();

        // set up a Date instance for one hour ago
        Calendar now = GregorianCalendar.getInstance();
        now.add(Calendar.HOUR, -1 );
        while( iter.hasNext() )
        {
            Date d = iter.next();
            if( d.before(now.getTime()))
            {
                iter.remove();
            }
            else
            {
                // this record is within the hour window; since the set is sorted,
                // we can stop looking at the others
                break;
            }
        }

        int mentionsInPastHour = mentionsForTarget.size();
        if( mentionsInPastHour > 2 )
        {
            // send XMPP alert to user(s)
            try
            {
                Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                    "some_user_of_interest@gmail.com", new MessageListener()
                {
                    @Override
                    public void processMessage(Chat chat, Message message)
                    {
                    }
                } );

                // Google bounces back the default message type; you must use Message.Type.chat
                Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
                msgObj.setBody(mentionTarget + " mentioned " + mentionsInPastHour
                    + " times in past hour. " + new Date().toString() );
                chat.sendMessage( msgObj );
            }
            catch (SmackException e)
            {
                logger.error( "", e );
            }
        }
    }
}

SentimentAnalysisBolt illustrates how we can incorporate the use of static, offline data into our stream-processing system. To perform a relatively naive sentiment analysis operation, we need to load a list of words for which the valence has been precalculated. For this demo, we use the AFINN data set. To make this data available in our bolt, we use the prepare() method to simply read it from a tab-delimited file on the file system and cache it in a Map instance.

public void prepare(Map stormConf, TopologyContext context)
{
    super.prepare(stormConf, context);

    // load the AFINN data
    BufferedReader reader = null;
    try
    {
        FileReader fReader = new FileReader( this.afinnPath );
        reader = new BufferedReader( fReader );
        String line = null;
        while( (line = reader.readLine()) != null )
        {
            String[] parts = line.split( "\t" );
            String word = parts[0];
            Integer score = Integer.parseInt( parts[1] );
            wordScores.put( word.trim(), score );
        }
    }
    catch( Exception e )
    {
        throw new RuntimeException( e );
    }
    finally
    {
        if( reader != null )
        {
            try
            {
                reader.close();
            }
            catch (IOException e)
            {
            }
        }
    }
}

With the AFINN data available, our execute() method can iterate over the words in the tweet, accumulating the sentiment for any words that are in the AFINN list. The final score is calculated by dividing the "raw" sentiment score by the total number of words. There are, of course, more elaborate sentiment analysis algorithms available, but we're trying to keep this demo focused on the core elements of building a Storm application.

public void execute(Tuple input, BasicOutputCollector collector)
{
    String mentionTarget = input.getStringByField( "mentionTarget" );
    Date mentionDate = (Date)input.getValueByField("date");
    Status status = (Status)input.getValueByField( "status" );
    logger.info( "SENTIMENTANALYSIS: " + mentionTarget + " mentioned at: " + mentionDate );

    String text = status.getText();
    String[] words = text.split( " " );
    double sentimentScore = 0.0;
    for( String word : words )
    {
        Integer tempScore = wordScores.get( word.trim());
        if( tempScore != null )
        {
            sentimentScore += tempScore.intValue();
        }
    }
    sentimentScore = ( sentimentScore / words.length);
    logger.info( "final sentiment score: " + sentimentScore );

    if( sentimentScore >= 0.5 || sentimentScore <= -0.5 )
    {
        try
        {
            Chat chat = ChatManager.getInstanceFor(xmppConnection).createChat(
                "some_user_of_interest@gmail.com", new MessageListener()
            {
                @Override
                public void processMessage(Chat chat, Message message)
                {
                }
            } );

            // Google bounces back the default message type; you must use Message.Type.chat
            Message msgObj = new Message("some_user_of_interest@gmail.com", Message.Type.chat);
            msgObj.setBody(mentionTarget + " mentioned at: " + mentionDate
                + ", with sentimentScore : " + sentimentScore
                + " (tweet id: " + status.getId() + ")" );
            chat.sendMessage( msgObj );
        }
        catch (SmackException e)
        {
            logger.error( "", e );
        }
    }
}

After the deluge
This application, while admittedly simple, uses all of the primary concepts of Storm. If you've followed along to this point, you'll have all of the basic tools needed to build an interactive Storm analytics solution. You can, of course, go much further than we had space for here. Other items you might want to include in a more elaborate production system:

- Run on a real cluster. Codewise, the demo code above is ready to run on an actual Storm cluster. To run on a real cluster, build a cluster following the official Storm documentation, configure your local Storm client to point to the Nimbus server of the cluster, and add the "remote" keyword to the end of the launcher command.
- Use pre-existing off-the-shelf libraries for common numerical/statistical calculations. We used the most basic metric calculations here, but you are limited only by your imagination (and your requirements) when deciding what metrics you want to calculate. Also, the plethora of existing libraries for statistical/numerical calculations opens many vistas for your exploration.
- Use standard machine learning packages for clustering, classification, and other operations. As with statistics and numerical calculations, machine learning libraries are legion, and many high-quality libraries are open source and freely available. The repository at mloss.org maintains an amazing list of the latest in open source machine learning software.
- Front-end Storm with Apache Kafka. We didn't need Kafka for this small-scale demo, but this is a very powerful combination. If you have a very high volume of messages that need to be processed, using Kafka for pre-processing and partitioning in front of Storm works very well.
- Use R with Storm. Spouts and bolts can be written in almost any language, but for complex statistical analytics R is a purpose-built language with an incredible array of available libraries. R is the de facto software tool used by statisticians and can be an important arrow to have in your own quiver.
- Integrate with a database or with Hadoop. While a Storm topology is running, it can easily persist intermediate values to an RDBMS, Hadoop, or some other data store to support longer-term historical analysis and reporting. Storm can also easily consume data from Hadoop or from an RDBMS.