编辑切换为居中
添加图片注释,不超过 140 字(可选)
brew install apache-flink
一个文件,统计文件中每个单词出现的次数,分隔符是\t。统计结果我们直接打印在控制台(生产上肯定是Sink到目的地)
Maven 3.0.4(或更高版本)
Java 8
使用以下命令之一创建项目:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0
允许为新创建的项目命名。 它将以交互方式询问您groupId,artifactId和包名称。
编辑切换为居中
添加图片注释,不超过 140 字(可选)
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.8.0
编辑切换为居中
添加图片注释,不超过 140 字(可选)
工作目录中将有一个新目录。 如果使用curl,则该目录称为quickstart。 否则,它具有artifactId的名称:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
使用IDEA打开该项目即可!
示例项目是一个Maven项目,它包含两个类:StreamingJob和BatchJob是DataStream和DataSet程序的基本框架程序。 主要方法是程序的入口点,既可用于IDE测试/执行,也可用于正确部署。 建议将此项目导入IDE以进行开发和测试。 IntelliJ IDEA支持开箱即用的Maven项目。 不建议Eclipse
对于Flink,Java的默认JVM堆可能太小。须手动增加它。在IntelliJ IDEA中,推荐的更改JVM选项的方法来自Help | 编辑自定义VM选项菜单 -Xmx800m
如果要构建/打包项目,请转到项目目录并运行
mvn clean package
或者使用插件
编辑
添加图片注释,不超过 140 字(可选)
编辑切换为居中
添加图片注释,不超过 140 字(可选)
您将找到包含应用程序的JAR文件,以及可能已作为依赖项添加到应用程序的连接器和库:
target / <artifact-id> - <version> .jar
编辑
添加图片注释,不超过 140 字(可选)
注意:如果您使用与StreamingJob不同的类作为应用程序的主类/入口点,我们建议您相应地更改pom.xml文件中的mainClass设置。 这样,Flink可以从JAR文件运行应用程序,而无需另外指定主类。
env.readTextFile(textPath);
.filter()
.flatMap()
.join()
.coGroup()
在相应目录下建立文本:
编辑
添加图片注释,不超过 140 字(可选)
测试代码:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
成功读取:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
Hello JavaEdge
Hello JavaEdge
(Hello,1) (JavaEdge,1)
groupBy
代码
编辑切换为居中
添加图片注释,不超过 140 字(可选)
结果
编辑切换为居中
添加图片注释,不超过 140 字(可选)
## 7 实时处理应用功能实现
package com.javaedge.flink.basic;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 第一个基于Flink实时处理快速入门案例
*/
public class StreamingWCApp {
public static void main(String[] args) throws Exception {
// 创建上下文
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 对接数据源的数据
DataStreamSource<String> source = env.socketTextStream("localhost", 9527);
// 业务逻辑处理: transformation
source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(",");
for (String word : words) {
out.collect(word.toLowerCase().trim());
}
}
}).filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return StringUtils.isNotEmpty(value);
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).sum(1)
.print();
env.execute("StreamingWCApp");
}
}
可能遇到拒绝连接,记得
nc -lk 9527
socket发送数据:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
控制台收到结果:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
如何突破端口的限制呢,需重构:
传入参数args
编辑切换为居中
添加图片注释,不超过 140 字(可选)
获得参数:
编辑切换为居中
添加图片注释,不超过 140 字(可选)
Configuring Dependencies, Connectors, Libraries:
每个Flink应用程序都依赖于一组Flink库。 至少,应用程序依赖于Flink API。 许多应用程序还依赖于某些连接器库(如Kafka,Cassandra等)。 运行Flink应用程序时(在分布式部署中或在IDE中进行测试),Flink运行时库也必须可用。