
Hadoop Storm Basics

Date: 2014-12-05 17:55  Source: www.it.net.cn  Author: IT

I. Basic Storm Concepts

Before running a Storm job, you need to understand the following concepts:

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers
  9. Configuration

On the surface, a Storm cluster looks much like a Hadoop cluster. However, Hadoop runs MapReduce jobs while Storm runs topologies, and the two are very different. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (unless you kill it manually).

A Storm cluster has two kinds of nodes: the master node and worker nodes. The master node runs a daemon called Nimbus, whose role is similar to Hadoop's JobTracker. Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring the cluster's state.

Each worker node runs a daemon called the Supervisor. The Supervisor listens for work assigned to its machine and starts and stops worker processes as needed. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. In addition, the Nimbus and Supervisor daemons are fail-fast and stateless: all state is kept either in ZooKeeper or on local disk. This means you can kill Nimbus and the Supervisors with kill -9 and restart them as if nothing had happened. This design makes Storm exceptionally stable.

1. Topologies

A topology is a graph of spouts and bolts, with the spouts and bolts connected by stream groupings, as shown in the figure below:

A topology keeps running until you kill it manually. Storm automatically reassigns tasks that fail and, if reliability is enabled, guarantees that no data is lost. If a machine goes down unexpectedly, all the tasks running on it are moved to other machines.

Running a topology is simple. First, package all of your code and its dependencies into a single jar. Then run a command like the following:

```shell
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
```

This command runs the main class backtype.storm.MyTopology with the arguments arg1 and arg2. The class's main function defines the topology and submits it to Nimbus. The storm jar command takes care of connecting to Nimbus and uploading the jar.

Because a topology definition is a Thrift structure and Nimbus is a Thrift service, you can create and submit topologies in any language. The approach above is the simplest way to do it from a JVM-based language.

2. Streams

The stream is the core abstraction in Storm. A stream is an unbounded sequence of tuples that are created and processed in parallel in a distributed fashion. A stream is defined by naming the fields of its tuples. By default, tuple fields can be integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. You can also use custom types, as long as you implement a serializer for them.

Every stream is given an id when it is declared. Because spouts and bolts that emit a single stream are so common, OutputFieldsDeclarer provides convenience methods for declaring a stream without specifying an id; such a stream is given the default id "default".
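To make the schema idea concrete, here is a small plain-Java model of a component declaring its output streams. It is a sketch under our own assumptions, not Storm's API: the class StreamSchemas and its methods are hypothetical stand-ins that only mirror the roles of OutputFieldsDeclarer's declare/declareStream and of looking up a tuple value by field name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model (NOT the Storm API): each stream id maps to an
 * ordered list of field names; declaring without an id uses "default".
 */
class StreamSchemas {
    static final String DEFAULT_STREAM = "default";

    private final Map<String, List<String>> schemas = new HashMap<>();

    /** Declare the default stream (the role of declare(new Fields(...))). */
    void declare(String... fields) {
        declareStream(DEFAULT_STREAM, fields);
    }

    /** Declare a named stream (the role of declareStream(id, fields)). */
    void declareStream(String streamId, String... fields) {
        schemas.put(streamId, Collections.unmodifiableList(new ArrayList<>(Arrays.asList(fields))));
    }

    /** Values are positional; the schema maps a field name to its position. */
    Object valueByField(String streamId, List<Object> tuple, String field) {
        List<String> fields = schemas.get(streamId);
        if (fields == null) {
            throw new IllegalArgumentException("undeclared stream: " + streamId);
        }
        int index = fields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("no field " + field + " on stream " + streamId);
        }
        return tuple.get(index);
    }
}
```

For example, after declare("word", "count"), the tuple ("storm", 3) on the stream "default" yields 3 for the field "count".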

Spouts and bolts are the most basic primitives Storm provides for processing streams. You implement the interfaces they define to carry out your business logic.

3. Spouts

A spout is the source of streams in a topology. Generally, a spout reads data from an external source and emits it into the topology as tuples. Spouts can be reliable or unreliable: if a tuple fails to be processed by Storm, a reliable spout can replay it, whereas an unreliable spout forgets a tuple as soon as it has emitted it.

A spout can emit more than one stream. Declare the streams with OutputFieldsDeclarer.declareStream, then specify which stream to emit to when emitting through SpoutOutputCollector.

The most important method on a spout is nextTuple. It either emits a new tuple into the topology or simply returns if there is no new tuple to emit. Note that nextTuple must not block, because Storm calls all of a spout's methods on the same thread.

The other two important spout methods are ack and fail. Storm calls ack when it detects that a tuple emitted by the spout has been fully processed by the topology, and fail otherwise. Storm calls ack and fail only for reliable spouts.
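The spout contract can be sketched in plain Java. The class below is hypothetical and much simpler than Storm's ISpout: it assumes an in-memory queue in place of an external source and a callback in place of SpoutOutputCollector, but it shows the three behaviors described above: nextTuple returns immediately when there is nothing to emit, ack forgets a message, and fail schedules it for replay.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical toy reliable spout (NOT Storm's ISpout). */
class ToyReliableSpout {
    private final Deque<String> source = new ArrayDeque<>();   // stands in for an external queue
    private final Map<Long, String> pending = new HashMap<>(); // emitted but not yet acked
    private long nextMsgId = 0;

    void offer(String message) {
        source.addLast(message);
    }

    /** Emit at most one message, tagged with a message id; never block. */
    void nextTuple(BiConsumer<Long, String> emit) {
        String message = source.pollFirst();
        if (message == null) {
            return; // nothing to emit: return immediately instead of waiting
        }
        long msgId = nextMsgId++;
        pending.put(msgId, message);
        emit.accept(msgId, message);
    }

    /** The tuple tree rooted at msgId was fully processed: forget it. */
    void ack(long msgId) {
        pending.remove(msgId);
    }

    /** Processing failed or timed out: put the message back for replay. */
    void fail(long msgId) {
        String message = pending.remove(msgId);
        if (message != null) {
            source.addFirst(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Keeping unacked messages in a pending map keyed by message id is what makes replay possible; an unreliable spout would simply skip that bookkeeping.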

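4. Bolts

All the processing logic in a topology is encapsulated in bolts. Bolts can do many things: filtering, aggregation, database queries, and so on.

A bolt can perform a simple stream transformation. Complex stream transformations often require multiple steps, and therefore multiple bolts. For example, finding the most-reposted images in a stream of images takes at least two steps: first, count the reposts of each image; second, find the 10 images with the most reposts. (Making this process more scalable may require even more steps.)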
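The two steps can be sketched as two plain-Java functions, one per would-be bolt. This is an illustration under our own assumptions: the input is taken to be a list of reposted image ids, the class name TopReposts is hypothetical, and in a real topology step 1 would typically run as many parallel tasks (partitioned by image id) rather than as a single method call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java version of the two-step "most reposted images" job. */
class TopReposts {
    /** Step 1 (first bolt): count reposts per image id. */
    static Map<String, Integer> countReposts(List<String> repostedImageIds) {
        Map<String, Integer> counts = new HashMap<>();
        for (String id : repostedImageIds) {
            counts.merge(id, 1, Integer::sum);
        }
        return counts;
    }

    /** Step 2 (second bolt): the n image ids with the highest counts. */
    static List<String> topN(Map<String, Integer> counts, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Highest count first; ties broken by id so the result is deterministic.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }
}
```

Splitting the work this way mirrors the text: step 1 can be parallelized freely, while step 2 needs to see all the partial counts.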

Bolts can emit more than one stream. Declare the streams with OutputFieldsDeclarer.declareStream, and select the stream to emit to with OutputCollector.emit.

The main method of a bolt is execute, which takes a tuple as input. A bolt emits tuples through an OutputCollector, and it must call the OutputCollector's ack method for every tuple it processes, so that Storm knows the tuple has been handled and can in turn notify the spout that originally emitted it. The usual flow is: the bolt processes an input tuple, emits zero or more tuples, and then calls ack to tell Storm it has finished with the input. Storm provides an IBasicBolt interface that calls ack automatically.
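The execute/emit/ack cycle, and the automatic ack that IBasicBolt provides, can be modeled in a few lines. The types below (BoltFlow, RecordingCollector, BasicBody, runWithAutoAck) are hypothetical simplifications, not Storm's OutputCollector or IBasicBolt; tuples are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the bolt contract (NOT Storm's classes). */
class BoltFlow {
    /** Records what a bolt emitted and which inputs it acked. */
    static class RecordingCollector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();

        void emit(String value) { emitted.add(value); }
        void ack(String input) { acked.add(input); }
    }

    /** A bolt body that processes one input and emits zero or more tuples. */
    interface BasicBody {
        void execute(String input, RecordingCollector collector);
    }

    /** Runs the body, then acks the input automatically (the IBasicBolt convenience). */
    static void runWithAutoAck(BasicBody body, String input, RecordingCollector collector) {
        body.execute(input, collector);
        collector.ack(input);
    }
}
```

With this wrapper the bolt author writes only the processing step; forgetting to ack, which would eventually cause a timeout and a replay, is no longer possible.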

5. Stream groupings

Part of defining a topology is specifying which streams each bolt receives as input. A stream grouping defines how a stream's tuples are distributed among the bolt's tasks.

Storm has seven built-in types of stream grouping:

  1. Shuffle Grouping: tuples are distributed randomly across the bolt's tasks, so that each task receives roughly the same number of tuples.
  2. Fields Grouping: the stream is partitioned by the specified fields. For example, if the stream is grouped by userid, tuples with the same userid always go to the same task of the bolt, while tuples with different userids may go to different tasks.
  3. All Grouping: the stream is broadcast; every task of the bolt receives a copy of each tuple.
  4. Global Grouping: the entire stream goes to a single task of the bolt, specifically the task with the lowest id.
  5. None Grouping: this grouping means you don't care which task receives the stream's tuples. Currently it behaves exactly like shuffle grouping; the intended difference is that Storm may eventually run a bolt with a none grouping in the same thread as the bolt or spout it subscribes to, where possible.
  6. Direct Grouping: a special grouping in which the producer of a tuple decides which task of the consumer will process it. A direct grouping can only be declared on a stream that has been declared as a direct stream, and tuples on such a stream must be emitted with the emitDirect methods. A component can get the task ids of its consumers from the TopologyContext (OutputCollector.emit also returns the ids of the tasks a tuple was sent to).
  7. Local or shuffle grouping: if the target bolt has one or more tasks in the same worker process, tuples are shuffled only among those in-process tasks. Otherwise, it behaves like a normal shuffle grouping.
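The following sketch shows how a tuple's destination task could be chosen under five of these groupings. It is a simplified plain-Java model with hypothetical names, not Storm's internal implementation; in particular, fields grouping is shown as "hash of the grouping value modulo the number of tasks", which captures the idea but may not match Storm's exact hash function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical task selectors illustrating several stream groupings. */
class Groupings {
    /** Shuffle: any task, chosen at random. */
    static int shuffle(List<Integer> taskIds, Random random) {
        return taskIds.get(random.nextInt(taskIds.size()));
    }

    /** Fields: equal grouping values always map to the same task. */
    static int fields(List<Integer> taskIds, Object groupingValue) {
        int index = Math.floorMod(groupingValue.hashCode(), taskIds.size());
        return taskIds.get(index);
    }

    /** All: every task receives a copy. */
    static List<Integer> all(List<Integer> taskIds) {
        return Collections.unmodifiableList(new ArrayList<>(taskIds));
    }

    /** Global: the whole stream goes to the task with the lowest id. */
    static int global(List<Integer> taskIds) {
        return Collections.min(taskIds);
    }

    /** Local or shuffle: prefer tasks in this worker, else shuffle over all tasks. */
    static int localOrShuffle(List<Integer> taskIds, List<Integer> tasksInThisWorker, Random random) {
        List<Integer> local = new ArrayList<>(taskIds);
        local.retainAll(tasksInThisWorker);
        return shuffle(local.isEmpty() ? taskIds : local, random);
    }
}
```

The fields selector depends only on the value being grouped, which is exactly the property that lets a per-task counter see every occurrence of a given key.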

6. Reliability

Storm guarantees that every spout tuple will be fully processed by the topology. It does this by tracking the tree of tuples triggered by each spout tuple (a bolt that processes a tuple may emit further tuples, which forms a tree) and determining when that tree has been completely processed. Every topology has a message timeout: if Storm does not detect within that timeout that a spout tuple's tree has been fully processed, it marks the tuple as failed and replays it later.

To take advantage of Storm's reliability, you must tell Storm whenever you emit a new tuple and whenever you finish processing a tuple. Both are done through the OutputCollector: the emit method tells Storm that a new tuple has been created, and the ack method tells Storm that a tuple has been fully processed.
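Storm's documentation on guaranteeing message processing describes how the tuple tree is tracked cheaply: an acker keeps a single 64-bit value per spout tuple and XORs in each tuple id once when the tuple is created and once when it is acked, so the value returns to zero when the whole tree is done (with overwhelming probability, since the ids are random). The class below is a toy, single-process version of that idea with hypothetical method names; it ignores timeouts and the fact that real anchors and acks reach the acker as messages.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical toy acker using XOR bookkeeping (NOT Storm's acker bolt). */
class ToyAcker {
    private final Map<Long, Long> ackVals = new HashMap<>(); // spout tuple id -> ack val
    private final Random random = new Random();

    /** The spout emits a root tuple; its id is XORed in (ack val = id). */
    long emitRoot() {
        long rootId = random.nextLong();
        ackVals.put(rootId, rootId);
        return rootId;
    }

    /** A bolt emits a new tuple anchored to the tree of rootId. */
    long emitChild(long rootId) {
        long childId = random.nextLong();
        ackVals.merge(rootId, childId, (a, b) -> a ^ b);
        return childId;
    }

    /** Some tuple in the tree of rootId was acked: XOR its id in a second time. */
    void ack(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    /** Every id has been XORed in twice, so the value is back to zero. */
    boolean isComplete(long rootId) {
        return ackVals.get(rootId) == 0L;
    }
}
```

Note the ordering this implies: a bolt should anchor its new tuples before (or together with) acking its input, otherwise the value could reach zero while children are still in flight.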

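We will cover Storm's reliability in depth in Chapter 4.

7. Tasks

Each spout or bolt executes as many tasks across the cluster. Each executor corresponds to one thread, and one or more tasks run on that thread. Stream groupings define how tuples are sent from one set of tasks to another. You set each component's parallelism with the setSpout and setBolt methods of TopologyBuilder (in Storm 0.8 and later this parallelism hint is the number of executors, and unless configured otherwise the number of tasks equals it).

8. Workers

A topology executes in one or more workers (worker processes). Each worker is a physical JVM and executes a subset of the topology. For example, for a topology with a total parallelism of 300 running on 50 worker processes, each worker process handles 6 tasks. Storm tries to spread the work evenly across all the workers.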
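The 300-tasks-over-50-workers arithmetic can be checked with a trivial round-robin placement. This is only an illustration: TaskPlacement is a hypothetical name, and Storm's actual scheduler assigns executors to worker slots with its own logic, though it likewise aims for an even spread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical round-robin placement of tasks onto workers (not Storm's scheduler). */
class TaskPlacement {
    /** Returns, for each worker, the list of task ids assigned to it. */
    static List<List<Integer>> roundRobin(int numTasks, int numWorkers) {
        List<List<Integer>> workers = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            workers.add(new ArrayList<>());
        }
        // Deal tasks out like cards: task i goes to worker i mod numWorkers.
        for (int task = 0; task < numTasks; task++) {
            workers.get(task % numWorkers).add(task);
        }
        return workers;
    }
}
```

With 300 tasks and 50 workers every worker gets exactly 6 tasks; when the division is not exact (say 7 tasks over 3 workers), worker sizes differ by at most one.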

9. Configuration

Storm has many parameters for tuning the behavior of Nimbus, the Supervisors, and running topologies. Some are system-level settings and some are topology-level settings. defaults.yaml contains all the default values; you can override them by placing a storm.yaml on your classpath. You can also set topology-specific configuration in code when submitting a topology with StormSubmitter.
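The override order described above (defaults first, then storm.yaml, then topology-specific settings) amounts to a key-by-key merge in which later layers win. The sketch below models only that precedence rule; it does not read YAML files, and the class name LayeredConfig is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of layered configuration: later layers override earlier ones. */
class LayeredConfig {
    @SafeVarargs
    static Map<String, Object> merge(Map<String, Object>... layers) {
        Map<String, Object> effective = new HashMap<>();
        for (Map<String, Object> layer : layers) {
            effective.putAll(layer); // a key in a later layer replaces the earlier value
        }
        return effective;
    }
}
```

For example, merging a default of topology.workers = 1 with a storm.yaml value of 4 yields 4, while a key set only in the defaults keeps its default value unless the topology's own configuration overrides it.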

 


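II. Building a Topology

1. Goal

We will design a topology that counts how often each word appears in a stream of sentences. It is a simple example, meant to get you started quickly and give you a first understanding of topologies.

2. Designing the topology structure

The first step in developing a Storm project is designing the topology, that is, settling on your data-processing logic. The example we cover today is simple, and so is its topology. The whole topology looks like this:

The topology has three parts:

  • KestrelSpout: the data source, which emits sentences
  • SplitSentence: splits each sentence into words
  • WordCount: accumulates the count of each word

3. Designing the data flow

This topology reads sentences from a Kestrel queue, splits each sentence into words, and then tallies how many times each word has appeared. The spout emits one tuple per sentence, the split bolt emits one tuple per word, and the count bolt emits each word together with its running count. The flow looks roughly like this: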

 

4. Implementation

1) Setting up the Maven environment

To develop Storm topologies, you need the Storm jars on your classpath: either add all the relevant jars manually, or let Maven manage the dependencies. Storm's jars are published to Clojars (a Maven repository). If you use Maven, add the following configuration to your project's pom.xml:


		
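```xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.5.3</version>
    <!-- needed to compile the topology; the cluster supplies Storm at runtime -->
    <scope>provided</scope>
  </dependency>
</dependencies>
```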

2) Defining the topology


		
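```java
TopologyBuilder builder = new TopologyBuilder();
// Component 1: spout reading sentences from the Kestrel queue "sentence_queue"
builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",
                                     22133,
                                     "sentence_queue",
                                     new StringScheme()));
// Component 2: SplitSentence with parallelism 10, fed by component 1 (shuffle grouping)
builder.setBolt(2, new SplitSentence(), 10)
       .shuffleGrouping(1);
// Component 3: WordCount with parallelism 20, fed by component 2, partitioned on "word"
builder.setBolt(3, new WordCount(), 20)
       .fieldsGrouping(2, new Fields("word"));
```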

This topology's spout reads sentences from the queue sentence_queue on the Kestrel server at kestrel.backtype.com, port 22133.

The spout is added to the topology with the setSpout method, under a unique id. Every node in the topology must be given an id, which other bolts use to subscribe to that node's output streams. The KestrelSpout has id 1 in this topology.

setBolt adds bolts to the topology. The first bolt defined in this topology splits sentences: it transforms the stream of sentences into a stream of words.

Let's look at the implementation of SplitSentence:


		
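```java
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class SplitSentence implements IBasicBolt {
    public void prepare(Map conf, TopologyContext context) {
    }

    // Split the incoming sentence on spaces and emit one tuple per word
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup() {
    }

    // Output tuples have a single field named "word"
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```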

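The key method is execute. As you can see, it splits the sentence into words and emits each word as a new tuple. The other important method is declareOutputFields, which declares the schema of the bolt's output tuples. Here it declares that the bolt emits tuples with a single field named "word".

The last parameter of setBolt is the parallelism you want for the bolt. The SplitSentence bolt has a parallelism of 10, so it runs as 10 threads executing in parallel across the Storm cluster. When your topology hits a bottleneck, all you have to do is increase the parallelism of the bolt that is the bottleneck.

setBolt returns an object used to declare the bolt's inputs. Here, the SplitSentence bolt subscribes to the output stream of component 1 with a shuffle grouping, where 1 refers to the KestrelSpout defined earlier. Shuffle grouping is described in the Stream groupings section above; for now, what matters is that the SplitSentence bolt consumes every tuple emitted by the KestrelSpout.

Now let's look at the implementation of WordCount: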


		
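```java
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

public class WordCount implements IBasicBolt {
    // In-memory word -> count state, local to this task
    private Map<String, Integer> _counts = new HashMap<String, Integer>();

    public void prepare(Map conf, TopologyContext context) {
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        int count;
        if (_counts.containsKey(word)) {
            count = _counts.get(word);
        } else {
            count = 0;
        }
        count++;
        _counts.put(word, count);
        // Emit the word together with its updated running count
        collector.emit(new Values(word, count));
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```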

SplitSentence emits a new tuple for each word in the sentence. WordCount maintains an in-memory mapping from word to count; every time WordCount receives a word, it updates that in-memory state and emits the word with its new count.
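To see the whole data flow in one place without a cluster, the sketch below runs it in a single process: one "tuple" per sentence, one per word, and a fields grouping on the word that routes every occurrence of a word to the same counting task. It is a hypothetical plain-Java simulation (WordCountFlow is not a Storm class), and the hash-modulo routing is an assumption standing in for Storm's fields grouping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-process simulation of the word-count topology's data flow. */
class WordCountFlow {
    /** Returns the word -> count state held by each simulated WordCount task. */
    static List<Map<String, Integer>> run(List<String> sentences, int counterTasks) {
        List<Map<String, Integer>> taskState = new ArrayList<>();
        for (int i = 0; i < counterTasks; i++) {
            taskState.add(new HashMap<>());
        }
        for (String sentence : sentences) {               // spout: one tuple per sentence
            for (String word : sentence.split(" ")) {     // SplitSentence: one tuple per word
                int task = Math.floorMod(word.hashCode(), counterTasks); // fields grouping on "word"
                taskState.get(task).merge(word, 1, Integer::sum);        // WordCount task updates its map
            }
        }
        return taskState;
    }
}
```

Because all occurrences of a word land on the same task, each task's map holds the complete count for its words; this is why WordCount is wired to SplitSentence with fieldsGrouping on "word" rather than shuffleGrouping.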

5. Running the topology

Storm runs in two modes: local mode and distributed mode.

1) Local mode

In local mode, Storm simulates all the spouts and bolts with threads inside a single process. Local mode is useful for development and testing. When you run the topologies in storm-starter, they run in local mode, and you can see which messages each component in the topology emits.

2) Distributed mode

A Storm cluster consists of a set of machines. When you submit a topology to the master, you also submit the topology's code. The master distributes your code and allocates worker processes to your topology. If a worker process dies, the master reassigns its work to other nodes.

3) Here is the code that runs the topology in local mode:


		
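```java
Config conf = new Config();
conf.setDebug(true);      // TOPOLOGY_DEBUG: log every emitted tuple
conf.setNumWorkers(2);    // TOPOLOGY_WORKERS: worker processes to allocate

// Simulated in-process cluster for development and testing
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);       // let the topology run for 10 seconds
cluster.killTopology("test");
cluster.shutdown();
```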

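First, the code creates an in-process cluster by instantiating a LocalCluster object. Submitting a topology to this simulated cluster works exactly like submitting it to a distributed cluster: you call submitTopology, which takes three arguments: the name of the topology, a configuration object, and the topology itself.

The name uniquely identifies the topology, so that you can later use it to kill the topology. As mentioned earlier, you must kill a topology explicitly; otherwise it will run forever.

The Config object can configure many things; these two are the most common:

TOPOLOGY_WORKERS (setNumWorkers) sets how many worker processes you want the cluster to allocate for executing the topology. Each component in the topology executes as some number of threads, and the number of threads per component is set with setBolt and setSpout. Those threads run inside the worker processes, and each worker process contains threads belonging to some of the components. For example, if you specify 300 threads across all components and 50 worker processes, each worker process executes 6 threads, which may belong to different components (spouts and bolts). You tune a topology's performance by adjusting the parallelism of each component and the number of worker processes those threads run in.

TOPOLOGY_DEBUG (setDebug): when set to true, Storm logs every message emitted by every component. This is useful when debugging a topology locally, but enabling it in production hurts performance.

Conclusion

This chapter started from the definitions of Storm's basic objects, gave a broad introduction to the Storm development environment, and used a simple example to walk through building and defining a topology. We hope it has given you a basic understanding of Storm's concepts and that you can now build a simple topology of your own!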

 

(Editor: IT)