实时流计算系统设计与实现
上QQ阅读APP看书,第一时间看更新

2.2 使用Sprin Boot实现数据采集服务器

说到REST风格Web服务器开发,大部分Java编程开发者首先想到的是Spring系列中的Spring Boot。毫无疑问,Spring Boot使得用Java做Web服务开发的体验相比过去有了极大的提升。几乎在数分钟之内,一个可用的Web服务就可以开发完毕。所以,我们也用Spring Boot来实现数据采集服务器,具体实现如下。

@Controller
@EnableAutoConfiguration
public class SpringDataCollector {
    private static final Logger logger = LoggerFactory.getLogger
(SpringDataCollector.class);
    private JSONObject doExtractCleanTransform(JSONObject event) {
        // TODO: 实现抽取、清洗、转化的具体逻辑
        return event;
    }
    private final String kafkaBroker = "127.0.0.1:9092";
    private final String topic = "collector_event";
    private final KafkaSender kafkaSender = new KafkaSender(kafkaBroker);
    @PostMapping(path = "/event", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    @ResponseBody()
    public String uploadEvent(@RequestBody byte[] body) {
        // step1: 对消息进行解码
        JSONObject bodyJson = JSONObject.parseObject(new String(body, Charsets.UTF_8));
        // step2: 对消息进行抽取、清洗、转化
        JSONObject normEvent = doExtractCleanTransform(bodyJson);
        // step3: 将格式规整化的消息发到消息中间件Kafka
        kafkaSender.send(topic, normEvent.toJSONString().getBytes(Charsets.UTF_8));
        
        // 通知客户端数据采集成功
        return RestHelper.genResponse(200, "ok").toJSONString();
    }
    public static void main(String[] args) throws Exception {
        SpringApplication.run(SpringDataCollector.class, args);
    }
}

注意:为了节省篇幅,本书中的样例代码均只保留了主要逻辑以阐述问题,大部分略去了异常处理和日志打印。如需将这些代码用于真实产品环境,则需要读者自行添加异常处理和日志打印相关内容。异常处理和日志打印是可靠软件的重要因素,在编程开发时务必重视这两点。

在上面的示例代码中,uploadEvent实现了事件上报接口。收到上报事件后,首先对数据进行解码,解码结果用FastJson中的通用JSON类JSONObject表示;然后在JSONObject对象基础上进行抽取、清洗和转化,规整为统一格式数据;最后将规整好的数据发往数据传输系统Kafka。这个程序在实现功能上并没有特别的地方,我们只是感觉到基于Spring Boot的服务开发体验是如此轻松、愉快。