Skip to content

Commit e58aabb

Browse files
committed
feat(analytics): keep track of user project and language sessions
1 parent 2085a23 commit e58aabb

6 files changed

Lines changed: 346 additions & 123 deletions

File tree

analytics/src/main/java/file_segment_analytics/FileSegmentAnalyticsJob.java

Lines changed: 164 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -18,133 +18,180 @@
1818
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
1919
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
2020
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
21+
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
2122
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
2223
import org.apache.flink.streaming.api.windowing.time.Time;
2324

2425
import models.EnrichedFileSegment;
2526
import sinks.JdbcSinkFactory;
2627
import stats.UserAggregateStat;
28+
import stats.UserLangSessionStat;
2729
import stats.UserRollingStat;
2830
import stats.UserProjectAggregateStat;
2931
import stats.UserProjectRollingStat;
32+
import stats.UserProjectSessionStat;
3033
import stats.StatFactory;
3134

3235
public class FileSegmentAnalyticsJob {
33-
public static void main(String[] args) throws Exception {
34-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
35-
// TODO: Control parallelism
36-
env.setParallelism(1);
37-
38-
Properties consumerConfig = new Properties();
39-
try (InputStream stream = FileSegmentAnalyticsJob.class.getClassLoader()
40-
.getResourceAsStream("kafka_consumer.properties")) {
41-
consumerConfig.load(stream);
42-
}
43-
44-
KafkaSource<EnrichedFileSegment> kafkaSource = KafkaSource.<EnrichedFileSegment>builder()
45-
.setProperties(consumerConfig).setTopics("enriched_file_segments").setValueOnlyDeserializer(
46-
new JsonDeserializationSchema<>(TypeInformation.of(EnrichedFileSegment.class)))
47-
.build();
48-
49-
DataStream<EnrichedFileSegment> stream = env.fromSource(kafkaSource,
50-
WatermarkStrategy.forMonotonousTimestamps(), "enriched_file_segments");
51-
52-
// TODO: Change the watermark time in production
53-
DataStream<EnrichedFileSegment> timestampedStream = stream.assignTimestampsAndWatermarks(
54-
WatermarkStrategy.<EnrichedFileSegment>forBoundedOutOfOrderness(Duration.ofSeconds(1))
55-
.withTimestampAssigner((seg, ts) -> {
56-
return Instant.parse(seg.getEnd_time()).toEpochMilli();
57-
}));
58-
59-
DataStream<UserAggregateStat> dailyStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
60-
.window(TumblingEventTimeWindows.of(Time.days(1)))
61-
.process(new UserStatWindowFunction<Integer, UserAggregateStat>(new UserAggregateStatFactory()))
62-
.returns(TypeExtractor.getForClass(UserAggregateStat.class));
63-
64-
DataStream<UserRollingStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
65-
.window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
66-
.process(new UserStatWindowFunction<Integer, UserRollingStat>(new UserRollingStatFactory()))
67-
.returns(TypeExtractor.getForClass(UserRollingStat.class));
68-
69-
DataStream<UserProjectRollingStat> projectsRollingStream = timestampedStream
70-
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
71-
@Override
72-
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
73-
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
74-
}
75-
}).window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
76-
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectRollingStat>(
77-
new UserProjectRollingStatFactory()))
78-
.returns(TypeExtractor.getForClass(UserProjectRollingStat.class));
79-
80-
DataStream<UserProjectAggregateStat> projectsAggregateStream = timestampedStream
81-
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
82-
@Override
83-
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
84-
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
85-
}
86-
}).window(TumblingEventTimeWindows.of(Time.hours(24)))
87-
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>(
88-
new UserProjectAggregateStatFactory()))
89-
.returns(TypeExtractor.getForClass(UserProjectAggregateStat.class));
90-
91-
// Use stat type definitions for columns and conflict keys
92-
93-
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
94-
.withUrl("jdbc:postgresql://postgres_db:5432/myapp").withDriverName("org.postgresql.Driver")
95-
.withUsername("admin").withPassword("secure_password").build();
96-
97-
// Sinks using asRecord (createGeneralSink)
98-
SinkFunction<UserAggregateStat> dailySink = JdbcSinkFactory.createGeneralSink("user_stats_aggregate_daily",
99-
UserAggregateStat.PRIMITIVE_COLUMNS, UserAggregateStat.JSONB_COLUMNS,
100-
String.join(", ", UserAggregateStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
101-
SinkFunction<UserRollingStat> rollingSink = JdbcSinkFactory.createGeneralSink("user_stats_rolling_day",
102-
UserRollingStat.PRIMITIVE_COLUMNS, UserRollingStat.JSONB_COLUMNS,
103-
String.join(", ", UserRollingStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
104-
SinkFunction<UserProjectAggregateStat> projectSink = JdbcSinkFactory.createGeneralSink(
105-
"user_project_stats_aggregate_daily", UserProjectAggregateStat.PRIMITIVE_COLUMNS,
106-
UserProjectAggregateStat.JSONB_COLUMNS, String.join(", ", UserProjectAggregateStat.CONFLICT_KEYS),
107-
jdbcOptions, 10, 1000);
108-
SinkFunction<UserProjectRollingStat> projectRollingSink = JdbcSinkFactory.createGeneralSink(
109-
"user_project_stats_rolling_day", UserProjectRollingStat.PRIMITIVE_COLUMNS,
110-
UserProjectRollingStat.JSONB_COLUMNS, String.join(", ", UserProjectRollingStat.CONFLICT_KEYS),
111-
jdbcOptions, 10, 1000);
112-
113-
dailyStream.addSink(dailySink);
114-
rollingStream.addSink(rollingSink);
115-
projectsAggregateStream.addSink(projectSink);
116-
projectsRollingStream.addSink(projectRollingSink);
117-
118-
env.execute("FileSegment Analytics");
119-
}
120-
121-
public static class UserAggregateStatFactory implements StatFactory<Integer, UserAggregateStat>, Serializable {
122-
@Override
123-
public UserAggregateStat create(Integer key) {
124-
return new UserAggregateStat(key);
125-
}
126-
}
127-
128-
public static class UserRollingStatFactory implements StatFactory<Integer, UserRollingStat>, Serializable {
129-
@Override
130-
public UserRollingStat create(Integer key) {
131-
return new UserRollingStat(key);
132-
}
133-
}
134-
135-
public static class UserProjectAggregateStatFactory
136-
implements StatFactory<Tuple2<Integer, String>, UserProjectAggregateStat>, Serializable {
137-
@Override
138-
public UserProjectAggregateStat create(Tuple2<Integer, String> key) {
139-
return new UserProjectAggregateStat(key.f0, key.f1);
140-
}
141-
}
142-
143-
public static class UserProjectRollingStatFactory
144-
implements StatFactory<Tuple2<Integer, String>, UserProjectRollingStat>, Serializable {
145-
@Override
146-
public UserProjectRollingStat create(Tuple2<Integer, String> key) {
147-
return new UserProjectRollingStat(key.f0, key.f1);
148-
}
149-
}
36+
public static void main(String[] args) throws Exception {
37+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
38+
// TODO: Control parallelism
39+
env.setParallelism(1);
40+
41+
Properties consumerConfig = new Properties();
42+
try (InputStream stream = FileSegmentAnalyticsJob.class.getClassLoader()
43+
.getResourceAsStream("kafka_consumer.properties")) {
44+
consumerConfig.load(stream);
45+
}
46+
47+
KafkaSource<EnrichedFileSegment> kafkaSource = KafkaSource.<EnrichedFileSegment>builder()
48+
.setProperties(consumerConfig).setTopics("enriched_file_segments").setValueOnlyDeserializer(
49+
new JsonDeserializationSchema<>(TypeInformation.of(EnrichedFileSegment.class)))
50+
.build();
51+
52+
DataStream<EnrichedFileSegment> stream = env.fromSource(kafkaSource,
53+
WatermarkStrategy.forMonotonousTimestamps(), "enriched_file_segments");
54+
55+
// TODO: Change the watermark time in production
56+
DataStream<EnrichedFileSegment> timestampedStream = stream.assignTimestampsAndWatermarks(
57+
WatermarkStrategy.<EnrichedFileSegment>forBoundedOutOfOrderness(Duration.ofSeconds(1))
58+
.withTimestampAssigner((seg, ts) -> {
59+
return Instant.parse(seg.getEnd_time()).toEpochMilli();
60+
}));
61+
62+
DataStream<UserAggregateStat> dailyStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
63+
.window(TumblingEventTimeWindows.of(Time.days(1)))
64+
.process(new UserStatWindowFunction<Integer, UserAggregateStat>(new UserAggregateStatFactory()))
65+
.returns(TypeExtractor.getForClass(UserAggregateStat.class));
66+
67+
DataStream<UserRollingStat> rollingStream = timestampedStream.keyBy(EnrichedFileSegment::getUser_id)
68+
.window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
69+
.process(new UserStatWindowFunction<Integer, UserRollingStat>(new UserRollingStatFactory()))
70+
.returns(TypeExtractor.getForClass(UserRollingStat.class));
71+
72+
DataStream<UserProjectRollingStat> projectsRollingStream = timestampedStream
73+
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
74+
@Override
75+
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
76+
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
77+
}
78+
}).window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
79+
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectRollingStat>(
80+
new UserProjectRollingStatFactory()))
81+
.returns(TypeExtractor.getForClass(UserProjectRollingStat.class));
82+
83+
DataStream<UserProjectAggregateStat> projectsAggregateStream = timestampedStream
84+
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
85+
@Override
86+
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
87+
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
88+
}
89+
}).window(TumblingEventTimeWindows.of(Time.hours(24)))
90+
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>(
91+
new UserProjectAggregateStatFactory()))
92+
.returns(TypeExtractor.getForClass(UserProjectAggregateStat.class));
93+
94+
DataStream<UserProjectSessionStat> projectSessionStream = timestampedStream
95+
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
96+
@Override
97+
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
98+
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
99+
}
100+
}).window(EventTimeSessionWindows.withGap(Time.seconds(30))) // <<< session gap
101+
.process(new UserStatWindowFunction<>(new UserProjectSessionStatFactory()))
102+
.returns(TypeExtractor.getForClass(UserProjectSessionStat.class));
103+
104+
DataStream<UserLangSessionStat> langSessionStream = timestampedStream
105+
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
106+
@Override
107+
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
108+
return Tuple2.of(seg.getUser_id(), seg.getLang());
109+
}
110+
}).window(EventTimeSessionWindows.withGap(Time.seconds(30))) // <<< session gap
111+
.process(new UserStatWindowFunction<>(new UserLangSessionStatFactory()))
112+
.returns(TypeExtractor.getForClass(UserLangSessionStat.class));
113+
114+
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
115+
.withUrl("jdbc:postgresql://postgres_db:5432/myapp").withDriverName("org.postgresql.Driver")
116+
.withUsername("admin").withPassword("secure_password").build();
117+
118+
// Sinks using asRecord (createGeneralSink)
119+
SinkFunction<UserAggregateStat> dailySink = JdbcSinkFactory.createGeneralSink("user_stats_aggregate_daily",
120+
UserAggregateStat.PRIMITIVE_COLUMNS, UserAggregateStat.JSONB_COLUMNS,
121+
String.join(", ", UserAggregateStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
122+
SinkFunction<UserRollingStat> rollingSink = JdbcSinkFactory.createGeneralSink("user_stats_rolling_day",
123+
UserRollingStat.PRIMITIVE_COLUMNS, UserRollingStat.JSONB_COLUMNS,
124+
String.join(", ", UserRollingStat.CONFLICT_KEYS), jdbcOptions, 10, 1000);
125+
SinkFunction<UserProjectAggregateStat> projectSink = JdbcSinkFactory.createGeneralSink(
126+
"user_project_stats_aggregate_daily", UserProjectAggregateStat.PRIMITIVE_COLUMNS,
127+
UserProjectAggregateStat.JSONB_COLUMNS, String.join(", ", UserProjectAggregateStat.CONFLICT_KEYS),
128+
jdbcOptions, 10, 1000);
129+
SinkFunction<UserProjectRollingStat> projectRollingSink = JdbcSinkFactory.createGeneralSink(
130+
"user_project_stats_rolling_day", UserProjectRollingStat.PRIMITIVE_COLUMNS,
131+
UserProjectRollingStat.JSONB_COLUMNS, String.join(", ", UserProjectRollingStat.CONFLICT_KEYS),
132+
jdbcOptions, 10, 1000);
133+
134+
SinkFunction<UserProjectSessionStat> projectSessionSink = JdbcSinkFactory.createGeneralSink(
135+
"user_project_session", UserProjectSessionStat.PRIMITIVE_COLUMNS, UserProjectSessionStat.JSONB_COLUMNS,
136+
UserProjectSessionStat.CONFLICT_KEYS, jdbcOptions, 10, 1000);
137+
138+
SinkFunction<UserLangSessionStat> langSessionSink = JdbcSinkFactory.createGeneralSink("user_lang_session",
139+
UserLangSessionStat.PRIMITIVE_COLUMNS, UserLangSessionStat.JSONB_COLUMNS,
140+
UserLangSessionStat.CONFLICT_KEYS, jdbcOptions, 10, 1000);
141+
142+
dailyStream.addSink(dailySink);
143+
rollingStream.addSink(rollingSink);
144+
projectsAggregateStream.addSink(projectSink);
145+
projectsRollingStream.addSink(projectRollingSink);
146+
projectSessionStream.addSink(projectSessionSink);
147+
langSessionStream.addSink(langSessionSink);
148+
149+
env.execute("FileSegment Analytics");
150+
}
151+
152+
public static class UserAggregateStatFactory implements StatFactory<Integer, UserAggregateStat>, Serializable {
153+
@Override
154+
public UserAggregateStat create(Integer key) {
155+
return new UserAggregateStat(key);
156+
}
157+
}
158+
159+
public static class UserRollingStatFactory implements StatFactory<Integer, UserRollingStat>, Serializable {
160+
@Override
161+
public UserRollingStat create(Integer key) {
162+
return new UserRollingStat(key);
163+
}
164+
}
165+
166+
public static class UserProjectAggregateStatFactory
167+
implements StatFactory<Tuple2<Integer, String>, UserProjectAggregateStat>, Serializable {
168+
@Override
169+
public UserProjectAggregateStat create(Tuple2<Integer, String> key) {
170+
return new UserProjectAggregateStat(key.f0, key.f1);
171+
}
172+
}
173+
174+
public static class UserProjectRollingStatFactory
175+
implements StatFactory<Tuple2<Integer, String>, UserProjectRollingStat>, Serializable {
176+
@Override
177+
public UserProjectRollingStat create(Tuple2<Integer, String> key) {
178+
return new UserProjectRollingStat(key.f0, key.f1);
179+
}
180+
}
181+
182+
public static class UserProjectSessionStatFactory
183+
implements StatFactory<Tuple2<Integer, String>, UserProjectSessionStat>, Serializable {
184+
@Override
185+
public UserProjectSessionStat create(Tuple2<Integer, String> key) {
186+
return new UserProjectSessionStat(key.f0, key.f1);
187+
}
188+
}
189+
190+
public static class UserLangSessionStatFactory
191+
implements StatFactory<Tuple2<Integer, String>, UserLangSessionStat>, Serializable {
192+
@Override
193+
public UserLangSessionStat create(Tuple2<Integer, String> key) {
194+
return new UserLangSessionStat(key.f0, key.f1);
195+
}
196+
}
150197
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package stats;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
8+
import models.EnrichedFileSegment;
9+
10+
public class UserLangSessionStat implements IStat {
11+
12+
public static final String[] PRIMITIVE_COLUMNS = { "user_id", "lang", "window_start", "window_end",
13+
"work_duration_ms" };
14+
public static final String[] JSONB_COLUMNS = {};
15+
public static final String CONFLICT_KEYS = "user_id, lang, window_start";
16+
17+
private final int user_id;
18+
private final String lang;
19+
20+
private long total_duration = 0;
21+
private Instant window_start = null;
22+
private Instant window_end = null;
23+
24+
public UserLangSessionStat(int user_id, String lang) {
25+
this.user_id = user_id;
26+
this.lang = lang;
27+
}
28+
29+
public int getUserId() {
30+
return user_id;
31+
}
32+
33+
public String getLang() {
34+
return lang;
35+
}
36+
37+
public long getTotalDuration() {
38+
return total_duration;
39+
}
40+
41+
public Instant getWindowStart() {
42+
return window_start;
43+
}
44+
45+
public Instant getWindowEnd() {
46+
return window_end;
47+
}
48+
49+
@Override
50+
public Map<String, Object> asRecord() {
51+
Map<String, Object> map = new HashMap<>();
52+
map.put("user_id", user_id);
53+
map.put("lang", lang);
54+
map.put("work_duration_ms", total_duration);
55+
map.put("window_start", window_start);
56+
map.put("window_end", window_end);
57+
return map;
58+
}
59+
60+
public void add(EnrichedFileSegment seg) {
61+
if (seg.getLang() == null)
62+
return;
63+
if (!seg.getLang().equals(lang))
64+
return;
65+
66+
Instant start = Instant.parse(seg.getStart_time());
67+
Instant end = Instant.parse(seg.getEnd_time());
68+
long duration = Duration.between(start, end).toMillis();
69+
70+
total_duration += duration;
71+
72+
if (window_start == null || start.isBefore(window_start))
73+
window_start = start;
74+
if (window_end == null || end.isAfter(window_end))
75+
window_end = end;
76+
}
77+
78+
public void postProcess(WindowContext ctx) {
79+
window_start = ctx.getWindowStart();
80+
window_end = ctx.getWindowEnd();
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return "UserLangSessionStat{user_id=" + user_id + ", lang='" + lang + '\'' + ", duration=" + total_duration
86+
+ ", window=[" + window_start + " → " + window_end + "] }";
87+
}
88+
}

0 commit comments

Comments
 (0)