Twitter 上的 Apache Storm


在本章中,我们将讨论 Apache Storm 的实时应用程序。我们将看到 Storm 在 Twitter 中是如何使用的。

Twitter


Twitter 是一种在线社交网络服务,它提供了一个发送和接收用户推文的平台。注册用户可以阅读和发布推文,但未注册用户只能阅读推文。 Hashtag 用于通过在相关关键字前附加 # 来按关键字对推文进行分类。现在让我们采取一个实时场景来查找每个主题最常用的主题标签。

喷口创建

spout 的目的是尽快获取人们提交的推文。 Twitter 提供“Twitter Streaming API”,这是一个基于 Web 服务的工具,用于实时检索人们提交的推文。 Twitter Streaming API 可以用任何编程语言访问。

推特4j 是一个开源的、非官方的 Java 库,它提供了一个基于 Java 的模块来轻松访问 Twitter 流 API。 推特4j 提供基于侦听器的框架来访问推文。要访问 Twitter 流 API,我们需要登录 Twitter 开发者帐户,并且应该获得以下 OAuth 身份验证详细信息。

  • 客户密钥
  • 客户秘密
  • 访问令牌
  • AccessTookenSecret

Storm 提供了一个 twitter spout, TwitterSampleSpout, 在其入门套件中。我们将使用它来检索推文。 spout 需要 OAuth 身份验证详细信息和至少一个关键字。 spout 将根据关键字发出实时推文。完整的程序代码如下。

编码:TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.Spout输出Collector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.输出FieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
    Spout输出Collector _collector;
    LinkedBlockingQueue<Status> queue = null;
    TwitterStream _twitterStream;
		
    String consumerKey;
    String consumerSecret;
    String accessToken;
    String accessTokenSecret;
    String[] keyWords;
		
    public TwitterSampleSpout(String consumerKey, String consumerSecret,
        String accessToken, String accessTokenSecret, String[] keyWords) {
            this.consumerKey = consumerKey;
            this.consumerSecret = consumerSecret;
            this.accessToken = accessToken;
            this.accessTokenSecret = accessTokenSecret;
            this.keyWords = keyWords;
    }
		
    public TwitterSampleSpout() {
        // TODO 自动生成的构造函数存根
    }
		
    @Override
    public void open(Map conf, TopologyContext context,
        Spout输出Collector collector) {
            queue = new LinkedBlockingQueue<Status>(1000);
            _collector = collector;
            StatusListener listener = new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    queue.offer(status);
                }
					
                @Override
                public void onDeletionNotice(StatusDeletionNotice sdn) {}
					
                @Override
                public void onTrackLimitationNotice(int i) {}
					
                @Override
                public void onScrubGeo(long l, long l1) {}
					
                @Override
                public void onException(Exception ex) {}
					
                @Override
                public void onStallWarning(StallWarning arg0) {
                    // TODO 自动生成的方法存根
                }
            };
				
            ConfigurationBuilder cb = new ConfigurationBuilder();
				
            cb.setDebugEnabled(true)
                .setOAuthConsumerKey(consumerKey)
                .setOAuthConsumerSecret(consumerSecret)
                .setOAuthAccessToken(accessToken)
                .setOAuthAccessTokenSecret(accessTokenSecret);
					
            _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
            _twitterStream.addListener(listener);
				
            if (keyWords.length == 0) {
                _twitterStream.sample();
            }else {
                FilterQuery query = new FilterQuery().track(keyWords);
                _twitterStream.filter(query);
            }
    }
			
    @Override
    public void nextTuple() {
        Status ret = queue.poll();
				
        if (ret == null) {
            Utils.sleep(50);
        } else {
            _collector.emit(new Values(ret));
        }
    }
			
    @Override
    public void close() {
        _twitterStream.shutdown();
    }
			
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config ret = new Config();
        ret.setMaxTaskParallelism(1);
        return ret;
    }
			
    @Override
    public void ack(Object id) {}
			
    @Override
    public void fail(Object id) {}
			
    @Override
    public void declare输出Fields(输出FieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}

标签阅读器螺栓


spout 发出的推文将被转发到 HashtagReaderBolt ,它将处理推文并发出所有可用的主题标签。 HashtagReaderBolt 使用 获取哈希标签实体 twitter4j 提供的方法。 getHashTagEntities 读取推文并返回主题标签列表。完整的程序代码如下:

编码:HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.输出Collector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.输出FieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
    private 输出Collector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, 输出Collector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Status tweet = (Status) tuple.getValueByField("tweet");
        for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
            System.out.println("Hashtag: " + hashtage.getText());
            this.collector.emit(new Values(hashtage.getText()));
        }
    }

    @Override
    public void cleanup() {}

    @Override
    public void declare输出Fields(输出FieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }
	
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
	
}

Hashtag 计数器螺栓


发出的主题标签将被转发到 HashtagCounterBolt .此螺栓将处理所有主题标签并使用 Java Map 对象将每个主题标签及其计数保存在内存中。完整的程序代码如下。

编码:HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.输出Collector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.输出FieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
    Map<String, Integer> counterMap;
    private 输出Collector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, 输出Collector collector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getString(0);

        if(!counterMap.containsKey(key)){
            counterMap.put(key, 1);
        }else{
            Integer c = counterMap.get(key) + 1;
            counterMap.put(key, c);
        }
		
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
            System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
        }
    }

    @Override
    public void declare输出Fields(输出FieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }
	
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
	
}

提交拓扑


提交拓扑是主要应用程序。 Twitter拓扑结构包括 TwitterSampleSpout , HashtagReaderBolt , and HashtagCounterBolt .以下程序代码显示了如何提交拓扑。

编码:TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
    public static void main(String[] args) throws Exception{
        String consumerKey = args[0];
        String consumerSecret = args[1];
		
        String accessToken = args[2];
        String accessTokenSecret = args[3];
		
        String[] arguments = args.clone();
        String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);
		
        Config config = new Config();
        config.setDebug(true);
		
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
            consumerSecret, accessToken, accessTokenSecret, keyWords));

        builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
            .shuffleGrouping("twitter-spout");

        builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
            .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));
			
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("TwitterHashtagStorm", config,
            builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

构建和运行应用程序


完整的应用程序有四个 Java 代码。它们如下:

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

你可以使用以下命令编译应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*” *.java

使用以下命令执行应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/twitter4j/lib/*”:.
TwitterHashtagStorm <customerkey> <customersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

该应用程序将打印当前可用的主题标签及其计数。输出应该类似于以下内容:

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livemusic : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1