
How to realize stand-alone installation of storm

2025-04-07 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/01 Report--

The editor shares with you how to achieve a stand-alone installation of storm. Most readers may not know much about it, so this article is shared for your reference; I hope you gain a lot from it. Let's get started!

Storm is a distributed and highly fault-tolerant real-time computing system.

The meaning of Storm for real-time computing is equivalent to that of Hadoop for batch processing. Hadoop provides us with Map and Reduce primitives, which makes it very simple and elegant for us to batch data. Similarly, Storm also provides simple Spout and Bolt primitives for real-time computing of data.

Scenarios applicable to Storm:

1. Stream data processing: Storm can be used to process a steady stream of messages and save the processed results to a persistent medium.

2. Distributed RPC: because the processing components of Storm are distributed and the processing latency is very low, Storm can be used as a general distributed RPC framework.

In this tutorial we will learn how to create Topologies and deploy topologies to the storm cluster. Java will be our main demonstration language, and individual examples will use python to demonstrate the multilingual features of storm.

1. Preparatory work

This tutorial uses examples from the storm-starter project. I recommend that you download the project's code and follow along. Read these two articles first: Setting up a storm development environment and Creating a new storm project, to get your machine ready.

2. Basic components of a Storm cluster

On the surface, a storm cluster looks very similar to a hadoop cluster. But on Hadoop you run MapReduce Jobs, while on Storm you run Topologies. They are very different; one key difference is that a MapReduce Job eventually finishes, whereas a Topology runs forever (unless you explicitly kill it).

There are two types of nodes in a Storm cluster: the control node (master node) and the work nodes (worker nodes). A daemon called Nimbus runs on the control node, playing a role similar to JobTracker in Hadoop: it distributes code across the cluster, assigns work to machines, and monitors status.

Each worker node runs a daemon called Supervisor (similar to TaskTracker). The Supervisor monitors the work assigned to its machine and starts or shuts down worker processes as needed. Each worker process executes a subset of a Topology (analogous to a Job); a running Topology consists of many worker processes (analogous to Child processes) spread across many machines.

Storm topology structure

A Bolt can receive any number of input streams, do some processing, and possibly emit new streams. Complex stream transformations, such as computing trending topics from a stream of tweets, require multiple steps and therefore multiple bolts. Bolts can do anything: run functions, filter tuples, perform aggregations, do merges, access databases, and so on.

A Bolt processes an input Stream and produces a new output Stream. Bolts can perform filtering, function application, Joins, database operations, and so on. A Bolt is a passive role: its interface has an execute(Tuple input) method, called when a message arrives, in which users run their own processing logic.

Topology structure

Every node in a topology runs in parallel. You can specify the parallelism of each node, and storm will allocate that many threads across the cluster to run it concurrently.

A topology runs until you explicitly stop it. Storm automatically reassigns failed tasks, and storm guarantees you won't lose any data, even if machines go down unexpectedly and messages are dropped.

5. Data model (Data Model)

Storm uses the tuple as its data model. Each tuple is a named list of values, and each value can be of any type. In my understanding, a tuple can be seen as a java object with no methods. Out of the box, storm supports all primitive types, strings, and byte arrays as tuple value types. You can also use your own types, as long as you implement the corresponding serializer.

A Tuple represents a basic processing unit in a data flow, such as a cookie log entry; it can contain multiple Fields, each Field representing an attribute.

A Tuple is conceptually a Key-Value Map, but since the field names of tuples passed between components are defined in advance, a Tuple only needs to fill in its Values in order, so it is really a Value List.
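The idea that a tuple is just an ordered value list, with names resolved through a pre-declared field list, can be sketched in plain Python (hypothetical helper names, not Storm API):

```python
# Field names are declared once per stream (as in declareOutputFields);
# a tuple then only carries its values, in the declared order.
DECLARED_FIELDS = ["word", "count"]

def get_field(tup, name):
    # Name-based access resolves through the pre-declared field list,
    # so the tuple itself can stay a plain value list.
    return tup[DECLARED_FIELDS.index(name)]

tup = ["storm", 3]  # just a Value List
```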

An unbounded, continuously arriving sequence of Tuples makes up a Stream.

Each node in the topology must declare the fields of the tuples it emits. For example, the following bolt declares that it emits tuples with two fields named "double" and "triple".

public class DoubleAndTripleBolt implements IRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);
        _collector.emit(input, new Values(val * 2, val * 3));
        _collector.ack(input);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }
}

The declareOutputFields method defines the fields to output: ["double", "triple"]. We'll explain the rest of this bolt later.

6. A simple Topology

Let's look at a simple topology example: the ExclamationTopology from storm-starter:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(1, new TestWordSpout(), 10);
builder.setBolt(2, new ExclamationBolt(), 3).shuffleGrouping(1);
builder.setBolt(3, new ExclamationBolt(), 2).shuffleGrouping(2);

This Topology contains one Spout and two Bolts. The Spout emits words, and each bolt appends "!!!" to each word. The three nodes form a chain: the spout sends words to the first bolt, which sends the processed words on to the second bolt. If the spout emits ["bob"] and ["john"], then the second bolt will emit ["bob!!!!!!"] and ["john!!!!!!"].
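The three-node pipeline above can be sketched in plain Python (hypothetical function names, not Storm API) to make the double-exclamation output concrete:

```python
def spout():
    # Stands in for TestWordSpout: a stream of words.
    yield from ["bob", "john"]

def exclamation_bolt(word):
    # Stands in for ExclamationBolt: append "!!!" to the input.
    return word + "!!!"

def run_topology():
    # spout -> bolt1 -> bolt2: each word passes through both bolts.
    return [exclamation_bolt(exclamation_bolt(w)) for w in spout()]
```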

We use setSpout and setBolt to define nodes in Topology. These methods receive an id we specify, an object that contains the processing logic (spout or bolt), and the degree of parallelism you need.

If the object containing the processing logic is a spout, it implements the IRichSpout interface; if it is a bolt, it implements IRichBolt.

The last parameter, the degree of parallelism, is optional. It indicates how many threads across the cluster should execute this node together. If you omit it, storm allocates one thread for the node.

The setBolt method returns an InputDeclarer object that defines the Bolt's inputs. Here the first Bolt declares that it wants to read all the tuples emitted by the spout, using shuffle grouping. The second bolt declares that it reads the tuples emitted by the first bolt. Shuffle grouping means tuples are randomly distributed among the bolt's tasks. There are many strategies for distributing tuples to tasks, described later.

If you want the second bolt to read all the tuple emitted by the spout and the first bolt, then you should define the second bolt as follows:

builder.setBolt(3, new ExclamationBolt(), 5)
       .shuffleGrouping(1)
       .shuffleGrouping(2);

Let's take a closer look at how the spout and bolts in this topology are implemented. A spout is responsible for emitting new tuples into the topology. TestWordSpout emits a random word from ["nathan", "mike", "jackson", "golda", "bertels"]. Its nextTuple() method is defined as follows:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

As you can see, the implementation is very simple.

ExclamationBolt appends "!!!" to its input tuple. Let's look at the complete implementation of ExclamationBolt:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The prepare method provides the bolt with an OutputCollector used to emit tuples. A bolt can emit tuples at any time: in the prepare, execute, or cleanup methods, or even asynchronously from another thread. Here the prepare method simply saves the OutputCollector in a field for the execute method to use later.

The execute method receives a tuple from one of the bolt's input streams (a bolt may have multiple input sources). ExclamationBolt takes the first field of the tuple, appends "!!!" to it, and emits the result. If a bolt has multiple input sources, you can find out which source a tuple came from by calling Tuple#getSourceComponent.

Two other things in the execute method are worth noting: the input tuple is passed as the first parameter of emit, and the input tuple is acked on the last line. Both are part of Storm's reliability API, explained later.

The cleanup method is called when a bolt shuts down and should release any open resources. However, the cluster does not guarantee this method will run: if the machine running the task goes down, there is no way to call it. cleanup is intended for local mode (where the entire storm cluster is simulated in one process), to avoid resource leaks when shutting down topologies.

Finally, declareOutputFields declares that this bolt emits tuples with a single field named "word".

Run ExclamationTopology as local mode

Let's see how to run ExclamationTopology in local mode.

Storm has two modes of operation: local mode and distributed mode. In local mode, storm simulates all spouts and bolts with threads in a single process. Local mode is useful for development and testing. When you run the topologies in storm-starter, they run in local mode, and you can see what messages each component emits.

In distributed mode, storm runs on a cluster of machines. When you submit a topology to the master, you also submit the topology's code. The master is responsible for distributing your code and assigning worker processes to your topology. If a worker process fails, the master reassigns its work to another node. For how to run a topology on a cluster, see the article Running topologies on a production cluster.

Here is the code to run ExclamationTopology in local mode:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

First, the code creates an in-process cluster by constructing a LocalCluster object. Submitting a topology to this virtual cluster works the same as submitting to a distributed cluster: call submitTopology, which takes three parameters: the name of the topology to run, a configuration object, and the topology itself.

The topology name uniquely identifies a topology, so you can later use it to kill the topology. As mentioned earlier, you must explicitly kill a topology, or it will run forever.

The Conf object can be configured with many things, and the following two are the most common:

TOPOLOGY_WORKERS (setNumWorkers) defines how many worker processes the cluster should allocate for executing the topology. Each component in the topology is executed by one or more threads; exactly how many threads each component uses is specified through setBolt and setSpout. These threads run inside the worker processes, and each worker process hosts the threads of several nodes. For example, if you specify 300 threads and 50 processes, then 6 threads run in each worker process, and those 6 threads may belong to different components (Spout, Bolt). You can tune the performance of a topology by adjusting the parallelism of each component and the number of processes those threads run in.
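The arithmetic above, and the way one worker process ends up hosting threads of different components, can be sketched in plain Python (illustrative only, not Storm's actual scheduler):

```python
def threads_per_worker(total_executors, num_workers):
    # With an even split, each worker process runs this many executor threads.
    return total_executors // num_workers

def place_executors(executors, num_workers):
    # Round-robin placement: a single worker process ends up hosting
    # threads that may belong to different components (spouts and bolts).
    placement = {w: [] for w in range(num_workers)}
    for i, executor in enumerate(executors):
        placement[i % num_workers].append(executor)
    return placement
```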

TOPOLOGY_DEBUG (setDebug), when it is set to true, storm records every message emitted by each component. This is useful in debugging topology in the local environment, but doing so online can affect performance.

If you are interested, you can look at the Javadoc of the Conf object to see all the configurations of topology.

See creating a new storm project to see how to configure the development environment so that you can run topology in local mode.

The running Topology mainly consists of the following three components:

Worker processes

Executors (threads)

Tasks

The number of Tasks of a Spout or Bolt cannot be changed once set, while the number of Executors can be adjusted dynamically as needed. By default #executors = #tasks, meaning one Task runs inside each Executor.

One worker process executes a subset of one topology (note: a worker never serves multiple topologies). A worker process starts one or more executor threads to run a topology's components (spouts or bolts). A running topology is therefore made up of multiple worker processes on multiple physical machines in the cluster.

An executor is a separate thread started by a worker process. Each executor runs only tasks of one component (spout or bolt) of one topology (note: there can be one or more such tasks; by default storm generates one task per component, and the executor thread calls all of its task instances sequentially in each loop).

The task is the unit that actually runs the code in a spout or bolt (note: one task is one instance of a spout or bolt, and the executor thread calls that task's nextTuple or execute method). Once a topology starts, the number of tasks of a component (spout or bolt) is fixed, but the number of executor threads serving the component can be adjusted dynamically (for example, one executor thread can run one or more task instances of the component). This means that for a component, #threads <= #tasks. In the WordCount example, every time WordCount receives a word, it updates the counts held in memory.

There are several different stream grouping types:

The simplest grouping is shuffle grouping, which sends each tuple to a random task. In the example above, shuffle grouping is used between RandomSentenceSpout and SplitSentence; it distributes tuples evenly across tasks.

A more interesting grouping is fields grouping, used between SplitSentence and WordCount. This grouping guarantees that tuples with the same field value go to the same task, which is critical for WordCount: if occurrences of the same word did not go to the same task, the word counts would be wrong.

Fields grouping is the basis for stream merging, stream aggregation, and many other scenarios. Under the hood, fields grouping uses a consistent hash to assign tuples to tasks.
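The key property of fields grouping — equal field values always land on the same task — can be sketched with a simple mod-hash in Python (an illustrative sketch; Storm's actual hashing differs in detail):

```python
import hashlib

def task_for(field_value, num_tasks):
    # Hash the grouping field and map it onto a task index, so tuples
    # with equal field values are always routed to the same task.
    digest = hashlib.md5(str(field_value).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_tasks
```

Because the task index is a pure function of the field value, every occurrence of a given word reaches the same WordCount task.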

There are also some other types of stream grouping. You can learn more about it in the Concepts chapter.

Here are some common "routing" mechanisms:

Storm's Grouping is its message partitioning mechanism: when a Tuple is emitted, how do we decide which Task (or Tasks) should process it?

- ShuffleGrouping: randomly select a Task to send to.

- FieldsGrouping: take a consistent hash of the specified Fields in the Tuple; Tuples with the same hash value go to the same Task.

- AllGrouping: broadcast; every Tuple is sent to all Tasks.

- GlobalGrouping: all Tuples are sent to the Task with the smallest id in the Bolt.

- NoneGrouping: doesn't care which Task processes the Tuple; currently equivalent to ShuffleGrouping.

- DirectGrouping: the Tuple is sent directly to a specified Task.
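A few of these routing rules can be sketched as a small dispatch function in Python (illustrative only; `route` and its arguments are hypothetical names, not Storm API):

```python
import random

def route(grouping, task_ids):
    # Return the list of task ids a tuple should be sent to.
    if grouping == "shuffle":
        return [random.choice(task_ids)]  # one random task
    if grouping == "all":
        return list(task_ids)             # broadcast to every task
    if grouping == "global":
        return [min(task_ids)]            # the smallest task id
    raise ValueError("unknown grouping: " + grouping)
```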

8. Define Bolt in another language

Bolts can be defined in any language. A bolt defined in another language is executed as a subprocess, and storm communicates with these subprocesses via JSON messages over stdin/stdout. The communication protocol only requires an adapter library of about 100 lines, and the storm team has written Ruby, Python and Fancy versions.

Here is the definition of SplitSentence in WordCountTopology:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence extends ShellBolt and declares that this Bolt runs under python with the argument splitsentence.py. Here is the definition of splitsentence.py:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()

For more information about defining Spout and Bolt in other languages, and creating topology in other languages, please see: Using non-JVM languages with Storm.

9. Reliable message processing

Earlier in this tutorial we skipped over a few aspects of tuples. Those aspects make up storm's reliability API: how storm guarantees that every tuple emitted by a spout is fully processed. See "How storm ensures messages are not lost" for more about storm's reliability API.

Storm allows the user to attach a MessageId, which can be any Object, when the Spout emits a new source Tuple. Multiple source Tuples can share one MessageId, meaning the user treats those source Tuples as a single message unit. Storm's reliability means that Storm tells the user, within a specified time, whether each message unit has been fully processed. Fully processed means that the source Tuples bound to that MessageId, and every Tuple derived from them, have been processed by every Bolt in the Topology they should reach.

The ack mechanism works as follows, for every message sent by the spout:

If the spout receives an ack response from the Acker within the specified time, the tuple is considered successfully processed by the downstream bolts.

If no ack response from the Acker arrives within the specified time, a fail action is triggered and the tuple is considered to have failed.

If a fail response from the Acker is received, the tuple is likewise considered to have failed and the fail action is triggered.

In addition, the ack mechanism is often used for flow control: to keep a spout from emitting data faster than the bolts can process it, we set a pending count. When the number of tuples for which the spout has received neither ack nor fail reaches the pending count, nextTuple is skipped, throttling the spout's output.

Set the max spout pending count via conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);.
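The throttling behaviour of the pending count can be sketched in plain Python (illustrative only; `ThrottledSpout` is a hypothetical name, not Storm API):

```python
class ThrottledSpout:
    # Sketch of TOPOLOGY_MAX_SPOUT_PENDING: nextTuple is skipped while
    # too many emitted tuples are still waiting for ack/fail.
    def __init__(self, max_pending):
        self.max_pending = max_pending
        self.pending = set()

    def next_tuple(self, msg_id):
        if len(self.pending) >= self.max_pending:
            return False  # back off: too many un-acked tuples in flight
        self.pending.add(msg_id)
        return True       # emitted

    def ack(self, msg_id):
        self.pending.discard(msg_id)

    def fail(self, msg_id):
        self.pending.discard(msg_id)  # a failed tuple also frees a slot
```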

Suppose message 1 in the Spout is bound to Tuple1 and Tuple2, which are processed by bolt1 and bolt2 respectively; each produces a new Tuple, and both flow to bolt3. Only when bolt3 finishes is message 1 considered fully processed.

Each Topology in Storm contains an Acker component, whose job is to track the processing of every Tuple in the Tuple tree bound to each MessageId flowing out of the Spout. If the tree is not fully processed within the user-set maximum timeout, the Acker tells the Spout that the message failed; otherwise it tells the Spout that the message was processed successfully.

So how does Acker record the processing results of Tuple?

A xor A = 0.

A xor B ... xor B xor A = 0, where every operand appears exactly twice.

In Spout, the Storm system generates a corresponding 64-bit integer for the user-specified MessageId, which serves as the RootId for the entire Tuple Tree. The RootId is passed to the Acker and subsequent Bolt as the unique identity of the message unit. At the same time, whether Spout or Bolt, each time a new Tuple is generated, the Tuple is given a unique 64-bit integer Id.

When Spout finishes transmitting the source Tuple corresponding to a MessageId, it tells Acker about the RootId it emits and the Id of those source Tuple generated. When Bolt finishes processing an input Tuple and generates a new Tuple, it also tells Acker the Id of the input Tuple it processes and the Id of the newly generated Tuple. Acker only needs to perform XOR operations on these Id to determine whether the message unit corresponding to the RootId has been successfully processed.
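The XOR bookkeeping described above can be sketched in plain Python (an illustrative model with hypothetical names, not Storm's actual Acker code):

```python
class Acker:
    # Every tuple id is XORed into the ledger twice: once when the tuple
    # is created, once when it is acked. A fully processed tuple tree
    # therefore collapses its ledger entry back to zero.
    def __init__(self):
        self.ledger = {}  # root_id -> running XOR of tuple ids

    def announce(self, root_id, acked_id, new_ids):
        value = self.ledger.get(root_id, 0) ^ acked_id
        for tuple_id in new_ids:
            value ^= tuple_id
        self.ledger[root_id] = value

    def fully_processed(self, root_id):
        return self.ledger.get(root_id, 0) == 0
```

Usage: when the Spout emits source tuple t1 it announces (root, 0, [t1]); when a Bolt consumes t1 and emits t2 it announces (root, t1, [t2]); when the final Bolt consumes t2 emitting nothing it announces (root, t2, []), and the ledger entry returns to zero.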

How to use the Ack mechanism

Have the spout pass a msgId when emitting data.

Set the number of ackers to at least 1: Config.setNumAckers(conf, ackerParal).

When a bolt finishes processing a tuple, call OutputCollector.ack(tuple); on failure, call OutputCollector.fail(tuple). It is recommended to use IBasicBolt, because IBasicBolt calls OutputCollector.ack(tuple) automatically; if processing fails, throw a FailedException and OutputCollector.fail(tuple) will be executed automatically.

How to turn off the Ack mechanism

There are two ways

Spout sends data without msgid.

Set the number of ackers to 0.

10. Stand-alone installation guide

Environment: centos 6.4

For installation steps, please refer to: http://blog.sina.com.cn/s/blog_546abd9f0101cce8.html

Note that running WordCount in local mode above does not actually use the tools we installed; it only tests the demo in storm's simulated environment. So how do we run a program on the stand-alone environment we just built?

Quite simply, the official example:

Note that in the official example, if the WordCountTopology class is run without arguments, it executes in local mode, that is, the simulated environment mentioned earlier. With an argument, the jar is submitted to storm for execution.

First of all, get the environment right:

Start zookeeper:

For the stand-alone version, just run /usr/local/zookeeper/bin/zkServer.sh start directly; no configuration changes are needed. A cluster setup requires modifying zoo.cfg, which will be covered in another article.

Configure storm:

The file is at /usr/local/storm/conf/storm.yaml

Content:

storm.zookeeper.servers:
    - "127.0.0.1"
storm.zookeeper.port: 2181
nimbus.host: "127.0.0.1"
storm.local.dir: "/tmp/storm"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

This configuration file is whitespace-sensitive: be sure to indent each list item with a space and follow every colon with a space, otherwise storm will not recognize the configuration file.

To explain: storm.local.dir is the local directory storm uses for its own state. nimbus.host indicates which machine is the master (nimbus). storm.zookeeper.servers lists which machines are zookeeper servers. storm.zookeeper.port is the zookeeper port; it must match the port zookeeper itself is configured with, or there will be communication errors. Keep that in mind. supervisor.slots.ports defines the slots of a supervisor node, that is, how many worker processes it can run at most (each spout or bolt starts only one worker by default, but this can be raised via the conf).

Execute:

# bin/storm nimbus (start the master node)

# bin/storm supervisor (launch slave node)

Execute the command: # storm jar StormStarter.jar storm.starter.WordCountTopology test

This command submits the jar to storm for execution; the trailing test is the name of the topology being defined.

Once submitted, the task is sent to storm to run. You can also start the web UI with the command:

# bin/storm ui

Then execute jps and you will see three processes: zookeeper, nimbus, and supervisor

With the UI started, you can check the running status in a browser at ip:8080/.

After configuration, execute storm jar sm.jar main.java.TopologyMain words.txt

Maybe it will say: java.lang.NoClassDefFoundError: clojure.core.protocols$seq_reduce

This is because I was using Oracle JDK 1.7; with OpenJDK 1.6 it works normally:

su -c "yum install java-1.6.0-openjdk-devel"

Specific reference: https://github.com/technomancy/leiningen/issues/676

Test the code:

Https://github.com/storm-book/examples-ch02-getting_started

Running result:

storm jar sm.jar main.java.TopologyMain words.txt
...
6020 [main] INFO backtype.storm.messaging.loader - Shutdown receiving-thread: [Getting-Started-Toplogie-1-1374946750, 4]
6020 [main] INFO backtype.storm.daemon.worker - Shut down receive thread
6020 [main] INFO backtype.storm.daemon.worker - Terminating zmq context
6020 [main] INFO backtype.storm.daemon.worker - Shutting down executors
6021 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-counter:[2 2]
...
-- Word Counter [word-counter-2] --
really: 1
but: 1
application: 1
is: 2
great: 2
are: 1
test: 1
simple: 1
an: 1
powerfull: 1
storm: 3
very: 1
...
6044 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-normalizer:[3 3]
6056 [main] INFO backtype.storm.daemon.executor - Shutting down executor word-reader:[4 4]

The above is the full content of "How to realize stand-alone installation of storm". Thank you for reading! I hope the shared content has helped you; if you want to learn more, welcome to follow the industry information channel!
