3.4 Flink Stream Processing

📍 Navigation: Back to contents | Previous: Kafka | Next: etcd


Core Concepts

Flink vs Spark Streaming

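| Feature | Flink | Spark Streaming |
|---|---|---|
| Processing model | True stream processing | Micro-batch processing |
| Latency | Milliseconds | Seconds |
| State management | Native support | Must be implemented manually |
| Event time | Native support | Limited support |
| Exactly-Once | | |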

DataStream API

Basic Example

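import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka connection settings (example values; adjust to your cluster)
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "word-count");

// Read from Kafka (FlinkKafkaConsumer/FlinkKafkaProducer are the legacy connector API;
// newer Flink versions recommend KafkaSource/KafkaSink instead)
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>(
        "input-topic",
        new SimpleStringSchema(),
        properties
    ));

// Transform: split each line into (word, 1) pairs, then keep a running count per word
DataStream<Tuple2<String, Integer>> counts = stream
    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(Tuple2.of(word, 1));
            }
        }
    })
    // Java erases the lambda's generic output type, so declare it explicitly
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(t -> t.f0)
    .sum(1);

// Output: format each (word, count) as "word,count" and write it to Kafka
counts
    .map(t -> t.f0 + "," + t.f1)
    .addSink(new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),
        properties
    ));

// Execute the job
env.execute("Word Count");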
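Because a streaming job never "finishes", `keyBy(...).sum(...)` does not produce one final count per word; it emits an updated running total for every incoming word. The plain-Java sketch below models that output sequence without Flink. `RunningWordCount` and `onLine` are hypothetical names, and a `HashMap` stands in for Flink's per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyBy(word).sum(count): not Flink code, only the emitted sequence
class RunningWordCount {

    // Stand-in for Flink's per-key state: one running total per word
    private final Map<String, Integer> totals = new HashMap<>();

    // Process one input line and return the records a streaming job would emit for it
    List<String> onLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (word.isEmpty()) {
                continue;  // a leading space produces an empty first token
            }
            int updated = totals.merge(word, 1, Integer::sum);
            emitted.add(word + "," + updated);  // one output record per input word
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount job = new RunningWordCount();
        System.out.println(job.onLine("to be or not to be"));
        System.out.println(job.onLine("be quick"));
    }
}
```

The second line yields `be,3` rather than `be,1`: the count for a key keeps growing across inputs, which is why downstream consumers typically treat each record as an upsert for that word.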

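Time Semantics

Three Notions of Time

  1. Event Time: when the event actually happened (a timestamp carried in the record)
  2. Processing Time: the wall-clock time of the machine at the moment Flink processes the event
  3. Ingestion Time: the time at which the event enters Flink (at the source)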
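// Since Flink 1.12, event time is the default and setStreamTimeCharacteristic() is deprecated.
// On older versions, enable event time explicitly:
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Extract timestamps and generate watermarks, tolerating up to 5 s of out-of-orderness.
// assignTimestampsAndWatermarks() returns a new stream, which is the one to use downstream.
DataStream<Event> withTimestamps = stream.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp())  // epoch millis
);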
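To see why event time gives reproducible results, the sketch below assigns the same out-of-order events to 5-second tumbling windows twice: once by event timestamp and once by arrival time (standing in for processing time). It is a plain-Java model, not Flink API; the `Event` class, its fields, and the sample timestamps are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class TimeSemanticsDemo {

    // Hypothetical event: when it happened vs when the operator saw it (epoch millis)
    static final class Event {
        final long eventTime;
        final long arrivalTime;

        Event(long eventTime, long arrivalTime) {
            this.eventTime = eventTime;
            this.arrivalTime = arrivalTime;
        }
    }

    static final long WINDOW_SIZE_MS = 5_000;

    // Count events per tumbling window (keyed by window start); 'byEventTime' selects the clock
    static Map<Long, Integer> countPerWindow(Event[] events, boolean byEventTime) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Event e : events) {
            long ts = byEventTime ? e.eventTime : e.arrivalTime;
            long windowStart = ts - (ts % WINDOW_SIZE_MS);  // non-negative timestamps assumed
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    static Event[] sampleEvents() {
        return new Event[] {
            new Event(1_000, 1_200),
            new Event(4_000, 4_100),
            new Event(3_000, 6_500),  // happened at 3 s but was delayed in transit
            new Event(7_000, 7_100),
        };
    }

    public static void main(String[] args) {
        Event[] events = sampleEvents();
        System.out.println("event time:      " + countPerWindow(events, true));   // {0=3, 5000=1}
        System.out.println("processing time: " + countPerWindow(events, false));  // {0=2, 5000=2}
    }
}
```

With event time, the delayed event still lands in the `[0, 5000)` window it belongs to, and rerunning the same input always gives the same answer; with processing time, the result depends on network delays.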

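Watermark

Purpose: handle out-of-order events and decide when a window can be closed (fired).

// Fixed-delay watermark: the watermark trails the largest timestamp seen by 5 seconds
WatermarkStrategy<Event> bounded =
    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5));

// Monotonously increasing watermark (timestamps never go backwards, i.e. no out-of-orderness)
WatermarkStrategy<Event> monotonous = WatermarkStrategy.forMonotonousTimestamps();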
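The following plain-Java sketch models what a bounded-out-of-orderness watermark does for tumbling event-time windows: track the largest timestamp seen, set the watermark to `maxTimestamp - delay - 1` (the `- 1` matches Flink's built-in generator), fire a window `[start, end)` once the watermark reaches `end - 1`, and treat an event as late when its window has already fired. The 10-second window size, the sample timestamps, and zero allowed lateness are assumptions; Flink also emits watermarks periodically rather than after every event, which this sketch simplifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a bounded-out-of-orderness watermark driving tumbling event-time windows
class WatermarkDemo {

    static final long DELAY_MS = 5_000;         // tolerated out-of-orderness (like Duration.ofSeconds(5))
    static final long WINDOW_SIZE_MS = 10_000;  // tumbling window size (assumed for the example)

    // Initial value chosen so the first watermark is Long.MIN_VALUE without overflow
    private long maxTimestamp = Long.MIN_VALUE + DELAY_MS + 1;
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();  // window end -> element count
    final List<String> log = new ArrayList<>();

    long currentWatermark() {
        return maxTimestamp - DELAY_MS - 1;
    }

    void onEvent(long ts) {
        long windowEnd = ts - (ts % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;  // non-negative timestamps assumed
        // Late: the watermark already covers this window's last millisecond, so it has fired
        if (windowEnd - 1 <= currentWatermark()) {
            log.add("late " + ts);  // dropped (no allowed lateness or side output in this model)
            return;
        }
        openWindows.merge(windowEnd, 1, Integer::sum);
        maxTimestamp = Math.max(maxTimestamp, ts);
        // Fire every window whose last millisecond (end - 1) is now covered by the watermark
        while (!openWindows.isEmpty() && openWindows.firstKey() - 1 <= currentWatermark()) {
            Map.Entry<Long, Integer> fired = openWindows.pollFirstEntry();
            log.add("fire window ending " + fired.getKey() + " count=" + fired.getValue());
        }
    }

    public static void main(String[] args) {
        WatermarkDemo demo = new WatermarkDemo();
        // 3_000 arrives out of order but within the 5 s bound; 2_000 arrives too late
        for (long ts : new long[] {1_000, 4_000, 12_000, 3_000, 16_000, 2_000}) {
            demo.onEvent(ts);
        }
        System.out.println(demo.log);
    }
}
```

Here the window `[0, 10000)` only fires when the event at 16 000 pushes the watermark to 10 999, so the out-of-order event at 3 000 is still counted (count=3), while the event at 2 000 arrives after firing and is reported as late. A larger delay tolerates more disorder at the cost of later results.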

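Window Operations

Window Types

// 1. Tumbling window: fixed size, non-overlapping (5 minutes)
stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("value");

// 2. Sliding window: 10-minute windows starting every 5 minutes (windows overlap)
stream.keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
    .sum("value");

// 3. Session window: a key's session closes after 15 minutes without events
stream.keyBy(...)
    .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
    .sum("value");

// 4. Global window + trigger: one never-ending window per key, so a trigger decides when to fire.
//    CountTrigger only FIREs without purging; wrapping it in PurgingTrigger clears the window
//    after each firing (this is what countWindow(100) does)
stream.keyBy(...)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(100)))
    .sum("value");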
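How an element's timestamp maps to windows can be sketched in plain Java. The tumbling and sliding logic below follows the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset` (with offset 0) and the loop in `SlidingEventTimeWindows`, and the session logic merges overlapping `[ts, ts + gap)` windows as Flink's merging session assigners do. The class and method names are hypothetical, and windows are represented as `long[] {start, end}`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WindowAssignmentDemo {

    // Start of the size-aligned window containing ts; floorMod keeps negative timestamps correct
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Tumbling: exactly one window [start, start + size)
    static long[] tumbling(long ts, long size) {
        long start = windowStart(ts, size);
        return new long[] {start, start + size};
    }

    // Sliding: every window [start, start + size) whose start is a multiple of 'slide' and contains ts
    static List<long[]> sliding(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = windowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Session: each event opens [ts, ts + gap); windows that overlap or touch are merged
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gap);  // still within the gap: extend the session
            } else {
                merged.add(new long[] {ts, ts + gap});  // gap exceeded: start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] t = tumbling(7 * min, 5 * min);
        System.out.println("tumbling: [" + t[0] / min + ", " + t[1] / min + ") min");
        for (long[] w : sliding(7 * min, 10 * min, 5 * min)) {
            System.out.println("sliding:  [" + w[0] / min + ", " + w[1] / min + ") min");
        }
        for (long[] w : sessions(new long[] {0, 10 * min, 40 * min, 50 * min}, 15 * min)) {
            System.out.println("session:  [" + w[0] / min + ", " + w[1] / min + ") min");
        }
    }
}
```

For an event at minute 7, the 5-minute tumbling window is `[5, 10)`; the 10-minute/5-minute sliding assigner puts it in both `[5, 15)` and `[0, 10)` (size / slide = 2 windows per element); and events at minutes 0, 10, 40, 50 with a 15-minute gap form two sessions, `[0, 25)` and `[40, 65)`.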

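Window Functions

// 1. ReduceFunction (incremental: combines two elements of the same type, here numeric values)
stream.keyBy(...)
    .window(...)
    .reduce((v1, v2) -> v1 + v2);

// 2. AggregateFunction (incremental: keeps a small accumulator whose type may differ from input/output)
stream.keyBy(...)
    .window(...)
    .aggregate(new AverageAggregate());

// 3. ProcessWindowFunction (full-window: buffers all elements and processes them together when
//    the window fires, with access to window metadata)
stream.keyBy(...)
    .window(...)
    .process(new MyProcessWindowFunction());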
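The difference between incremental and full-window aggregation is easiest to see with an average. The sketch below uses a local `Aggregate` interface that mirrors the four methods of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`) so it runs without Flink; the `AverageAggregate` shown is one hypothetical shape of the class referenced above, with a `{sum, count}` accumulator, and returning 0.0 for an empty window is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical AverageAggregate: accumulator is {sum, count}, so state per window stays O(1)
class AverageAggregate implements Aggregate<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        acc[0] += value;  // modifying and returning the accumulator in place is allowed
        acc[1] += 1;
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};  // used e.g. when session windows merge
    }
}

class WindowFunctionDemo {
    // Incremental: fold each element into the accumulator as it arrives
    static double incrementalAverage(List<Long> window) {
        AverageAggregate agg = new AverageAggregate();
        long[] acc = agg.createAccumulator();
        for (Long v : window) {
            acc = agg.add(v, acc);
        }
        return agg.getResult(acc);
    }

    // Full-window (ProcessWindowFunction style): every element is buffered until the window fires
    static double fullWindowAverage(List<Long> window) {
        List<Long> buffer = new ArrayList<>(window);
        long sum = 0;
        for (long v : buffer) {
            sum += v;
        }
        return buffer.isEmpty() ? 0.0 : (double) sum / buffer.size();
    }

    public static void main(String[] args) {
        List<Long> window = List.of(2L, 4L, 9L);
        System.out.println(incrementalAverage(window) + " " + fullWindowAverage(window));
    }
}
```

Both give the same answer; the incremental version keeps two numbers per window instead of every element, which is why `reduce`/`aggregate` are preferred when the window metadata or the full element list is not needed.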

State Management

Keyed State

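import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed state is only available on a KeyedStream, e.g. stream.keyBy(...).flatMap(new MyFunction())
public class MyFunction extends RichFlatMapFunction<Event, Result> {
    // ValueState: a single value per key
    private transient ValueState<Long> countState;

    // ListState: a list of elements per key
    private transient ListState<String> listState;

    // MapState: a key/value map per key
    private transient MapState<String, Long> mapState;

    @Override
    public void open(Configuration parameters) {
        // Obtain state handles from descriptors (state name + type)
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
        listState = getRuntimeContext().getListState(
            new ListStateDescriptor<>("list", String.class));
        mapState = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("map", String.class, Long.class));
    }

    @Override
    public void flatMap(Event event, Collector<Result> out) throws Exception {
        // value() returns null until the first update for the current key
        Long count = countState.value();
        if (count == null) {
            count = 0L;
        }
        count++;
        countState.update(count);
        out.collect(new Result(count));
    }
}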
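Keyed state is scoped to the current key: the same `countState.value()` call returns a different value depending on which key the incoming record belongs to. The sketch below models that with a map from key to value, as a plain-Java stand-in for Flink's state backend; `KeyedCounter` and the sample user keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedCounter {

    // Stand-in for the state backend: key -> value of the "count" ValueState
    private final Map<String, Long> countStateByKey = new HashMap<>();
    private String currentKey;  // the runtime sets this before each record is processed

    // Mirrors ValueState.value(): null until something has been stored for this key
    private Long value() {
        return countStateByKey.get(currentKey);
    }

    // Mirrors ValueState.update(...)
    private void update(Long v) {
        countStateByKey.put(currentKey, v);
    }

    // Same logic as flatMap() above: read, increment, write back, emit
    long process(String key) {
        currentKey = key;
        Long count = value();
        if (count == null) {
            count = 0L;
        }
        count++;
        update(count);
        return count;
    }

    public static void main(String[] args) {
        KeyedCounter op = new KeyedCounter();
        List<Long> outputs = new ArrayList<>();
        for (String user : new String[] {"alice", "bob", "alice", "alice", "bob"}) {
            outputs.add(op.process(user));
        }
        System.out.println(outputs);  // [1, 1, 2, 3, 2]
    }
}
```

Because state is partitioned this way, Flink can move whole key groups between parallel instances on rescaling without the user code noticing.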

Operator State

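import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<Event>, CheckpointedFunction {
    private transient ListState<Long> offsetState;
    private long offset = 0L;
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        while (running) {
            Event event = readEvent(offset);  // readEvent: hypothetical helper fetching the record at this offset
            // Emit and advance under the checkpoint lock so a snapshot never splits the two steps
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(event);
                offset++;
            }
        }
    }

    @Override
    public void cancel() { running = false; }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called on every checkpoint: replace the stored offset with the current one
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("offset", Long.class);
        offsetState = context.getOperatorStateStore().getListState(descriptor);
        // On recovery, resume from the offset stored in the last checkpoint
        if (context.isRestored()) {
            for (Long o : offsetState.get()) {
                offset = o;
            }
        }
    }
}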
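Operator state is not partitioned by key, so when a job is rescaled Flink must redistribute list entries across the new parallel instances. `getListState` uses even-split redistribution (all entries are pooled and divided evenly), while `getUnionListState` gives every instance the complete list. The plain-Java sketch below models both schemes; the class name is hypothetical, and the contiguous-chunk order of the even split is an assumption for illustration, since Flink's actual assignment order may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OperatorStateRedistribution {

    // Pool the list-state entries of all old instances (the logical "whole" state)
    private static List<Long> pool(List<List<Long>> oldInstances) {
        List<Long> pooled = new ArrayList<>();
        for (List<Long> entries : oldInstances) {
            pooled.addAll(entries);
        }
        return pooled;
    }

    // Even split (getListState): chunks whose sizes differ by at most one entry
    static List<List<Long>> evenSplit(List<List<Long>> oldInstances, int newParallelism) {
        List<Long> pooled = pool(oldInstances);
        List<List<Long>> result = new ArrayList<>();
        int base = pooled.size() / newParallelism;
        int extra = pooled.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(pooled.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    // Union (getUnionListState): every new instance receives the complete pooled list
    static List<List<Long>> union(List<List<Long>> oldInstances, int newParallelism) {
        List<Long> pooled = pool(oldInstances);
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(pooled));
        }
        return result;
    }

    public static void main(String[] args) {
        // Three source instances, each checkpointing one offset, restored with parallelism 2
        List<List<Long>> snapshot = Arrays.asList(
            Arrays.asList(100L), Arrays.asList(200L), Arrays.asList(300L));
        System.out.println("even split: " + evenSplit(snapshot, 2));
        System.out.println("union:      " + union(snapshot, 2));
    }
}
```

With even-split redistribution an instance may receive several entries after scaling in, so a restore loop that simply keeps the last entry (as in `initializeState` above) only fits the case of one entry per instance.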

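Checkpoint Mechanism

Configuration

// Enable checkpointing
env.enableCheckpointing(60000);  // every 60 seconds

// Checkpointing mode (EXACTLY_ONCE is the default)
env.getCheckpointConfig()
    .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// A checkpoint that does not complete within this time is discarded
env.getCheckpointConfig()
    .setCheckpointTimeout(300000);  // 5 minutes

// Maximum number of checkpoints in progress at the same time
env.getCheckpointConfig()
    .setMaxConcurrentCheckpoints(1);

// Keep externalized checkpoints when the job is cancelled (with either setting they are kept
// if the job fails), so the job can later be restarted from them.
// Flink 1.15+; older versions use enableExternalizedCheckpoints(...) with the same argument
env.getCheckpointConfig()
    .setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    );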
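Why a consistent checkpoint yields exactly-once *state* can be shown with a toy single-threaded pipeline: the checkpoint records the source position together with the operator state, and after a failure both are rolled back to that checkpoint before the input is replayed. The sketch below is plain Java with hypothetical names and no barriers or parallelism; the `restoreState=false` mode shows what happens if only the source position is rewound. This covers Flink-managed state only: end-to-end exactly-once output additionally requires a transactional or idempotent sink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy single-threaded model of checkpoint + recovery (no barriers, no parallelism)
class CheckpointReplayDemo {

    // A consistent snapshot: source position and operator state captured at the same point
    static final class Checkpoint {
        final int sourceOffset;
        final Map<String, Integer> counts;

        Checkpoint(int sourceOffset, Map<String, Integer> counts) {
            this.sourceOffset = sourceOffset;
            this.counts = new HashMap<>(counts);  // copy, so later updates do not leak into the snapshot
        }
    }

    // Count words from a replayable log, checkpointing every 'interval' records.
    // The job fails once on reaching position 'failAt' (-1 = never fails).
    // restoreState=false rewinds only the source position, to show the double counting it causes.
    static Map<String, Integer> run(List<String> log, int interval, int failAt, boolean restoreState) {
        int offset = 0;
        Map<String, Integer> counts = new HashMap<>();
        Checkpoint last = new Checkpoint(0, counts);
        boolean failed = false;
        while (offset < log.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = last.sourceOffset;  // rewind the source (e.g. a Kafka offset)
                if (restoreState) {
                    counts = new HashMap<>(last.counts);  // roll state back to the same checkpoint
                }
                continue;
            }
            counts.merge(log.get(offset), 1, Integer::sum);
            offset++;
            if (offset % interval == 0) {
                last = new Checkpoint(offset, counts);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "a", "c", "a", "b");
        System.out.println(run(log, 2, -1, true));  // no failure
        System.out.println(run(log, 2, 5, true));   // failure, offset and state restored together
        System.out.println(run(log, 2, 5, false));  // failure, offset only: one "a" counted twice
    }
}
```

The record at position 4 is processed twice in both failure runs, but only when state is rolled back with the offset does the second processing replace the first instead of adding to it.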

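State Backends

// 1. MemoryStateBackend (development and testing): working state on the TaskManager heap,
//    checkpoints kept in JobManager memory
env.setStateBackend(new MemoryStateBackend());

// 2. FsStateBackend (recommended for production): working state on the TaskManager heap,
//    checkpoints written to a durable file system such as HDFS
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

// 3. RocksDBStateBackend (very large state): working state in local RocksDB (memory + disk),
//    checkpoints written to a durable file system; supports incremental checkpoints
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"));

// These classes are deprecated since Flink 1.13. Their replacements separate the state backend
// from checkpoint storage: HashMapStateBackend (replaces 1 and 2) or EmbeddedRocksDBStateBackend
// (replaces 3), with storage configured separately, for example:
// env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:9000/flink/checkpoints");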

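Case Study: Real-Time User Behavior Analysis

import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

// UserBehavior, ItemViewCount, UserBehaviorSchema and WindowResultFunction are user-defined (not shown)
public class UserBehaviorAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time is the default since Flink 1.12 (setStreamTimeCharacteristic is deprecated)

        // Kafka connection settings (example values)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "user-behavior-analysis");

        // Read from Kafka and assign event-time timestamps and watermarks
        DataStream<UserBehavior> stream = env
            .addSource(new FlinkKafkaConsumer<>("user_behavior", new UserBehaviorSchema(), properties))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp())
            );

        // Top 10 hot items over the last hour, refreshed every 5 minutes
        DataStream<String> topItems = stream
            .filter(behavior -> "pv".equals(behavior.getBehavior()))  // keep page views only
            .keyBy(UserBehavior::getItemId)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
            .aggregate(new CountAgg(), new WindowResultFunction())  // view count per item per window
            .keyBy(ItemViewCount::getWindowEnd)                     // gather all items of one window
            .process(new TopNHotItems(10));

        topItems.print();
        env.execute("Hot Items Analysis");
    }
}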

// Aggregate function: incrementally counts the elements of each item's window
class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
    @Override
    public Long createAccumulator() { return 0L; }
    
    @Override
    public Long add(UserBehavior value, Long accumulator) {
        return accumulator + 1;
    }
    
    @Override
    public Long getResult(Long accumulator) { return accumulator; }
    
    @Override
    public Long merge(Long a, Long b) { return a + b; }
}

// Top N process function: buffers every ItemViewCount of one window end, then ranks them
class TopNHotItems extends KeyedProcessFunction<Long, ItemViewCount, String> {
    private final int topSize;
    private transient ListState<ItemViewCount> itemState;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ItemViewCount> descriptor =
            new ListStateDescriptor<>("itemState", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void processElement(ItemViewCount value, Context ctx, Collector<String> out) throws Exception {
        itemState.add(value);
        // Fires once the watermark passes the window end, i.e. after all counts of this window
        // have arrived; registering the same timestamp again does not create a second timer
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }

        // Sort by view count, highest first
        allItems.sort((a, b) -> Long.compare(b.getCount(), a.getCount()));

        StringBuilder result = new StringBuilder();
        // timestamp - 1 is the window end
        result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n");
        for (int i = 0; i < Math.min(topSize, allItems.size()); i++) {
            ItemViewCount item = allItems.get(i);
            result.append("No.").append(i + 1).append(":")
                  .append(" itemId=").append(item.getItemId())
                  .append(" views=").append(item.getCount())
                  .append("\n");
        }
        result.append("===============================\n");

        out.collect(result.toString());
        itemState.clear();  // this window is finished; free its state
    }
}
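To check the Top-N logic without a cluster, the plain-Java sketch below runs the same three stages over an in-memory array: count page views per item in every 1-hour window sliding by 5 minutes (the role of `CountAgg` + `WindowResultFunction`), group the counts by window end, and sort descending to keep the top N (the role of `TopNHotItems.onTimer`). Page views as `long[] {itemId, timestampMs}`, the class name, the item-ID tie-break, and the sample data are all assumptions, and watermarks and timers are skipped: each window is ranked after all input has been read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HotItemsSketch {

    static final long SIZE_MS = 60 * 60 * 1000L;  // 1-hour window
    static final long SLIDE_MS = 5 * 60 * 1000L;  // slides every 5 minutes

    // windowEnd -> (itemId -> view count) for every sliding window each page view falls into
    static Map<Long, Map<Long, Long>> countPerWindow(long[][] pageViews) {
        Map<Long, Map<Long, Long>> byWindowEnd = new TreeMap<>();
        for (long[] pv : pageViews) {
            long itemId = pv[0];
            long ts = pv[1];
            for (long start = ts - Math.floorMod(ts, SLIDE_MS); start > ts - SIZE_MS; start -= SLIDE_MS) {
                byWindowEnd.computeIfAbsent(start + SIZE_MS, k -> new HashMap<>())
                           .merge(itemId, 1L, Long::sum);
            }
        }
        return byWindowEnd;
    }

    // Rank one window's counts: highest count first, ties broken by smaller item ID
    static List<Long> topN(Map<Long, Long> counts, int n) {
        List<Map.Entry<Long, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : Long.compare(a.getKey(), b.getKey());
        });
        List<Long> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        // {itemId, timestamp in ms}: all page views fall in the first five minutes
        long[][] pv = { {1, 60_000}, {2, 120_000}, {2, 180_000}, {3, 240_000}, {2, 250_000}, {3, 260_000} };
        Map<Long, Map<Long, Long>> windows = countPerWindow(pv);
        for (Map.Entry<Long, Map<Long, Long>> w : windows.entrySet()) {
            System.out.println("window end " + w.getKey() + ": top 2 = " + topN(w.getValue(), 2));
        }
    }
}
```

Each page view is counted in SIZE / SLIDE = 12 overlapping windows, which is the main cost of a fine-grained slide. In the Flink job, the timer at `windowEnd + 1` plays the role of "all input has been read" for one window, because it only fires once the watermark guarantees that no more counts for that window will arrive.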

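Chapter Summary

Flink is a new-generation stream processing engine that supports event time, exactly-once semantics, and high throughput with low latency, making it a preferred choice for real-time computing.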

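Key Takeaways

  • ✅ Event Time + Watermark handle out-of-order events
  • ✅ Multiple window types cover different scenarios
  • ✅ The checkpoint mechanism guarantees exactly-once semantics for Flink-managed state
  • ✅ State management supports complex business logic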

Further Reading

  • 《Flink 原理与实践》
  • 《Stream Processing with Apache Flink》
  • Flink official documentation

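💡 Questions to Consider

  1. What is the difference between Event Time and Processing Time? When should you use each?
  2. What is the role of a watermark? How do you choose an appropriate delay?
  3. How does checkpointing guarantee exactly-once semantics?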
