
Commit 069efff (merge commit; parents 2db0692 and 16a2011)

11 files changed
Lines changed: 766 additions & 115 deletions

analytics/src/main/java/file_segment_analytics/FileSegmentAnalyticsJob.java

Lines changed: 87 additions & 76 deletions
@@ -1,20 +1,19 @@
 package file_segment_analytics;
 
 import java.io.InputStream;
-import java.sql.Timestamp;
+import java.io.Serializable;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Properties;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
-import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
-import org.apache.flink.connector.jdbc.JdbcSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.formats.json.JsonDeserializationSchema;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -23,6 +22,12 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import models.EnrichedFileSegment;
+import sinks.JdbcSinkFactory;
+import stats.UserAggregateStat;
+import stats.UserRollingStat;
+import stats.UserProjectAggregateStat;
+import stats.UserProjectRollingStat;
+import stats.StatFactory;
 
 public class FileSegmentAnalyticsJob {
     public static void main(String[] args) throws Exception {
@@ -52,90 +57,96 @@ public static void main(String[] args) throws Exception {
                 }));
 
         DataStream<UserAggregateStat> dailyStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
-                .window(TumblingEventTimeWindows.of(Time.days(1))).process(new UserStatWindowFunction("daily"));
-
-        DataStream<UserAggregateStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
-                .window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10))) // TODO: Change this beheavior to
-                // 5
-                // minutes I guess?
-                .process(new UserStatWindowFunction("rolling_24h"));
+                .window(TumblingEventTimeWindows.of(Time.days(1)))
+                .process(
+                        new UserStatWindowFunction<Integer, UserAggregateStat>("daily", new UserAggregateStatFactory()))
+                .returns(TypeExtractor.getForClass(UserAggregateStat.class));
+
+        DataStream<UserRollingStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
+                .window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
+                .process(new UserStatWindowFunction<Integer, UserRollingStat>("rolling_24h",
+                        new UserRollingStatFactory()))
+                .returns(TypeExtractor.getForClass(UserRollingStat.class));
+
+        DataStream<UserProjectRollingStat> projectsRollingStream = timestampedStream
+                .keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
+                    @Override
+                    public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
+                        return Tuple2.of(seg.getUser_id(), seg.getProject_path());
+                    }
+                }).window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
+                .process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectRollingStat>("rolling_24h",
+                        new UserProjectRollingStatFactory()))
+                .returns(TypeExtractor.getForClass(UserProjectRollingStat.class));
+
+        DataStream<UserProjectAggregateStat> projectsAggregateStream = timestampedStream
+                .keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
+                    @Override
+                    public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
+                        return Tuple2.of(seg.getUser_id(), seg.getProject_path());
+                    }
+                }).window(TumblingEventTimeWindows.of(Time.hours(24)))
+                .process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>("daily",
+                        new UserProjectAggregateStatFactory()))
+                .returns(TypeExtractor.getForClass(UserProjectAggregateStat.class));
+
+        // Use stat type definitions for columns and conflict keys
 
         JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                 .withUrl("jdbc:postgresql://postgres_db:5432/myapp").withDriverName("org.postgresql.Driver")
                 .withUsername("admin").withPassword("secure_password").build();
 
-        dailyStream.addSink(createJdbcSink(jdbcOptions, 1, 0, false));
-
-        rollingStream.addSink(createJdbcSink(jdbcOptions, 1, 0, true));
+        // Sinks using asRecord (createGeneralSink)
+        SinkFunction<UserAggregateStat> dailySink = JdbcSinkFactory.createGeneralSink("user_stats_aggregate",
+                UserAggregateStat.PRIMITIVE_COLUMNS, UserAggregateStat.JSONB_COLUMNS,
+                String.join(", ", UserAggregateStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
+        SinkFunction<UserRollingStat> rollingSink = JdbcSinkFactory.createGeneralSink("user_stats_rolling",
+                UserRollingStat.PRIMITIVE_COLUMNS, UserRollingStat.JSONB_COLUMNS,
+                String.join(", ", UserRollingStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
+        SinkFunction<UserProjectAggregateStat> projectSink = JdbcSinkFactory.createGeneralSink(
+                "user_project_stats_aggregate", UserProjectAggregateStat.PRIMITIVE_COLUMNS,
+                UserProjectAggregateStat.JSONB_COLUMNS, String.join(", ", UserProjectAggregateStat.CONFLICT_KEYS),
+                jdbcOptions, 10, 1000);
+        SinkFunction<UserProjectRollingStat> projectRollingSink = JdbcSinkFactory.createGeneralSink(
+                "user_project_stats_Rolling", UserProjectRollingStat.PRIMITIVE_COLUMNS,
+                UserProjectRollingStat.JSONB_COLUMNS, String.join(", ", UserProjectRollingStat.CONFLICT_KEYS),
+                jdbcOptions, 10, 1000);
+
+        dailyStream.addSink(dailySink);
+        rollingStream.addSink(rollingSink);
+        projectsAggregateStream.addSink(projectSink);
+        projectsRollingStream.addSink(projectRollingSink);
 
         env.execute("FileSegment Analytics");
     }
 
-    private static SinkFunction<UserAggregateStat> createJdbcSink(JdbcConnectionOptions jdbcOptions, int batchSize,
-            long batchIntervalMs, boolean rolling) {
-        String table = rolling ? "user_stats_rolling" : "user_stats_aggregate";
-        String conflict = rolling ? "(user_id, window_type)" : "(user_id, window_type, window_start)";
-
-        String columns = "user_id, window_type, lang_durations, machine_durations, editor_durations, project_durations, activity_durations";
-        String values = "?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb";
+    public static class UserAggregateStatFactory implements StatFactory<Integer, UserAggregateStat>, Serializable {
+        @Override
+        public UserAggregateStat create(Integer key) {
+            return new UserAggregateStat(key);
+        }
+    }
 
-        if (!rolling) {
-            columns += ", window_start, window_end";
-            values += ", ?, ?";
+    public static class UserRollingStatFactory implements StatFactory<Integer, UserRollingStat>, Serializable {
+        @Override
+        public UserRollingStat create(Integer key) {
+            return new UserRollingStat(key);
         }
+    }
 
-        String sql;
-        if (rolling) {
-            sql = "INSERT INTO user_stats_rolling ("
-                    + "user_id, window_type, lang_durations, machine_durations, editor_durations, "
-                    + "project_durations, activity_durations) "
-                    + "VALUES (?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
-                    + "ON CONFLICT (user_id, window_type) DO UPDATE SET " + "lang_durations = EXCLUDED.lang_durations, "
-                    + "machine_durations = EXCLUDED.machine_durations, "
-                    + "editor_durations = EXCLUDED.editor_durations, "
-                    + "project_durations = EXCLUDED.project_durations, "
-                    + "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();";
-        } else {
-            sql = "INSERT INTO user_stats_aggregate ("
-                    + "user_id, window_type, lang_durations, machine_durations, editor_durations, "
-                    + "project_durations, activity_durations, window_start, window_end) "
-                    + "VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
-                    + "ON CONFLICT (user_id, window_type, window_start) DO UPDATE SET "
-                    + "window_end = EXCLUDED.window_end, " + "lang_durations = EXCLUDED.lang_durations, "
-                    + "machine_durations = EXCLUDED.machine_durations, "
-                    + "editor_durations = EXCLUDED.editor_durations, "
-                    + "project_durations = EXCLUDED.project_durations, "
-                    + "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();";
+    public static class UserProjectAggregateStatFactory
+            implements StatFactory<Tuple2<Integer, String>, UserProjectAggregateStat>, Serializable {
+        @Override
+        public UserProjectAggregateStat create(Tuple2<Integer, String> key) {
+            return new UserProjectAggregateStat(key.f0, key.f1);
         }
+    }
 
-        return JdbcSink.sink(sql, (ps, stat) -> {
-            ObjectMapper mapper = new ObjectMapper();
-            UserAggregateStat userStat = (UserAggregateStat) stat;
-            ps.setInt(1, userStat.getUserId());
-            ps.setString(2, stat.getWindowType());
-
-            if (!rolling) {
-                ps.setTimestamp(8, Timestamp.from(userStat.getWindowStart()));
-                ps.setTimestamp(9, Timestamp.from(userStat.getWindowEnd()));
-            }
-
-            try {
-                // 5-9: Serialize Map fields to JSON strings
-                ps.setString(3, mapper.writeValueAsString(userStat.getLangDurations()));
-                ps.setString(4, mapper.writeValueAsString(userStat.getMachineDurations()));
-                ps.setString(5, mapper.writeValueAsString(userStat.getEditorDurations()));
-                ps.setString(6, mapper.writeValueAsString(userStat.getProjectDurations()));
-                ps.setString(7, mapper.writeValueAsString(userStat.getActivityDurations()));
-            } catch (JsonProcessingException e) {
-                // Handle serialization error gracefully
-                e.printStackTrace();
-                for (int i = 3; i <= 7; i++)
-                    ps.setString(i, "{}"); // Send empty JSON object on error
-            }
-        },
-                // Define execution options (batching)
-                JdbcExecutionOptions.builder().withBatchSize(batchSize).withBatchIntervalMs(batchIntervalMs).build(),
-                // Provide JDBC connection details
-                jdbcOptions);
+    public static class UserProjectRollingStatFactory
+            implements StatFactory<Tuple2<Integer, String>, UserProjectRollingStat>, Serializable {
+        @Override
+        public UserProjectRollingStat create(Tuple2<Integer, String> key) {
+            return new UserProjectRollingStat(key.f0, key.f1);
+        }
     }
 }
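
Note: the sink wiring above reads its table layout from constants on each stat class (PRIMITIVE_COLUMNS, JSONB_COLUMNS, CONFLICT_KEYS). Those classes are changed elsewhere in this commit and are not shown in this excerpt; as orientation only, the column and key names implied by the INSERT statement removed above suggest a shape roughly like the following hypothetical sketch (the split into primitive vs. JSONB columns is an assumption):

// Hypothetical sketch only -- not the committed stats.UserAggregateStat.
// Column and key names are taken from the SQL removed above.
public final class UserAggregateStatColumnsSketch {
    public static final String[] PRIMITIVE_COLUMNS = { "user_id", "window_type", "window_start", "window_end" };
    public static final String[] JSONB_COLUMNS = { "lang_durations", "machine_durations",
            "editor_durations", "project_durations", "activity_durations" };
    public static final String[] CONFLICT_KEYS = { "user_id", "window_type", "window_start" };
}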

analytics/src/main/java/file_segment_analytics/UserStatWindowFunction.java

Lines changed: 15 additions & 9 deletions
@@ -1,31 +1,37 @@
 package file_segment_analytics;
 
 import models.EnrichedFileSegment;
+import stats.IStat;
+import stats.StatFactory;
+import stats.WindowContext;
+
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 import java.time.Instant;
 
-public class UserStatWindowFunction
-        extends ProcessWindowFunction<EnrichedFileSegment, UserAggregateStat, Integer, TimeWindow> {
+public class UserStatWindowFunction<K, S extends IStat>
+        extends ProcessWindowFunction<EnrichedFileSegment, S, K, TimeWindow> {
 
     private final String windowType;
+    private final StatFactory<K, S> stat_factory;
 
-    public UserStatWindowFunction(String windowType) {
+    public UserStatWindowFunction(String windowType, StatFactory<K, S> factory) {
         this.windowType = windowType;
+        this.stat_factory = factory;
     }
 
     @Override
-    public void process(Integer userId, Context ctx, Iterable<EnrichedFileSegment> segments,
-            Collector<UserAggregateStat> out) {
-        UserAggregateStat stat = new UserAggregateStat(userId);
+    public void process(K key, Context ctx, Iterable<EnrichedFileSegment> segments, Collector<S> out) {
+        S stat = stat_factory.create(key);
         for (EnrichedFileSegment seg : segments) {
             stat.add(seg);
         }
 
-        stat.setWindowType(windowType);
-        stat.setWindowStart(Instant.ofEpochMilli(ctx.window().getStart()));
-        stat.setWindowEnd(Instant.ofEpochMilli(ctx.window().getEnd()));
+        WindowContext windowContext = new WindowContext(windowType, Instant.ofEpochMilli(ctx.window().getStart()),
+                Instant.ofEpochMilli(ctx.window().getEnd()));
+
+        stat.postProcess(windowContext);
 
         out.collect(stat);
     }
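
The refactored process() no longer sets window metadata on the stat directly; it builds a stats.WindowContext and hands it to postProcess(). That class is added elsewhere in this commit but is not part of this excerpt. Judging only from the three-argument constructor call above, a minimal version would look roughly like this (a sketch, not the committed code; the accessor names are assumptions):

package stats;

import java.io.Serializable;
import java.time.Instant;

// Minimal sketch inferred from the constructor call in UserStatWindowFunction;
// the actual stats.WindowContext in this commit may carry more.
public class WindowContext implements Serializable {
    private final String windowType;
    private final Instant windowStart;
    private final Instant windowEnd;

    public WindowContext(String windowType, Instant windowStart, Instant windowEnd) {
        this.windowType = windowType;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    public String getWindowType() { return windowType; }
    public Instant getWindowStart() { return windowStart; }
    public Instant getWindowEnd() { return windowEnd; }
}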
analytics/src/main/java/sinks/JdbcSinkFactory.java

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+package sinks;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import stats.IStat;
+
+public class JdbcSinkFactory {
+
+    public static <T extends IStat> SinkFunction<T> createGeneralSink(final String tableName,
+            final String[] primitiveColumns, final String[] jsonbColumns, final String conflictColumns,
+            final JdbcConnectionOptions jdbcOptions, final int batchSize, final long batchIntervalMs) {
+
+        final ObjectMapper mapper = new ObjectMapper();
+
+        List<String> allColumns = new ArrayList<>();
+        allColumns.addAll(Arrays.asList(primitiveColumns));
+        allColumns.addAll(Arrays.asList(jsonbColumns));
+
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("INSERT INTO ").append(tableName).append(" (").append(String.join(", ", allColumns))
+                .append(") VALUES (");
+
+        for (int i = 0; i < allColumns.size(); i++) {
+            if (i > 0)
+                sqlBuilder.append(", ");
+            if (i >= primitiveColumns.length) {
+                sqlBuilder.append("?::jsonb");
+            } else {
+                sqlBuilder.append("?");
+            }
+        }
+        sqlBuilder.append(")");
+
+        if (conflictColumns != null && !conflictColumns.isEmpty()) {
+            sqlBuilder.append(" ON CONFLICT (").append(conflictColumns).append(") DO UPDATE SET ");
+            boolean first = true;
+            for (String col : allColumns) {
+                if (!first)
+                    sqlBuilder.append(", ");
+                sqlBuilder.append(col).append(" = EXCLUDED.").append(col);
+                first = false;
+            }
+            sqlBuilder.append(", updated_at = NOW()");
+        }
+        sqlBuilder.append(";");
+
+        final String sql = sqlBuilder.toString();
+
+        return JdbcSink.sink(sql, new JdbcStatementBuilder<T>() {
+            @Override
+            public void accept(PreparedStatement ps, T stat) throws SQLException {
+                Map<String, Object> record = stat.asRecord();
+                int index = 1;
+                for (String col : allColumns) {
+                    Object value = record.get(col);
+                    try {
+                        if (value instanceof java.time.Instant) {
+                            ps.setTimestamp(index++, Timestamp.from((java.time.Instant) value));
+                        } else if (value instanceof Number || value instanceof String) {
+                            ps.setObject(index++, value);
+                        } else {
+                            ps.setString(index++, mapper.writeValueAsString(value));
+                        }
+                    } catch (Exception e) {
+                        throw new SQLException("Failed to bind column: " + col, e);
+                    }
+                }
+            }
+        }, JdbcExecutionOptions.builder().withBatchSize(batchSize).withBatchIntervalMs(batchIntervalMs).build(),
+                jdbcOptions);
+    }
+}
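
For orientation, here is how one of the sinks built in FileSegmentAnalyticsJob would expand. The constant values passed in are an assumption (they mirror the column names in the SQL removed from that file); only the shape of the generated statement follows from the factory code above:

// Illustrative only: the statement createGeneralSink builds for the rolling user-stats sink,
// assuming two primitive columns and five JSONB duration columns (names taken from the
// removed SQL). Conflict columns are also re-assigned from EXCLUDED, which is harmless.
//
//   INSERT INTO user_stats_rolling (user_id, window_type, lang_durations, machine_durations,
//       editor_durations, project_durations, activity_durations)
//   VALUES (?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb)
//   ON CONFLICT (user_id, window_type) DO UPDATE SET
//       user_id = EXCLUDED.user_id, window_type = EXCLUDED.window_type,
//       lang_durations = EXCLUDED.lang_durations, ..., updated_at = NOW();
SinkFunction<UserRollingStat> rollingSink = JdbcSinkFactory.createGeneralSink(
        "user_stats_rolling",
        new String[] { "user_id", "window_type" },
        new String[] { "lang_durations", "machine_durations", "editor_durations",
                "project_durations", "activity_durations" },
        "user_id, window_type",
        jdbcOptions, 10, 1000);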
analytics/src/main/java/stats/IStat.java

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+package stats;
+
+import java.util.Map;
+
+import models.EnrichedFileSegment;
+
+public interface IStat {
+    void add(EnrichedFileSegment seg);
+
+    void postProcess(WindowContext ctx);
+
+    Map<String, Object> asRecord();
+}
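
The concrete stat classes in this commit (UserAggregateStat, UserRollingStat, and the per-project variants) are not shown in this excerpt. As a toy illustration of the contract only: add() folds one segment into the running totals, postProcess() stamps the window metadata, and asRecord() returns the column-to-value map that JdbcSinkFactory binds by name. Field and column names below are hypothetical, and the WindowContext accessors follow the sketch given earlier:

package stats;

import java.util.HashMap;
import java.util.Map;

import models.EnrichedFileSegment;

// Toy sketch of the IStat contract; not part of the commit.
public class ExampleCountStat implements IStat {
    private final Map<String, Object> record = new HashMap<>();
    private long segmentCount = 0;

    @Override
    public void add(EnrichedFileSegment seg) {
        segmentCount++; // a real stat would aggregate durations out of the segment here
    }

    @Override
    public void postProcess(WindowContext ctx) {
        record.put("window_type", ctx.getWindowType());
        record.put("window_start", ctx.getWindowStart());
        record.put("window_end", ctx.getWindowEnd());
        record.put("segment_count", segmentCount);
    }

    @Override
    public Map<String, Object> asRecord() {
        return record;
    }
}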
analytics/src/main/java/stats/StatFactory.java

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+package stats;
+
+@FunctionalInterface
+public interface StatFactory<K, T extends IStat> {
+    T create(K key);
+}
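
Because StatFactory is a @FunctionalInterface, a factory does not have to be a named nested class like the ones added to FileSegmentAnalyticsJob; a lambda or method reference works too, provided the instance is serializable so Flink can ship it with the job. A sketch, not part of the commit:

// Equivalent to FileSegmentAnalyticsJob.UserRollingStatFactory, expressed as a serializable
// method reference via an intersection cast (illustrative only; relies on the
// UserRollingStat(Integer) constructor shown in the diff above).
StatFactory<Integer, UserRollingStat> rollingFactory =
        (StatFactory<Integer, UserRollingStat> & java.io.Serializable) UserRollingStat::new;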

0 commit comments