Don’t be afraid of the Storm (Part 2)

This is the second part of the series on learning Apache Storm, and it includes a code example. Here’s the link to Part 1, which has the introduction.

I will be using Apache Storm 1.0.2, Kafka, Java 8, Gradle, and the IntelliJ IDE on a Mac. I am assuming that you already have working knowledge of Java, Gradle and your IDE.


In this example we want to count how many times each word occurs in a stream of sentences. To achieve that, we will need to do the following tasks:

  1. Read sentences from Kafka
  2. Split each sentence into a Map of word and count
  3. Update the main Map with the new values.

I don’t have to tell you how to implement this in a normal Java application; it is a very simple problem to solve. But what if I told you that you have to do all these tasks using parallelism? Suddenly it is no longer a trivial problem.

That’s where Storm comes into the picture. It standardises the code structure in such a way that we just have to code our tasks in bolts, and it takes care of running them in parallel. We just need to break the problem down into small tasks.

So without wasting any more time, let’s get to the example.


First, create a new project in IntelliJ: select Gradle from the left pane and Java on the right. Add the GroupId and ArtifactId as per your needs, and set your project destination as needed.

I am using “com.article” as the group id and “word-counter” as the artifact id.

After the project is created, open “build.gradle”. The IDE must have added some default values to that file. In the “dependencies” block, add Apache Storm’s dependency with the following lines:

compile 'org.apache.storm:storm-core:1.0.2', {
    ext {
        fatJarExclude = true
    }
}

The “ext” block tells Gradle not to include storm-core in the final jar. We only need storm-core to test our code locally; the Storm cluster already provides it at runtime, so we must not bundle it in the jar we submit.

Also add these dependencies:

// Kafka client for integrating with Storm
compile('org.apache.storm:storm-kafka:1.0.2')
compile('org.apache.storm:storm-kafka-client:1.2.1')
compile('com.fasterxml.jackson.core:jackson-databind:2.7.2')

Now create the folder hierarchy src → main → java in the base folder, and add a package named “com.article.word_count”.

Create another folder under “main” named “resources”, and in it create a property file named “application.properties”. This will hold the necessary properties for our code.
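For reference, a minimal “application.properties” could look like the following. The keys match the ones read in the topology code below; the values are placeholders from my local setup, so adapt them to yours:

kafka.bootstrap.server=localhost:9092
kafka.topic.input=sentences
kafka.topic.output=word-counts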

Create a new class “WordCountTopology” in the base package and add a “public static void main” method. This will be our starter class: in it we will construct our topology and submit it to Storm.

Let’s look at an extract from the code (Click here to see the full code):

public class WordCountTopology {

  public static void main(String[] args)
      throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {

    /*1*/ Properties properties = getResourceProperties();

    /*2*/ String kafkaBootstrapServer = properties.getProperty("kafka.bootstrap.server");
    String topicNameInput = properties.getProperty("kafka.topic.input");
    String topicNameOutput = properties.getProperty("kafka.topic.output");

    // kafka spout configuration
    /*3*/ KafkaSpoutConfig<String, String> kafkaSpoutConfig =
        KafkaSpoutConfig.builder(kafkaBootstrapServer, topicNameInput).build();

    /*4*/ KafkaSpout<String, String> kafkaSpoutInput = new KafkaSpout<>(kafkaSpoutConfig);

    // start building the topology
    /*5*/ TopologyBuilder builder = new TopologyBuilder();

    // this adds the spout, i.e. the Kafka reader, to the topology
    /*6*/ builder.setSpout("kafka-spout", kafkaSpoutInput, 1);

    // this adds a bolt which reads the output of the spout
    /*7*/ builder.setBolt("sentence-splitter", new SentenceSplitterBolt(), 1)
        /*8*/ .shuffleGrouping("kafka-spout");

    /*9*/ builder.setBolt("word-counter", new WordCounterBolt(), 1)
        .shuffleGrouping("sentence-splitter");

    /*10*/ KafkaBolt<String, String> outputBolt = new KafkaBolt<String, String>()
        .withProducerProperties(readKafkaProperties(properties))
        .withTopicSelector(new DefaultTopicSelector(topicNameOutput))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>());

    /*11*/ builder.setBolt("output-bolt", outputBolt, 1)
        .shuffleGrouping("word-counter");

    /*12*/ Config config = new Config();
    config.setDebug(false);
    config.setMessageTimeoutSecs(30);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1000);
    config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);

    /*13*/ StormSubmitter.submitTopology(
        "WordCountTopology",
        config,
        builder.createTopology());
  }
}

Let’s go through the code:

  1. We read the properties from the property file.
  2. We assign the Kafka property values to local variables.
  3. The Storm developers have already created a spout and bolts that can connect to Kafka; we just need to initialise them with configuration. Here we create the KafkaSpoutConfig with the Kafka server and the input topic. The type parameters specify the key and value types of the records the spout reads and emits.
  4. We create the Kafka spout with that configuration.
  5. We initialise the TopologyBuilder. This will be used to construct our topology: with this object we set up the arrangement of spouts and bolts.
  6. We set up the spout for the topology. Let’s look at the arguments of “setSpout”:
    1. the first argument specifies a unique name for the spout
    2. the second argument is the spout object
    3. the third argument specifies how many instances of the spout should run in parallel
  7. We set up the sentence-splitter bolt, which converts each incoming sentence into a Map of word and count. The arguments of “setBolt” are the same as those of “setSpout”.
  8. The difference comes here. After setting up a bolt, we need to specify how it will get its messages, i.e. how data flows into the bolt. For that we specify the name of the spout/bolt from which this bolt will read its messages. Here we have specified “kafka-spout” (the name of our spout) as the feeder by calling the “shuffleGrouping” method. (We will discuss the different types of grouping in later posts.)
  9. Here we add the next bolt, which accumulates the incoming Maps of word and count and passes the result on to the next bolt. It holds the total count of each word seen while our code was running. This bolt reads its messages from the “sentence-splitter” bolt; a sketch of it appears after the bolt walkthrough below.
  10. Here we read the Kafka properties to create the configuration of the Kafka bolt.
  11. This bolt reads its messages from the “word-counter” bolt.
  12. We create a Config object to set Storm properties. These are some properties I generally use; you can read more about them in the Apache documentation. Another use of this config is to send your custom properties to the bolts, e.g. a DB URL, Redis node info, etc. Note: it can also be empty if you want to use Storm’s default properties.
  13. Here we submit the topology to Storm. Let’s look at the arguments:

    1. first, we need to provide a unique name for the topology
    2. second, we need to provide the Storm configuration
    3. third, we need to provide our topology object
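Two helper methods, “getResourceProperties” and “readKafkaProperties”, are called above but not shown in the extract. Here is a minimal sketch of what they might look like; the bodies (and the serializer choices) are my assumptions, so refer to the full code linked above for the actual implementation:

private static Properties getResourceProperties() {
  // load application.properties from src/main/resources (it is on the classpath)
  Properties properties = new Properties();
  try (InputStream in = WordCountTopology.class.getClassLoader()
      .getResourceAsStream("application.properties")) {
    properties.load(in);
  } catch (IOException e) {
    throw new RuntimeException("Could not load application.properties", e);
  }
  return properties;
}

private static Properties readKafkaProperties(Properties properties) {
  // producer properties for the KafkaBolt; string serializers match the
  // <String, String> types used throughout this topology
  Properties kafkaProps = new Properties();
  kafkaProps.put("bootstrap.servers", properties.getProperty("kafka.bootstrap.server"));
  kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  return kafkaProps;
}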

Let’s look at the code for the “sentence-splitter” bolt. I won’t bore you with the business logic, but I will give you the details needed to create a bolt.

public class SentenceSplitterBolt extends BaseRichBolt {

  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    String sentence = input.getString(0);
    if (sentence != null) {
      System.out.println("Sentence: " + sentence);
      String[] wordArray = sentence.split(" ");
      Map<String, Integer> wordMap = new HashMap<>();
      for (String word : wordArray) {
        Integer count = wordMap.get(word);
        if (count == null) {
          count = 0;
        }
        count++;
        wordMap.put(word, count);
      }
      // send the constructed Map to the next bolt
      collector.emit(new Values(wordMap));
    }
    // acknowledge that the processing of this tuple is finished
    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("wordMap"));
  }
}

To create a bolt class, you can do one of two things:

  1. You can implement the bolt interface “IRichBolt”
  2. You can extend the “BaseRichBolt”

I am going with option 2, as it takes care of implementing the methods we don’t generally customize.

You will then need to override three methods:

  1. prepare

This method is called once in the bolt’s lifetime. The first parameter holds the properties sent during creation of the topology, the second is the topology context, and the third is the OutputCollector. The OutputCollector is used to acknowledge the processing of a tuple and to send data to the next bolt.

  2. execute

This is called each time a new tuple (i.e. message) is queued for this bolt. This is the method where we do our processing. We also need to do two more things in this method:

    1. acknowledge the tuple that came into this method. This notifies Storm that the tuple was successfully processed.
    2. emit (i.e. send) the data to the next bolt if needed. We do that by emitting a “Values” object. The Values constructor takes a variable number of Object arguments. Note: we need to send the values in the same order as defined in the schema in “declareOutputFields”.

  3. declareOutputFields

This method is called so Storm knows the format in which this bolt will emit its data. We use the OutputFieldsDeclarer passed in to declare the fields, by creating a “Fields” object. The Fields constructor takes a variable number of String arguments, which define the schema.

Note: We need to emit the values in the same order as we declare them here.
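For completeness, here is a minimal sketch of what the “word-counter” bolt could look like, following the same BaseRichBolt pattern. The accumulation logic is my assumption based on the description above, and the output fields “key” and “message” are the defaults that FieldNameBasedTupleToKafkaMapper looks for, so the downstream KafkaBolt can consume the tuples as-is; check the full code for the real implementation:

public class WordCounterBolt extends BaseRichBolt {

  private OutputCollector collector;

  // running totals of every word seen by this bolt instance
  private final Map<String, Integer> totalCounts = new HashMap<>();

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    @SuppressWarnings("unchecked")
    Map<String, Integer> wordMap = (Map<String, Integer>) input.getValueByField("wordMap");
    // merge the per-sentence counts into the running totals
    for (Map.Entry<String, Integer> entry : wordMap.entrySet()) {
      totalCounts.merge(entry.getKey(), entry.getValue(), Integer::sum);
    }
    // emit the totals as a key/message pair for the KafkaBolt
    collector.emit(new Values("word-counts", totalCounts.toString()));
    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // FieldNameBasedTupleToKafkaMapper reads the "key" and "message" fields by default
    declarer.declare(new Fields("key", "message"));
  }
}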


To test your code locally, you will need to replace the code at #13 with this:

LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("WordCountTopology", new Config(), builder.createTopology());

This will create a local Storm cluster for you at runtime and submit the topology to it. That way you can test your code easily before submitting it to a remote server.

After you have tested your code, you will need to create the jar that you will submit to Storm. I have added the “fatjar” plugin in the Gradle file, which I will use to create that jar by executing this command in the source folder of our project:

gradle build fatjar
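In case you are wondering where the “fatjar” task comes from: it is provided by a Gradle plugin declared in build.gradle. Here is a sketch of the declaration, assuming the appsatori fat-jar plugin, which honors the “fatJarExclude” flag we set on storm-core earlier (treat the exact coordinates and version as an assumption and check the full code):

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        // plugin that builds a fat jar and skips dependencies marked fatJarExclude
        classpath 'eu.appsatori:gradle-fatjar-plugin:0.3'
    }
}

apply plugin: 'fatjar'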

Before submitting the topology, you will need to specify the “nimbus” host address in the “storm.yaml” file present in the “conf” folder of your local Apache Storm directory. Once the nimbus host is configured, you can submit your topology from your machine with a single command.

You can do that by adding this line to the config:

nimbus.seeds: ["{ip-address}"]

Now go to the “bin” folder of your Apache Storm directory and execute the following command:

storm jar {path of jar} com.article.word_count.WordCountTopology
 (#1) (#2)     (#3)                     (#4)

#1 “storm” is a script present in the bin folder which takes care of uploading our topology to the Storm cluster.

#2 “jar” is an argument which tells the script that we want to upload a topology packaged as a jar.

#3 We need to provide the path of the jar file.

#4 We need to specify the fully qualified class name of our “main” class, which takes care of building the topology.


I hope that after reading this article you will be able to create your own topology and run it on Storm. If you have any questions or suggestions, please comment and I will reply.

In the next part we will discuss the challenges in increasing the parallelism hint and how we can overcome them by using different types of “grouping”. We will also discuss how multiple bolts can consume the same bolt’s output in their feeds.

Click here to read Part 3

By Tiwan Punit

www.coviam.com

 


Also published on Medium.
