Spark Streaming Basics: Receiving Data from a Socket to Implement a Word Count
Overview
Run nc -lk 9998 on 158.158.4.49 and type data into port 9998.
Write code that performs a word count: Spark Streaming consumes the real-time data sent from the TCP server (158.158.4.49).
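What the receiver sees is just newline-delimited text arriving over a TCP connection. As a rough illustration (plain JDK sockets, no Spark; localhost and an ephemeral port stand in for the real 158.158.4.49:9998, and the class name is hypothetical), a line can be round-tripped over a socket like this:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketLineDemo {
    // Send one line to a listening socket and read it back on the server side,
    // mimicking what nc sends and what socketTextStream receives.
    static String roundTrip(String line) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // ephemeral local port
            Thread sender = new Thread(() -> {
                try (Socket s = new Socket("127.0.0.1", server.getLocalPort());
                     PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
                    out.println(line); // newline-delimited, like typing into nc
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            sender.start();
            try (Socket s = server.accept();
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(s.getInputStream()))) {
                String received = in.readLine();
                sender.join();
                return received;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("aa bb cc")); // prints: aa bb cc
    }
}
```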
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;
public class JavaLocalNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
// StreamingContext is the programming entry point
JavaStreamingContext ssc = new JavaStreamingContext(
"local[2]",
"JavaLocalNetworkWordCount",
Durations.seconds(4),
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaLocalNetworkWordCount.class));
ssc.sparkContext().setLogLevel("ERROR");
//Receiver
//Create a receiver (JavaReceiverInputDStream) that receives and processes data sent over a socket from a port on a given machine
//Plain (anonymous inner class) style
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
"158.158.4.49", 9998, StorageLevels.MEMORY_AND_DISK_SER);
//Processing
//The processing logic is just a simple word count
JavaDStream<String> flatMapDStream = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) throws Exception {
String[] s1 = SPACE.split(x); // reuse the precompiled pattern
return Arrays.asList(s1).iterator();
}
});
JavaPairDStream<String, Integer> mapToPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
JavaPairDStream<String, Integer> reduceDStream = mapToPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
//Java 8 lambda style
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
//Output
//Print the results to the console
wordCounts.print();
reduceDStream.print();
//Explicitly start receiving data
ssc.start();
try {
//Wait for the computation to terminate
ssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
} finally {
ssc.close();
}
}
}
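Within a single batch, the flatMap → mapToPair → reduceByKey pipeline above is equivalent to an ordinary in-memory word count over that batch's lines. A Spark-free sketch of the per-batch logic (hypothetical class name, plain JDK only):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCountSketch {
    // Per-batch equivalent of flatMap -> mapToPair -> reduceByKey,
    // applied to the lines received during one 4-second batch.
    static Map<String, Integer> wordCount(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batchLines) {
            for (String word : line.split(" ")) {    // flatMap: line -> words
                counts.merge(word, 1, Integer::sum); // mapToPair + reduceByKey
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("aa cc bb", "aa bb")));
        // -> {aa=2, bb=2, cc=1}
    }
}
```

Note that, like the DStream version above, this counts each batch independently; counts are not carried over from one batch to the next.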
The output looks like the following.
Note the Time values: consecutive batches are exactly 4 seconds apart, matching the batch interval set when the JavaStreamingContext was created.
-------------------------------------------
Time: 1597224872000 ms
-------------------------------------------
(aa,1)
(cc,1)
(bb,1)
-------------------------------------------
Time: 1597224876000 ms
-------------------------------------------
(aa,2)
(bb,2)
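A quick arithmetic check on the printed timestamps: they are exactly one interval (4000 ms) apart, and each is a multiple of 4000, since Spark Streaming aligns batch boundaries to the batch interval (hypothetical class name):

```java
public class BatchTimeCheck {
    public static void main(String[] args) {
        long interval = 4000L;        // batch interval in ms
        long t1 = 1597224872000L;     // first batch time from the output above
        long t2 = 1597224876000L;     // next batch time
        System.out.println(t2 - t1);        // 4000: one batch interval apart
        System.out.println(t1 % interval);  // 0: aligned to an interval boundary
        System.out.println(t2 % interval);  // 0
    }
}
```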
