Skip to content

Commit 7b1af4a

Browse files
committed
feat: Create an abstraction for all sinks (needs refactoring)
1 parent 81772c4 commit 7b1af4a

7 files changed

Lines changed: 310 additions & 76 deletions

File tree

analytics/src/main/java/file_segment_analytics/FileSegmentAnalyticsJob.java

Lines changed: 36 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.io.InputStream;
44
import java.io.Serializable;
5-
import java.sql.Timestamp;
65
import java.time.Duration;
76
import java.time.Instant;
87
import java.util.Properties;
@@ -13,12 +12,8 @@
1312
import org.apache.flink.api.java.tuple.Tuple2;
1413
import org.apache.flink.api.java.typeutils.TypeExtractor;
1514
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
16-
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
17-
import org.apache.flink.connector.jdbc.JdbcSink;
1815
import org.apache.flink.connector.kafka.source.KafkaSource;
1916
import org.apache.flink.formats.json.JsonDeserializationSchema;
20-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
21-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2217
import org.apache.flink.streaming.api.datastream.DataStream;
2318
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2419
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -27,7 +22,9 @@
2722
import org.apache.flink.streaming.api.windowing.time.Time;
2823

2924
import models.EnrichedFileSegment;
25+
import sinks.JdbcSinkFactory;
3026
import stats.UserAggregateStat;
27+
import stats.UserRollingStat;
3128
import stats.UserProjectAggregateStat;
3229
import stats.StatFactory;
3330

@@ -64,100 +61,66 @@ public static void main(String[] args) throws Exception {
6461
new UserStatWindowFunction<Integer, UserAggregateStat>("daily", new UserAggregateStatFactory()))
6562
.returns(TypeExtractor.getForClass(UserAggregateStat.class));
6663

67-
DataStream<UserAggregateStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
64+
DataStream<UserRollingStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
6865
.window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
69-
.process(new UserStatWindowFunction<Integer, UserAggregateStat>("rolling_24h",
70-
new UserAggregateStatFactory()))
71-
.returns(TypeExtractor.getForClass(UserAggregateStat.class));
66+
.process(new UserStatWindowFunction<Integer, UserRollingStat>("rolling_24h",
67+
new UserRollingStatFactory()))
68+
.returns(TypeExtractor.getForClass(UserRollingStat.class));
7269

7370
DataStream<UserProjectAggregateStat> projectsRollingStream = timestampedStream
7471
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
7572
@Override
7673
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
7774
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
7875
}
79-
}).window(SlidingEventTimeWindows.of(Time.days(24), Time.seconds(10)))
80-
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>("daily_project",
76+
}).window(TumblingEventTimeWindows.of(Time.days(1)))
77+
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>("daily",
8178
new UserProjectAggregateStatFactory()))
8279
.returns(TypeExtractor.getForClass(UserProjectAggregateStat.class));
8380

81+
// Use stat type definitions for columns and conflict keys
82+
8483
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
8584
.withUrl("jdbc:postgresql://postgres_db:5432/myapp").withDriverName("org.postgresql.Driver")
8685
.withUsername("admin").withPassword("secure_password").build();
8786

88-
dailyStream.addSink(createJdbcSink(jdbcOptions, 1, 0, false));
89-
90-
rollingStream.addSink(createJdbcSink(jdbcOptions, 1, 0, true));
91-
92-
projectsRollingStream.print();
87+
// Sinks using asRecord (createGeneralSink)
88+
SinkFunction<UserAggregateStat> dailySink = JdbcSinkFactory.createGeneralSink("user_stats_aggregate",
89+
UserAggregateStat.PRIMITIVE_COLUMNS, UserAggregateStat.JSONB_COLUMNS,
90+
String.join(", ", UserAggregateStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
91+
SinkFunction<UserRollingStat> rollingSink = JdbcSinkFactory.createGeneralSink("user_stats_rolling",
92+
UserRollingStat.PRIMITIVE_COLUMNS, UserRollingStat.JSONB_COLUMNS,
93+
String.join(", ", UserRollingStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
94+
SinkFunction<UserProjectAggregateStat> projectSink = JdbcSinkFactory.createGeneralSink(
95+
"user_project_stats_aggregate", UserProjectAggregateStat.PRIMITIVE_COLUMNS,
96+
UserProjectAggregateStat.JSONB_COLUMNS, String.join(", ", UserProjectAggregateStat.CONFLICT_KEYS),
97+
jdbcOptions, 10, 1000);
98+
99+
dailyStream.addSink(dailySink);
100+
rollingStream.addSink(rollingSink);
101+
projectsRollingStream.addSink(projectSink);
102+
103+
// Optionally print for debug
104+
// rollingStream.print();
105+
// projectsRollingStream.print();
93106

94107
env.execute("FileSegment Analytics");
95108
}
96109

97-
private static SinkFunction<UserAggregateStat> createJdbcSink(JdbcConnectionOptions jdbcOptions, int batchSize,
98-
long batchIntervalMs, boolean rolling) {
99-
String sql;
100-
if (rolling) {
101-
sql = "INSERT INTO user_stats_rolling ("
102-
+ "user_id, window_type, lang_durations, machine_durations, editor_durations, "
103-
+ "project_durations, activity_durations) "
104-
+ "VALUES (?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
105-
+ "ON CONFLICT (user_id, window_type) DO UPDATE SET " + "lang_durations = EXCLUDED.lang_durations, "
106-
+ "machine_durations = EXCLUDED.machine_durations, "
107-
+ "editor_durations = EXCLUDED.editor_durations, "
108-
+ "project_durations = EXCLUDED.project_durations, "
109-
+ "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();";
110-
} else {
111-
sql = "INSERT INTO user_stats_aggregate ("
112-
+ "user_id, window_type, lang_durations, machine_durations, editor_durations, "
113-
+ "project_durations, activity_durations, window_start, window_end) "
114-
+ "VALUES (?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?, ?)"
115-
+ "ON CONFLICT (user_id, window_type, window_start) DO UPDATE SET "
116-
+ "window_end = EXCLUDED.window_end, " + "lang_durations = EXCLUDED.lang_durations, "
117-
+ "machine_durations = EXCLUDED.machine_durations, "
118-
+ "editor_durations = EXCLUDED.editor_durations, "
119-
+ "project_durations = EXCLUDED.project_durations, "
120-
+ "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();";
121-
}
122-
123-
return JdbcSink.sink(sql, (ps, stat) -> {
124-
ObjectMapper mapper = new ObjectMapper();
125-
UserAggregateStat userStat = (UserAggregateStat) stat;
126-
ps.setInt(1, userStat.getUserId());
127-
ps.setString(2, stat.getWindowType());
128-
129-
if (!rolling) {
130-
ps.setTimestamp(8, Timestamp.from(userStat.getWindowStart()));
131-
ps.setTimestamp(9, Timestamp.from(userStat.getWindowEnd()));
132-
}
133-
134-
try {
135-
// 5-9: Serialize Map fields to JSON strings
136-
ps.setString(3, mapper.writeValueAsString(userStat.getLangDurations()));
137-
ps.setString(4, mapper.writeValueAsString(userStat.getMachineDurations()));
138-
ps.setString(5, mapper.writeValueAsString(userStat.getEditorDurations()));
139-
ps.setString(6, mapper.writeValueAsString(userStat.getProjectDurations()));
140-
ps.setString(7, mapper.writeValueAsString(userStat.getActivityDurations()));
141-
} catch (JsonProcessingException e) {
142-
// Handle serialization error gracefully
143-
e.printStackTrace();
144-
for (int i = 3; i <= 7; i++)
145-
ps.setString(i, "{}"); // Send empty JSON object on error
146-
}
147-
},
148-
// Define execution options (batching)
149-
JdbcExecutionOptions.builder().withBatchSize(batchSize).withBatchIntervalMs(batchIntervalMs).build(),
150-
// Provide JDBC connection details
151-
jdbcOptions);
152-
}
153-
154110
public static class UserAggregateStatFactory implements StatFactory<Integer, UserAggregateStat>, Serializable {
155111
@Override
156112
public UserAggregateStat create(Integer key) {
157113
return new UserAggregateStat(key);
158114
}
159115
}
160116

117+
public static class UserRollingStatFactory implements StatFactory<Integer, UserRollingStat>, Serializable {
118+
@Override
119+
public UserRollingStat create(Integer key) {
120+
return new UserRollingStat(key);
121+
}
122+
}
123+
161124
public static class UserProjectAggregateStatFactory
162125
implements StatFactory<Tuple2<Integer, String>, UserProjectAggregateStat>, Serializable {
163126
@Override
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package sinks;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import stats.IStat;

/**
 * Factory for generic JDBC upsert sinks over {@link IStat} records.
 *
 * <p>A sink built here issues {@code INSERT ... ON CONFLICT (...) DO UPDATE}
 * against PostgreSQL, binding primitive columns directly and serializing the
 * remaining columns to JSON and casting them with {@code ?::jsonb}.
 */
public final class JdbcSinkFactory {

    /**
     * Shared, thread-safe mapper. Held in a static field rather than a captured
     * local on purpose: Flink serializes the sink function (the anonymous
     * {@link JdbcStatementBuilder} below), and {@code ObjectMapper} is not
     * {@code Serializable} — capturing it as a local would make job submission
     * fail with a {@code NotSerializableException}. Static fields are not part
     * of the serialized closure.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JdbcSinkFactory() {
        // Utility class; no instances.
    }

    /**
     * Builds the parameterized upsert statement.
     *
     * @param tableName        target table
     * @param primitiveColumns columns bound as plain JDBC values, in order
     * @param jsonbColumns     columns bound as JSON strings cast to jsonb, in order
     * @param conflictColumns  comma-separated conflict-key list; null/empty
     *                         disables the ON CONFLICT clause
     * @return the SQL text with one {@code ?} placeholder per column
     */
    static String buildUpsertSql(final String tableName, final String[] primitiveColumns,
            final String[] jsonbColumns, final String conflictColumns) {
        final List<String> allColumns = new ArrayList<>();
        allColumns.addAll(Arrays.asList(primitiveColumns));
        allColumns.addAll(Arrays.asList(jsonbColumns));

        final StringBuilder sql = new StringBuilder();
        sql.append("INSERT INTO ").append(tableName).append(" (").append(String.join(", ", allColumns))
                .append(") VALUES (");
        for (int i = 0; i < allColumns.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            // Columns after the primitives are jsonb; cast the bound string.
            sql.append(i >= primitiveColumns.length ? "?::jsonb" : "?");
        }
        sql.append(")");

        if (conflictColumns != null && !conflictColumns.isEmpty()) {
            sql.append(" ON CONFLICT (").append(conflictColumns).append(") DO UPDATE SET ");
            boolean first = true;
            for (final String col : allColumns) {
                if (!first) {
                    sql.append(", ");
                }
                sql.append(col).append(" = EXCLUDED.").append(col);
                first = false;
            }
            // Table is expected to have an updated_at column; the upsert touches it.
            sql.append(", updated_at = NOW()");
        }
        sql.append(";");
        return sql.toString();
    }

    /**
     * Creates a JDBC upsert sink for any stat type exposing
     * {@link IStat#asRecord()}. Record keys must match the declared column
     * names exactly; a missing key binds JSON {@code null} for that column.
     *
     * @param <T>              concrete stat type
     * @param tableName        target table
     * @param primitiveColumns columns bound as plain values (ints, strings, instants)
     * @param jsonbColumns     columns serialized to JSON
     * @param conflictColumns  comma-separated conflict keys (nullable/empty to skip upsert)
     * @param jdbcOptions      connection options
     * @param batchSize        JDBC batch size
     * @param batchIntervalMs  JDBC batch flush interval in milliseconds
     * @return a serializable Flink sink function
     */
    public static <T extends IStat> SinkFunction<T> createGeneralSink(final String tableName,
            final String[] primitiveColumns, final String[] jsonbColumns, final String conflictColumns,
            final JdbcConnectionOptions jdbcOptions, final int batchSize, final long batchIntervalMs) {

        // ArrayList is Serializable, so capturing it in the statement builder is safe.
        final List<String> allColumns = new ArrayList<>();
        allColumns.addAll(Arrays.asList(primitiveColumns));
        allColumns.addAll(Arrays.asList(jsonbColumns));

        final String sql = buildUpsertSql(tableName, primitiveColumns, jsonbColumns, conflictColumns);

        return JdbcSink.sink(sql, new JdbcStatementBuilder<T>() {
            @Override
            public void accept(final PreparedStatement ps, final T stat) throws SQLException {
                final Map<String, Object> record = stat.asRecord();
                int index = 1;
                for (final String col : allColumns) {
                    final Object value = record.get(col);
                    try {
                        if (value instanceof java.time.Instant) {
                            ps.setTimestamp(index++, Timestamp.from((java.time.Instant) value));
                        } else if (value instanceof Number || value instanceof String) {
                            ps.setObject(index++, value);
                        } else {
                            // Maps and other structured values (and nulls, which
                            // serialize to the JSON literal "null") become JSON text.
                            ps.setString(index++, MAPPER.writeValueAsString(value));
                        }
                    } catch (Exception e) {
                        // Surface binding failures with the offending column; Flink
                        // will fail/retry the batch rather than silently dropping data.
                        throw new SQLException("Failed to bind column: " + col, e);
                    }
                }
            }
        }, JdbcExecutionOptions.builder().withBatchSize(batchSize).withBatchIntervalMs(batchIntervalMs).build(),
                jdbcOptions);
    }
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package stats;
22

3+
import java.util.Map;
4+
35
import models.EnrichedFileSegment;
46

57
public interface IStat {
68
void add(EnrichedFileSegment seg);
79

810
void postProcess(WindowContext ctx);
11+
12+
Map<String, Object> asRecord();
913
}

analytics/src/main/java/stats/UserAggregateStat.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import models.EnrichedFileSegment;
88

99
public class UserAggregateStat implements IStat {
10+
public final static String[] PRIMITIVE_COLUMNS = { "user_id", "window_type", "window_start", "window_end" };
11+
public final static String[] JSONB_COLUMNS = { "lang_durations", "machine_durations", "editor_durations",
12+
"project_durations", "activity_durations" };
13+
public final static String CONFLICT_KEYS = "user_id, window_type, window_start";
1014
private final int user_id;
1115
private long total_duration;
1216
private Instant window_start;
@@ -78,6 +82,21 @@ public HashMap<String, Long> getActivityDurations() {
7882
return activity_durations;
7983
}
8084

85+
@Override
86+
public Map<String, Object> asRecord() {
87+
Map<String, Object> map = new HashMap<>();
88+
map.put("user_id", this.user_id);
89+
map.put("window_type", this.window_type);
90+
map.put("lang_durations", this.language_durations);
91+
map.put("machine_durations", this.machine_durations);
92+
map.put("editor_durations", this.editor_durations);
93+
map.put("project_durations", this.project_durations);
94+
map.put("activity_durations", this.activity_durations);
95+
map.put("window_start", this.window_start);
96+
map.put("window_end", this.window_end);
97+
return map;
98+
}
99+
81100
public void add(EnrichedFileSegment seg) {
82101
long duration = Duration.between(Instant.parse(seg.getStart_time()), Instant.parse(seg.getEnd_time()))
83102
.toMillis();

analytics/src/main/java/stats/UserProjectAggregateStat.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
import models.EnrichedFileSegment;
88

99
public class UserProjectAggregateStat implements IStat {
10+
public static final String[] PRIMITIVE_COLUMNS = { "user_id", "project_path", "window_type", "window_start",
11+
"window_end", };
12+
public static final String[] JSONB_COLUMNS = { "lang_durations", "machine_durations", "editor_durations",
13+
"activity_durations", "files_durations" };
14+
public static final String[] CONFLICT_KEYS = { "user_id", "project_path", "window_type", "window_start" };
1015
private final int user_id;
1116
private final String project_path;
1217
private long total_duration;
@@ -89,6 +94,23 @@ public HashMap<String, Long> getFilesDurations() {
8994
return files_durations;
9095
}
9196

97+
@Override
98+
public Map<String, Object> asRecord() {
99+
Map<String, Object> map = new HashMap<>();
100+
map.put("user_id", this.user_id);
101+
map.put("project_path", this.project_path);
102+
map.put("window_type", this.window_type);
103+
map.put("lang_durations", this.language_durations);
104+
map.put("machine_durations", this.machine_durations);
105+
map.put("editor_durations", this.editor_durations);
106+
map.put("project_durations", this.project_durations);
107+
map.put("activity_durations", this.activity_durations);
108+
map.put("file_durations", this.files_durations);
109+
map.put("window_start", this.window_start);
110+
map.put("window_end", this.window_end);
111+
return map;
112+
}
113+
92114
public void add(EnrichedFileSegment seg) {
93115
long duration = Duration.between(Instant.parse(seg.getStart_time()), Instant.parse(seg.getEnd_time()))
94116
.toMillis();

0 commit comments

Comments
 (0)