简单流程:

flume监听order.txt文件,一旦有数据添加就获取order.txt中新增的数据,并传给kafka;

kakfa作为消息队列,将flume传来的数据交给flink处理,flink处理完就存储到云服务器的Redis中;

在百度云的suger大屏产品中通过API拉取从Redis中返回Json数据,达到实时展示的效果

配置准备

  • jar包:向order.txt中产生数据

  • Flume配置信息

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source,使用exec source定时执行tail -F命令监控日志文件
a1.sources.r1.type = exec
#外部命令 (-F只认文件名,不认文件标识id)
a1.sources.r1.command = tail -F /data/log/order.log
a1.sources.r1.charset = UTF-8


# 配置channel,使用文件暂存数据
# 配置channel组件
a1.channels.c1.type = file

#元数据信息存放路径
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/ShiXunHwWork/checkpoint

#实际数据存放路径
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/ShiXunHwWork/data
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000


# 配置sink,使用Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092

#将数据写到kafka中的shixunwork的topic中
a1.sinks.k1.kafka.topic = shixunwork
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.useFlumeEventFormat = true
a1.sinks.k1.kafka.producer.acks = -1


a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • flink处理kafka数据的核心代码
package ShiXunHmwork;


import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;

public class Kafka_Flink_Redis {

    public static void main(String[] args) throws Exception {
        /**
         * 获取执行环境
         */

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         *
         * 2. 获取数据源:DataSource ===> Kafka
         */
        //配置kafka数据源
        String topic = "shixunwork";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "bigdata01:9092");
        prop.setProperty("group.id", "consumer02");  //指定消费组
        FlinkKafkaConsumer<String> FlinkKafkaStream = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);

        DataStreamSource<String> lineStream = env.addSource(FlinkKafkaStream); //将数据源加入到执行环境中,获取输入流数据


        /**
         *
         * 3. Transform:将获得到的输入流进行处理
         */
        //3.1使用flatmap获取数据流,并切分
            //切分为一个个的 json 数据
        SingleOutputStreamOperator<String> jsonStream = lineStream.flatMap(new FlatMapFunction<String, String>() {
            //3>  �{"orderNo":"b76efa14-b7df-4264-8a4f-4f8f61480b49","totalPrice":4756,
            //          "detal":[{"goodsNo":"9003","goodsCount":2,"goodsPrice":2378,"goodsName":"全友 沙发","goodsType":"家具"}]}
            //解决类似的字符串乱码问题
            @Override
            public void flatMap(String s, Collector<String> out) throws Exception {
                String[] str = s.split("\n");  //按换行符切分
                for (String json : str) {
                    // 找到第一个 '{' 的位置
                    int startIndex = s.indexOf("{");
                    String jsonpart = s.substring(startIndex);
                    out.collect(jsonpart);  //将 json 流切分为一个个的json数据并加入到输出流中
                }
            }
        });
        //jsonStream.print();   // 测试输出,看看是否正确


        //3.2.1 使用Map方法,统计每条订单的总金额
        SingleOutputStreamOperator<Integer> totalPriceStream = jsonStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String jsondata) throws Exception {
                //先对每一条json数据从String转为json提取订单总金额
                JSONObject jsonObject = JSONObject.parseObject(jsondata);  //转为json
                int TotalMoney = jsonObject.getIntValue("totalPrice");

                return TotalMoney;
            }
        });

//        totalPriceStream.print();   //测试输出

        //3.2.2:继续利用flatmap方法获取detal中的:goodsCount[商品数据]、goodsType[商品类型]
            //记得过滤数据:商品数量要大于 0
            //返回二元组:(goodsType,goodsCount)
        SingleOutputStreamOperator<Tuple2<String, Integer>> GoodsTypeTopN = jsonStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String jsonlines, Collector<Tuple2<String, Integer>> out) throws Exception {
                //1. 先转为json
                JSONObject jsonObject = JSONObject.parseObject(jsonlines);
                //2. 获取detal json列表
                JSONArray jsonarray = jsonObject.getJSONArray("detal");
                //3. 遍历
                for (Object jsondata : jsonarray) {
                    //转为JsonObject
                    //{"goodsNo":"6003","goodsCount":2,"goodsPrice":2999,"goodsName":"容声 529升","goodsType":"冰箱"}
                    JSONObject jsdata = (JSONObject) jsondata;
                    //获取想用的字段
                    String goodstype = jsdata.getString("goodsType");
                    int goodscount = jsdata.getIntValue("goodsCount");
                    if (goodscount > 0) {  //过滤异常数据
                        //合并为二元组
                        Tuple2<String, Integer> tuple2 = new Tuple2<>(goodstype, goodscount);
                        out.collect(tuple2);  //加入到输入流
                    }
                }
            }
        });
        //3.2.3 flatmap--map
        SingleOutputStreamOperator<Tuple2<String, Integer>> GoodsTypeTopNMap = GoodsTypeTopN.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2;
            }
        });
//        GoodsTypeTopN.print();  //测试输出

        //3.3 keyby分组
        KeyedStream<Tuple2<String, Integer>, Tuple> goodskeyStream = GoodsTypeTopNMap.keyBy(0);
        //3.4 时间窗口函数
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> goodtimeStream = goodskeyStream.timeWindow(Time.seconds(2));
        //3.5 使用sum聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> goodsumStream = goodtimeStream.sum(1);
//        goodsumStream.print();  //测试输出
        /**
         * 4. 写入到 Redis中
         */

        //使用自定义类来写入Redis
        //如果这个Redis中的key:day0708,存在value,则score加,否者创建在加入
        //8.134.223.107
        //monitor

        //这个将订单总金额加入进去
        totalPriceStream.addSink(new LinkRedis("8.134.223.107",6380,"sxkeymoney"));
//        totalPriceStream.addSink(new LinkRedis("bigdata01",6379,"sxkeymoney"));
        //这个将商品类别topN加入进去
        GoodsTypeTopN.addSink(new goodsTypeRedis("8.134.223.107",6380,"sxkeytopn"));
//        goodsumStream.addSink(new goodsTypeRedis("bigdata01",6379,"sxkeytopn"));

        env.execute("Kafka_Flink_Redis");

    }
}
  •  自定义Redis类:用于定义flink将数据处理后存入Redis的数据形式
    •         goodsTypeRedis类与LinkRedis类
package ShiXunHmwork;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class goodsTypeRedis extends RichSinkFunction<Tuple2<String,Integer>> {
    private Jedis jedis = null;  //连接对象,先定义后声明
    private String host;
    private int post;
    private String Redis_Key;  //定义要写入Redis中的key

    public goodsTypeRedis(String host,int post,String key){
        this.host = host;
        this.post = post;
        this.Redis_Key = key;
    }

    /**
     * 该方法用于于Redis建立连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host,post);
    }


    /**
     * 核心函数
     * 每条数据过来的时候都会进入这个方法
     * 该方法用来解析JSON数据
     */
    //利用方法的重构实现不同的需求
    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        String word = value.f0;  //获取二元组的第一个元素
        Integer count = value.f1;
        jedis.zincrby(Redis_Key,Double.parseDouble(count.toString()),word); //Redis中的方法
    }




    @Override
    public void close() throws Exception {
        if (jedis != null)
            jedis.close();
    }
}

     

package ShiXunHmwork;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;


/**
 * 该类用于将Flink的数据写入到Redis中
 */
public class LinkRedis extends RichSinkFunction<Integer>{
    private Jedis jedis = null;  //连接对象,先定义后声明
    private String host;
    private int post;
    private String Redis_Key;  //定义要写入Redis中的key

    public LinkRedis(String host,int post,String key){
        this.host = host;
        this.post = post;
        this.Redis_Key = key;
    }

    /**
     * 该方法用于于Redis建立连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        this.jedis = new Jedis(host,post);
    }


    /**
     * 核心函数
     * 每条数据过来的时候都会进入这个方法
     * 该方法用来解析JSON数据
     */
    //利用方法的重构实现不同的需求
    @Override
    public void invoke(Integer value, Context context) throws Exception {
        jedis.zincrby(Redis_Key,Double.parseDouble(value.toString()),"TotalMoney"); //Redis中的方法
    }




    @Override
    public void close() throws Exception {
        if (jedis != null)
            jedis.close();
    }
}
  • 使用springboot从Redis中获取数据,并返回json数据:springboot核心代码
package xuwei.controller;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import xuwei.utils.RedisUtils;

import java.util.Set;

/**
 * 数据接口V1.0
 * Created by xuwei
 */
@RestController//控制器类
@RequestMapping("/v1")//映射路径
public class DataController {
    private static final Logger logger = LoggerFactory.getLogger(DataController.class);
    /**
     * 测试接口
     * @param name
     * @return
     */
    @RequestMapping(value="/t1",method = RequestMethod.GET)
    public String test(@RequestParam("name") String name) {

        return "hello,"+name;
    }

    /**
     * 查询数码类的数据指标
     * @param key
     * @return
     */
    @RequestMapping(value="/nl1",method = RequestMethod.GET)
    public JSONObject getNumerical(@RequestParam("key") String key) {
        //key 相当于是redis中的key,后续根据这个key到redis中获取对应的数据
        /**
         * {
         *   "status": 0,
         *   "hitSugarSelf": true,
         *   "msg": "",
         *   "data": "371570"
         * }
         */
        JSONObject jsonObj = new JSONObject();   //可以用来创建json格式的数据
        jsonObj.put("status",0);
        jsonObj.put("hitSugarSelf",true);
        jsonObj.put("msg","");
        Jedis jedis = null;
        try{
            //获取redis连接
            jedis = RedisUtils.getJedis();
            //从redis中读取数据
            String value = jedis.get(key);
            jsonObj.put("data",value);
        }catch (Exception e){
            jsonObj.put("data","0");
        }finally {
            if(jedis!=null){
                RedisUtils.returnResource(jedis);
            }
        }
        return jsonObj;
    }


    /**
     * 该网址 http://8.134.223.107:8002/v1/totalmoney?kay=sxkeymoney
     * 用于获取总金额
     * @param key
     * @return
     */
    @RequestMapping(value="/totalmoney",method = RequestMethod.GET)
    public JSONObject totalmoney(@RequestParam("key") String key) {
        JSONObject jsonObj = new JSONObject();   //可以用来创建json格式的数据
        jsonObj.put("status",0);
        jsonObj.put("hitSugarSelf",true);
        jsonObj.put("msg","");
        Jedis jedis = null;
        try{
            //获取redis连接
            jedis = RedisUtils.getJedis();
            //从redis中读取数据

            double value = jedis.zscore("sxkeymoney","TotalMoney");
            jsonObj.put("data",value);
        }catch (Exception e){
            jsonObj.put("data","0");
        }finally {
            if(jedis!=null){
                RedisUtils.returnResource(jedis);
            }
        }
            return jsonObj;
    }


    /**
     * 该网站:http://8.134.223.107:8002/v1/topN?key=sxkeytopn
     * @param key
     * @return
     */
    @RequestMapping(value="/topN",method = RequestMethod.GET)
    public JSONObject getTopN(@RequestParam("key") String key) {
        JSONObject jsonObj = new JSONObject(true);   //可以用来创建json格式的数据
        JSONObject jsonObj01 = new JSONObject();   //可以用来创建json格式的数据 == > data
        jsonObj.put("status",0);
        jsonObj.put("hitSugarSelf",true);
        jsonObj.put("msg","");
        Jedis jedis = null;
        /**
         * 最终要返回
         * data:{"columns":[{},{}], "rows":[{},{},{}], "row":[{},{},{}]..}
         */
        try{
            //获取redis连接
            jedis = RedisUtils.getJedis();
            //阶段一,获取columns的数据
            JSONArray jsoncolumnsArray = new JSONArray();  //json数组,里面元素为json数据->jsoncolumnArray:[]
            JSONObject jsondata01 = new JSONObject();  //生成json数据,为json数组的其中一个元素-->jsondata01:{}
            JSONObject jsondata02 = new JSONObject();  //生成json数据,为json数组的另一个元素-->jsondata01:{}
            JSONObject jsondata03 = new JSONObject();  //生成json数据,为json数组的另一个元素-->jsondata01:{}
            jsondata01.put("id", "country");
            jsondata01.put("name", "类别"); //-->jsondata01:{"id":"type","name":"类别"}
            jsondata02.put("id","number");
            jsondata02.put("name","数量");  //-->jsondata02:{"id":"number","name":"数量"}
            jsondata03.put("id","gold");
            jsondata03.put("name","销售数量");  //-->jsondata02:{"id":"number","name":"数量"}
            jsoncolumnsArray.add(jsondata01);
            jsoncolumnsArray.add(jsondata02);
            jsoncolumnsArray.add(jsondata03);//-->json..Array:[{"id":"type","name":"类别"},{"id":"number","name":"数量"}]
            /*
            * jsonObj01:{"columns",[{"id":"type","name":"类别"},{"id":"number","name":"数量"}]}
            * */
            jsonObj01.put("columns",jsoncolumnsArray);  //由于data的value也是一个json,需要jsonobject
            //阶段二,获取 "rows":[{type,count},{type,count},...]
            //按 score:商品次数排序,取前5名
            JSONArray rowJsonArray = new JSONArray();;  //定义row的json列表[]
            Set<Tuple> goodsets = jedis.zrevrangeWithScores("sxkeytopn", 0, 4);
            for (Tuple tuple:goodsets){
                JSONObject rowjsonObject01 = new JSONObject();
                JSONObject rowjsonObject02 = new JSONObject();
                String goodstype = tuple.getElement();
                Double goodscount = tuple.getScore();
                rowjsonObject01.put("country",goodstype);
                rowjsonObject01.put("gold",goodscount);
                rowjsonObject01.put("number",goodscount);
                rowJsonArray.add(rowjsonObject01);
//                rowJsonArray.add(rowjsonObject02);
            }
            /*
            * jsonObj01:{"columns",[{"id":"type","name":"类别"},{"id":"number","name":"数量"}]
            *             ,"row":[{3,"xx"},{4,"xx"},{..},{..},{..}]
            *             }
            *
            * jsonObj:{ "data": {"columns",[{"id":"type","name":"类别"},{"id":"number","name":"数量"}]
             *                    ,"row":[{3,"xx"},{4,"xx"},{..},{..},{..}]
             *                   }
             *         }
            * */
            jsonObj01.put("rows",rowJsonArray);
            jsonObj.put("data",jsonObj01);

        }catch (Exception e){
            jsonObj.put("data","0");
        }finally {
            if(jedis!=null){
                RedisUtils.returnResource(jedis);
            }
        }
        return jsonObj;
    }

}
  • 在百度云的suger大屏产品中通过api拉取来获取这个json数据,并实时展示

操作流程

  • 开启flume服务

[root@bigdata01 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/shixun_file_flume_kafka_flink_redis.conf -Dflume.root.logger=INFO,console

  • 开启Kafka服务 

1. 先开启zookeeper服务

[root@bigdata01 apache-zookeeper-3.5.8-bin]# bin/zkServer.sh start

2. 开启kafka服务

[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties

  • 运行前面的Kafka_Flink_Redis代码
  • 将前面的springboot代码打成jar包上传到云服务器中
  • 在云服务器中运行jar包

  • 在本地虚拟机中运行generate_data-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 开始实时计算

流程总结

  • 运行generate_data-1.0-SNAPSHOT-jar-with-dependencies.jar后会一直向order.txt文件中创建数据,数据格式如下

{"orderNo":"db62805c-a0cb-40ce-a986-2480b765e150","totalPrice":20748,"detal":[{"goodsNo":"1006","goodsCount":1,"goodsPrice":4198,"goodsName":"vivo IQOO7","goodsType":"手机"},{"goodsNo":"3001","goodsCount":2,"goodsPrice":2299,"goodsName":"小米电视 55英寸 E55X","goodsType":"电视"},{"goodsNo":"1003","goodsCount":2,"goodsPrice":4699,"goodsName":"小米 11","goodsType":"手机"},{"goodsNo":"9002","goodsCount":2,"goodsPrice":128,"goodsName":"家逸 椅子","goodsType":"家具"},{"goodsNo":"4003","goodsCount":1,"goodsPrice":2298,"goodsName":"奥克斯 1.5匹京裕","goodsType":"空调"}]}
 

  • flume会一直监听order.txt文件,一有数据新增就会采集
  • flume会将数据发送到消息队列kafka中
  • Kafka_Flink_Redis代码会利用flink将kafka中的数据按自定义的Redis类处理逻辑存储到云服务器的Redis中     
  • 云服务器中运行的springboot会将Redis中的数据获取并按要求处理成需要的json格式数据
  • 百度云suger产品会通过api链接获取处理好的json格式,并展示出来

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐