在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。除了这个,还有一些其他场景的应用,比如我们在我们的后台系统实时的展示我们网站当前的pv、uv等等,其实做法都是类似的。
今天我们就做一个最简单的模拟电商统计大屏的小例子,我们抽取一下最简单的需求。
首先我们通过自定义source 模拟订单的生成,生成了一个Tuple2,第一个元素是分类,第二个元素表示这个分类下产生的订单金额,金额我们通过随机生成.
/** * 模拟生成某一个分类下的订单生成 */ public static class MySource implements SourceFunction<Tuple2<String,Double>>{
private volatile boolean isRunning = true; private Random random = new Random(); String category[] = { "女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公" }; @Override public void run(SourceContext<Tuple2<String,Double>> ctx) throws Exception{ while (isRunning){ Thread.sleep(10); //某一个分类 String c = category[(int) (Math.random() * (category.length - 1))]; //某一个分类下产生了price的成交订单 double price = random.nextDouble() * 100; ctx.collect(Tuple2.of(c, price)); } } @Override public void cancel(){ isRunning = false; } } 复制代码 public static class CategoryPojo{ // 分类名称 private String category; // 改分类总销售额 private double totalPrice; // 截止到当前时间的时间 private String dateTime; getter and setter ........ } 复制代码 DataStream<CategoryPojo> result = dataStream.keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.days( 1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds( 1))) .aggregate( new PriceAggregate(), new WindowResult() ); 复制代码 首先我们定义一个窗口期是一天的滚动窗口,然后设置一个1秒钟的触发器,之后进行聚合计算.
private static class PriceAggregate implements AggregateFunction<Tuple2<String,Double>,Double,Double>{ @Override public Double createAccumulator(){ return 0D; } @Override public Double add(Tuple2<String,Double> value, Double accumulator){ return accumulator + value.f1; } @Override public Double getResult(Double accumulator){ return accumulator; } @Override public Double merge(Double a, Double b){ return a + b; } } 复制代码 聚合计算也比较简单,其实就是对price的简单sum操作
private static class WindowResult implements WindowFunction<Double,CategoryPojo,Tuple,TimeWindow>{ SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public void apply( Tuple key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception{ CategoryPojo categoryPojo = new CategoryPojo(); categoryPojo.setCategory(((Tuple1<String>) key).f0); BigDecimal bg = new BigDecimal(input.iterator().next()); double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue(); categoryPojo.setTotalPrice(p); categoryPojo.setDateTime(simpleDateFormat.format(new Date())); out.collect(categoryPojo); } } 复制代码 我们最聚合的结果进行简单的封装,封装成CategoryPojo类以便后续处理
result.keyBy("dateTime") .window(TumblingProcessingTimeWindows.of(Time.seconds( 1))) .process(new WindowResultProcess()); 复制代码 接下来我们要使用上面聚合的结果,所以我们使用上面的window聚合结果流又定义了时间是1秒的滚动窗口.
如何使用窗口的结果,可以参考flink的官网[1]
接下来我们做最后的结果统计,在这里,我们会把各个分类的总价加起来,就是全站的总销量金额,然后我们同时使用优先级队列计算出分类销售的Top3,打印出结果,在生产过程中我们可以把这个结果数据发到hbase或者redis等外部存储,以供前端的实时页面展示。
private static class WindowResultProcess extends ProcessWindowFunction<CategoryPojo,Object,Tuple,TimeWindow>{ @Override public void process( Tuple tuple, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception{ String date = ((Tuple1<String>) tuple).f0; Queue<CategoryPojo> queue = new PriorityQueue<>( 3, (o1, o2)->o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1); double price = 0D; Iterator<CategoryPojo> iterator = elements.iterator(); int s = 0; while (iterator.hasNext()){ CategoryPojo categoryPojo = iterator.next(); if (queue.size() < 3){ queue.add(categoryPojo); } else { CategoryPojo tmp = queue.peek(); if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()){ queue.poll(); queue.add(categoryPojo); } } price += categoryPojo.getTotalPrice(); } List<String> list = queue.stream() .sorted((o1, o2)->o1.getTotalPrice() <= o2.getTotalPrice() ? 1 : -1) .map(f->"(分类:" + f.getCategory() + " 销售额:" + f.getTotalPrice() + ")") .collect( Collectors.toList()); System.out.println("时间 : " + date + " 总价 : " + price + " top3 " + StringUtils.join(list, ",")); System.out.println("-------------"); } } 复制代码 3> CategoryPojo{category='户外', totalPrice=734.45, dateTime=2020-06-13 22:55:34}2> CategoryPojo{category='游戏', totalPrice=862.86, dateTime=2020-06-13 22:55:34}4> CategoryPojo{category='洗护', totalPrice=926.83, dateTime=2020-06-13 22:55:34}3> CategoryPojo{category='运动', totalPrice=744.98, dateTime=2020-06-13 22:55:34}2> CategoryPojo{category='乐器', totalPrice=648.81, dateTime=2020-06-13 22:55:34}4> CategoryPojo{category='图书', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}1> CategoryPojo{category='家具', totalPrice=880.35, dateTime=2020-06-13 22:55:34}3> CategoryPojo{category='家电', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}2> CategoryPojo{category='男装', totalPrice=796.06, dateTime=2020-06-13 22:55:34}1> CategoryPojo{category='女装', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}1> CategoryPojo{category='美妆', totalPrice=768.37, dateTime=2020-06-13 22:55:34}时间 : 2020-06-13 22:55:34 总价 : 9617.050000000001 top3 (分类:家电 销售额:1225.34),(分类:女装 销售额:1018.88),(分类:图书 销售额:1010.12) 复制代码 完整的代码请参考
文章来源:https://juejin.cn/post/6844904192180486158
免责申明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack