Yahoo! 中的 Apache Storm金融


雅虎! Finance 是 Internet 上领先的商业新闻和金融数据网站。它是雅虎的一部分!并提供有关金融新闻、市场统计数据、国际市场数据和其他任何人都可以访问的金融资源的信息。

如果你是已注册的 Yahoo!用户,然后你可以自定义 Yahoo!财务利用其某些产品。雅虎! Finance API 用于从 Yahoo! 查询财务数据

此 API 显示比实时延迟 15 分钟的数据,并每 1 分钟更新一次其数据库,以访问当前的股票相关信息。现在让我们以一家公司的实时场景为例,看看当其股票价值低于 100 时如何发出警报。

喷口创建


spout 的目的是获取公司的详细信息并将价格发送到螺栓。你可以使用以下程序代码来创建一个 spout。

编码:YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

// 导入 yahoofinace 包
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.输出FieldsDeclarer;

import backtype.storm.spout.Spout输出Collector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
    private Spout输出Collector collector;
    private boolean completed = false;
    private TopologyContext context;
	
    @Override
    public void open(Map conf, TopologyContext context, Spout输出Collector collector){
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        try {
            Stock stock = YahooFinance.get("INTC");
            BigDecimal price = stock.getQuote().getPrice();

            this.collector.emit(new Values("INTC", price.doubleValue()));
            stock = YahooFinance.get("GOOGL");
            price = stock.getQuote().getPrice();

            this.collector.emit(new Values("GOOGL", price.doubleValue()));
            stock = YahooFinance.get("AAPL");
            price = stock.getQuote().getPrice();

            this.collector.emit(new Values("AAPL", price.doubleValue()));
        } catch(Exception e) {}
    }

    @Override
    public void declare输出Fields(输出FieldsDeclarer declarer) {
        declarer.declare(new Fields("company", "price"));
    }

    @Override
    public void close() {}
	
    public boolean isDistributed() {
        return false;
    }

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public void ack(Object msgId) {}

    @Override
    public void fail(Object msgId) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
	
}

螺栓创建


在这里,bolt 的目的是在价格低于 100 时处理给定公司的价格。它使用 Java Map 对象将截止价格限制警报设置为 true 当股价跌破 100 时;否则为假。完整的程序代码如下:

编码:PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.输出Collector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.输出FieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
    Map<String, Integer> cutOffMap;
    Map<String, Boolean> resultMap;
	
    private 输出Collector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, 输出Collector collector) {
        this.cutOffMap = new HashMap <String, Integer>();
        this.cutOffMap.put("INTC", 100);
        this.cutOffMap.put("AAPL", 100);
        this.cutOffMap.put("GOOGL", 100);

        this.resultMap = new HashMap<String, Boolean>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String company = tuple.getString(0);
        Double price = tuple.getDouble(1);

        if(this.cutOffMap.containsKey(company)){
            Integer cutOffPrice = this.cutOffMap.get(company);

            if(price < cutOffPrice) {
                this.resultMap.put(company, true);
            } else {
                this.resultMap.put(company, false);
            }
        }
		
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
            System.out.println(entry.getKey()+" : " + entry.getValue());
        }
    }

    @Override
    public void declare输出Fields(输出FieldsDeclarer declarer) {
        declarer.declare(new Fields("cut_off_price"));
    }
	
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
	
}

提交拓扑


这是 YahooFinanceSpout.java 和 PriceCutOffBolt.java 连接在一起并生成拓扑的主要应用程序。以下程序代码显示了如何提交拓扑。

编码:YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
    public static void main(String[] args) throws Exception{
        Config config = new Config();
        config.setDebug(true);
		
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

        builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
            .fieldsGrouping("yahoo-finance-spout", new Fields("company"));
			
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}

构建和运行应用程序


完整的应用程序包含三个 Java 代码。它们如下:

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • 雅虎金融风暴.java

可以使用以下命令构建应用程序:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*” *.java

应用程序可以使用以下命令运行:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:”/path/to/yahoofinance/lib/*”:.
YahooFinanceStorm

输出将类似于以下内容:

GOOGL : false
AAPL : false
INTC : true