| 特性 | Flink | Spark Streaming |
|---|---|---|
| 处理模型 | 真实流处理 | 微批处理 |
| 延迟 | 毫秒级 | 秒级 |
| 状态管理 | 原生支持 | 需手动实现 |
| 事件时间 | 原生支持 | 有限支持 |
| Exactly-Once | ✅ | ✅ |
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
// Create the execution environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read raw text lines from Kafka.
// NOTE(review): `properties` is not defined in this snippet; it presumably holds the
// Kafka client settings (bootstrap.servers, group.id, ...) -- confirm where it comes from.
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>(
        "input-topic",
        new SimpleStringSchema(),
        properties
    ));

// Transform: split every line into words, pair each word with a count of 1,
// then keep a running sum of the counts per word.
DataStream<Tuple2<String, Integer>> counts = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        // "\\s+" treats a run of whitespace as one separator; the isEmpty check
        // drops the empty token that leading whitespace still produces.
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    })
    // The lambda's generic output type is erased at compile time, so it is declared explicitly.
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(pair -> pair.f0)
    // Sum tuple position 1 (the count) for each key.
    .sum(1);

// Output: SimpleStringSchema serializes Strings only, so render each pair as "word,count" first.
counts
    .map(pair -> pair.f0 + "," + pair.f1)
    .addSink(new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),
        properties
    ));

// Submit the job.
env.execute("Word Count");
- Event Time(事件时间):事件实际发生的时间
- Processing Time(处理时间):Flink 处理事件的时间
- Ingestion Time(摄入时间):事件进入 Flink(到达 Source 算子)的时间
// 设置时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 提取时间戳和 Watermark
stream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);作用:处理乱序事件,判断窗口何时关闭
// Fixed-delay watermark: built with a bounded out-of-orderness of 5 seconds.
// (Expression fragment: meant to be passed to assignTimestampsAndWatermarks.)
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
// Monotonically increasing watermark (for streams with no out-of-order events).
WatermarkStrategy.forMonotonousTimestamps()
// 1. Tumbling window
// 5-minute event-time windows; sums the "value" field per key and window.
// keyBy(...) is a placeholder for the real key selector.
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.sum("value");
// 2. Sliding window: 10-minute windows that start every 5 minutes.
stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
.sum("value");
// 3. Session window: configured with a 15-minute gap.
stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
.sum("value");
// 4. Global window + trigger: evaluation is driven by a count trigger of 100.
// NOTE(review): CountTrigger presumably fires without purging the window contents, so the
// emitted sum would keep accumulating across firings -- confirm whether that is intended.
stream.keyBy(...)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(100))
.sum("value");
// 1. ReduceFunction (incremental aggregation)
// Combines the elements of each window pairwise with v1 + v2.
// keyBy(...) and window(...) are placeholders for the real key selector and window assigner.
stream.keyBy(...)
.window(...)
.reduce((v1, v2) -> v1 + v2);
// 2. AggregateFunction (incremental aggregation)
// NOTE(review): AverageAggregate is not defined in this excerpt.
stream.keyBy(...)
.window(...)
.aggregate(new AverageAggregate());
// 3. ProcessWindowFunction (full-window aggregation)
// NOTE(review): MyProcessWindowFunction is not defined in this excerpt.
stream.keyBy(...)
.window(...)
.process(new MyProcessWindowFunction());
import org.apache.flink.api.common.state.*;
/**
 * Keyed-state example: keeps a running count in {@link ValueState} and emits a
 * {@code Result} carrying the updated count for every incoming event.
 *
 * <p>The list and map state handles illustrate the other keyed-state kinds; they are
 * registered in {@link #open} but not otherwise used by {@link #flatMap}.
 */
public class MyFunction extends RichFlatMapFunction<Event, Result> {

    // Value state: the running count.
    private transient ValueState<Long> countState;

    // List state.
    private transient ListState<String> listState;

    // Map state.
    private transient MapState<String, Long> mapState;

    /**
     * Registers the state handles with the runtime context.
     *
     * @param parameters configuration passed by the runtime; not read here
     */
    @Override
    public void open(Configuration parameters) {
        // No default value is passed to the descriptor (that constructor is deprecated);
        // the "not yet set" case is handled explicitly in flatMap instead.
        ValueStateDescriptor<Long> countDescriptor =
            new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(countDescriptor);

        ListStateDescriptor<String> listDescriptor =
            new ListStateDescriptor<>("list", String.class);
        listState = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Long> mapDescriptor =
            new MapStateDescriptor<>("map", String.class, Long.class);
        mapState = getRuntimeContext().getMapState(mapDescriptor);
    }

    /**
     * Increments the stored count and emits it.
     *
     * @param event the incoming event; only its arrival is counted
     * @param out   collector receiving one {@code Result} per event
     * @throws Exception if state access fails
     */
    @Override
    public void flatMap(Event event, Collector<Result> out) throws Exception {
        Long stored = countState.value();
        // value() returns null until the state has been written once; start from zero.
        long count = (stored == null ? 0L : stored) + 1;
        countState.update(count);
        out.collect(new Result(count));
    }
}

/**
 * Operator-state example: a source that stores its offset in list state when a
 * snapshot is taken and reads it back when state is restored.
 *
 * <p>NOTE(review): {@code run(...)} and {@code cancel()}, which {@code SourceFunction}
 * requires, are not shown in this example; a real source must implement them.
 */
public class MySource implements SourceFunction<Event>, CheckpointedFunction {

    private transient ListState<Long> offsetState;
    private Long offset = 0L;

    /**
     * Replaces the stored state with the current offset.
     *
     * @param context snapshot context supplied by the runtime; not read here
     * @throws Exception if state access fails
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetState.clear();
        offsetState.add(offset);
    }

    /**
     * Obtains the offset list state and, when restoring, reloads the offset from it.
     *
     * @param context gives access to the operator state store and the restore flag
     * @throws Exception if state access fails
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("offset", Long.class);
        offsetState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // If the restored list holds several entries, the last one wins.
            for (Long o : offsetState.get()) {
                offset = o;
            }
        }
    }
}
// Enable checkpointing
env.enableCheckpointing(60_000); // once every 60 seconds

// Keep externalized checkpoints when the job is cancelled.
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Number of checkpoints allowed to run concurrently.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// Checkpoint timeout: 5 minutes.
env.getCheckpointConfig().setCheckpointTimeout(300_000);

// Checkpointing semantics: exactly-once.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 1. MemoryStateBackend (development / testing)
env.setStateBackend(new MemoryStateBackend());
// 2. FsStateBackend (recommended for production)
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
// 3. RocksDBStateBackend (very large state)
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"));
// NOTE: the three calls above are alternatives; configure only one of them per job.

/**
 * Example job: reads user-behavior events from Kafka and prints, for each sliding
 * one-hour window (evaluated every 5 minutes), the 10 most viewed items.
 */
public class UserBehaviorAnalysis {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments; not read
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // NOTE(review): deprecated from Flink 1.12 on, where event time is the default --
        // confirm the target Flink version before keeping this call.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Kafka consumer settings. The broker address and group id are placeholders;
        // replace them with the values of the target cluster.
        java.util.Properties properties = new java.util.Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "hot-items-analysis");

        // Read from Kafka; watermarks tolerate events up to 5 seconds out of order.
        DataStream<UserBehavior> stream = env
            .addSource(new FlinkKafkaConsumer<>("user_behavior", new UserBehaviorSchema(), properties))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            );

        // Hourly top 10 items by page views.
        DataStream<String> topItems = stream
            // Constant-first equals keeps a record with a null behavior from throwing.
            .filter(behavior -> "pv".equals(behavior.getBehavior()))
            .keyBy(UserBehavior::getItemId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new CountAgg(), new WindowResultFunction())
            // Regroup the per-item counts by window end so each window is ranked as a whole.
            .keyBy(ItemViewCount::getWindowEnd)
            .process(new TopNHotItems(10));

        topItems.print();
        env.execute("Hot Items Analysis");
    }
}
/**
 * Aggregate function that counts elements: the accumulator starts at zero, grows by
 * one for every {@code UserBehavior} added, and is returned unchanged as the result.
 */
class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

    /** Returns the initial count, zero. */
    @Override
    public Long createAccumulator() {
        return 0L;
    }

    /** Returns the accumulator increased by one; the element itself is not inspected. */
    @Override
    public Long add(UserBehavior value, Long accumulator) {
        final long incremented = accumulator + 1;
        return incremented;
    }

    /** Returns the accumulated count as the result. */
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    /** Returns the sum of two partial counts. */
    @Override
    public Long merge(Long a, Long b) {
        return Long.sum(a, b);
    }
}
/**
 * Top-N process function: buffers the {@code ItemViewCount} records of one key in list
 * state and, when the event-time timer registered at window end + 1 fires, emits a
 * formatted report of the {@code topSize} records with the highest counts.
 */
class TopNHotItems extends KeyedProcessFunction<Long, ItemViewCount, String> {

    // Number of entries to report; fixed at construction time.
    private final int topSize;

    // Runtime state handle, obtained in open(); transient like the other state fields in this file.
    private transient ListState<ItemViewCount> itemState;

    /**
     * @param topSize maximum number of items listed per report
     */
    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    /**
     * Obtains the list state that buffers the records.
     *
     * @param parameters configuration passed by the runtime; not read here
     * @throws Exception if state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ItemViewCount> descriptor =
            new ListStateDescriptor<>("itemState", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(descriptor);
    }

    /**
     * Buffers the record and registers an event-time timer at its window end + 1.
     *
     * @param value the per-item count to buffer
     * @param ctx   gives access to the timer service
     * @param out   not used here; output is produced in {@link #onTimer}
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
        itemState.add(value);
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    /**
     * Sorts the buffered records by count, descending, emits the report and clears the state.
     *
     * @param timestamp the timer's timestamp (window end + 1)
     * @param ctx       timer context; not read here
     * @param out       collector receiving the formatted report
     * @throws Exception if state access fails
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        allItems.sort((a, b) -> Long.compare(b.getCount(), a.getCount()));

        // Computed once instead of on every loop iteration.
        int limit = Math.min(topSize, allItems.size());

        StringBuilder result = new StringBuilder();
        // timestamp - 1 undoes the +1 applied when the timer was registered.
        result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < limit; i++) {
            ItemViewCount item = allItems.get(i);
            result.append("No").append(i + 1).append(":")
                .append(" 商品ID=").append(item.getItemId())
                .append(" 浏览量=").append(item.getCount())
                .append("\n");
        }
        result.append("===============================\n");
        out.collect(result.toString());

        itemState.clear();
    }
}
Flink 是新一代流处理引擎,支持事件时间、精确一次语义、高吞吐低延迟,是实时计算的首选。
关键要点:
- ✅ Event Time + Watermark 处理乱序事件
- ✅ 多种窗口类型满足不同场景
- ✅ Checkpoint 机制保证 Exactly-Once 语义
- ✅ 状态管理支持复杂业务逻辑
参考资料:
- 《Flink 原理与实践》
- 《Stream Processing with Apache Flink》
- Flink 官方文档
💡 思考题:
- Event Time 和 Processing Time 有什么区别?何时用哪个?
- Watermark 的作用是什么?如何设置合适的延迟?
- Checkpoint 如何保证 Exactly-Once 语义?