最近学习Flink用来测试socket处理数据,打开虚拟机使用linux发送未免太过麻烦,但是windows上的netcat安装解压后报毒,安全起见,我就还是卸载了,后来看到有说Namp下的ncat可以替代netcat 用于发送消息,那我就去下载了试试,发现确实可以替代。
Nmap下载地址:https://nmap.org/ncat/
解压之后添加到环境变量,确保可以随处执行,netcat发送消息的命令为 nc,Nmap为ncat,
详细点为
netcat,-lk表示一直占用着,7777是端口
换成Nmap
测试环节
使用Flink测试,这段程序会检测 7777 端口的信息,这段代码是用于词频统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package cc.seektao.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountStreamSocket {
public static void main(String[] args) throws Exception {
// 1. 准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取数据
DataStreamSource<String> socketDS = env.socketTextStream("localhost", 7777);
// 3. 处理数据: 切分、转换、聚合、
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
.flatMap(
(String line, Collector<Tuple2<String, Integer>> collector) -> {
String[] words = line.split(" ");
for (String word : words) {
// 采集器采集数据发送到下游
collector.collect(new Tuple2<>(word, 1));
}
})
.returns(Types.TUPLE(Types.STRING, Types.INT)) // landbda 表达式有类型擦除,需要手动指定泛型的参数类型
.keyBy(word -> word.f0)
.sum(1);
// 4. 输出
sum.print();
// 5. 启动
env.execute();
}
}
|
运行Flink程序,程序会监听 7777 端口的信息
新开windows终端,输入
这时会阻塞,输入信息,比如
1
2
3
|
hello flink
hello spark
hello world
|
程序就会显示统计信息
1
2
3
4
5
6
|
3> (hello,1)
7> (flink,1)
1> (spark,1)
3> (hello,2)
5> (world,1)
3> (hello,3)
|