Skip to content

Commit 16a2011

Browse files
committed
feat: Create User Project Rolling analytics
1 parent 7b1af4a commit 16a2011

3 files changed

Lines changed: 179 additions & 10 deletions

File tree

analytics/src/main/java/file_segment_analytics/FileSegmentAnalyticsJob.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import stats.UserAggregateStat;
2727
import stats.UserRollingStat;
2828
import stats.UserProjectAggregateStat;
29+
import stats.UserProjectRollingStat;
2930
import stats.StatFactory;
3031

3132
public class FileSegmentAnalyticsJob {
@@ -67,13 +68,24 @@ public static void main(String[] args) throws Exception {
6768
new UserRollingStatFactory()))
6869
.returns(TypeExtractor.getForClass(UserRollingStat.class));
6970

70-
DataStream<UserProjectAggregateStat> projectsRollingStream = timestampedStream
71+
DataStream<UserProjectRollingStat> projectsRollingStream = timestampedStream
7172
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
7273
@Override
7374
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
7475
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
7576
}
76-
}).window(TumblingEventTimeWindows.of(Time.days(1)))
77+
}).window(SlidingEventTimeWindows.of(Time.hours(24), Time.seconds(10)))
78+
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectRollingStat>("rolling_24h",
79+
new UserProjectRollingStatFactory()))
80+
.returns(TypeExtractor.getForClass(UserProjectRollingStat.class));
81+
82+
DataStream<UserProjectAggregateStat> projectsAggregateStream = timestampedStream
83+
.keyBy(new KeySelector<EnrichedFileSegment, Tuple2<Integer, String>>() {
84+
@Override
85+
public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception {
86+
return Tuple2.of(seg.getUser_id(), seg.getProject_path());
87+
}
88+
}).window(TumblingEventTimeWindows.of(Time.hours(24)))
7789
.process(new UserStatWindowFunction<Tuple2<Integer, String>, UserProjectAggregateStat>("daily",
7890
new UserProjectAggregateStatFactory()))
7991
.returns(TypeExtractor.getForClass(UserProjectAggregateStat.class));
@@ -95,14 +107,15 @@ public Tuple2<Integer, String> getKey(EnrichedFileSegment seg) throws Exception
95107
"user_project_stats_aggregate", UserProjectAggregateStat.PRIMITIVE_COLUMNS,
96108
UserProjectAggregateStat.JSONB_COLUMNS, String.join(", ", UserProjectAggregateStat.CONFLICT_KEYS),
97109
jdbcOptions, 10, 1000);
110+
SinkFunction<UserProjectRollingStat> projectRollingSink = JdbcSinkFactory.createGeneralSink(
111+
"user_project_stats_Rolling", UserProjectRollingStat.PRIMITIVE_COLUMNS,
112+
UserProjectRollingStat.JSONB_COLUMNS, String.join(", ", UserProjectRollingStat.CONFLICT_KEYS),
113+
jdbcOptions, 10, 1000);
98114

99115
dailyStream.addSink(dailySink);
100116
rollingStream.addSink(rollingSink);
101-
projectsRollingStream.addSink(projectSink);
102-
103-
// Optionally print for debug
104-
// rollingStream.print();
105-
// projectsRollingStream.print();
117+
projectsAggregateStream.addSink(projectSink);
118+
projectsRollingStream.addSink(projectRollingSink);
106119

107120
env.execute("FileSegment Analytics");
108121
}
@@ -128,4 +141,12 @@ public UserProjectAggregateStat create(Tuple2<Integer, String> key) {
128141
return new UserProjectAggregateStat(key.f0, key.f1);
129142
}
130143
}
144+
145+
public static class UserProjectRollingStatFactory
146+
implements StatFactory<Tuple2<Integer, String>, UserProjectRollingStat>, Serializable {
147+
@Override
148+
public UserProjectRollingStat create(Tuple2<Integer, String> key) {
149+
return new UserProjectRollingStat(key.f0, key.f1);
150+
}
151+
}
131152
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package stats;
2+
3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import models.EnrichedFileSegment;
8+
9+
public class UserProjectRollingStat implements IStat {
10+
public static final String[] PRIMITIVE_COLUMNS = { "user_id", "project_path", "window_type", };
11+
public static final String[] JSONB_COLUMNS = { "lang_durations", "machine_durations", "editor_durations",
12+
"activity_durations", "files_durations" };
13+
public static final String[] CONFLICT_KEYS = { "user_id", "project_path", "window_type" };
14+
private final int user_id;
15+
private final String project_path;
16+
private long total_duration;
17+
private String window_type;
18+
19+
private final HashMap<String, Long> machine_durations = new HashMap<>();
20+
private final HashMap<String, Long> language_durations = new HashMap<>();
21+
private final HashMap<String, Long> editor_durations = new HashMap<>();
22+
private final HashMap<String, Long> project_durations = new HashMap<>();
23+
private final HashMap<String, Long> activity_durations = new HashMap<>();
24+
private final HashMap<String, Long> files_durations = new HashMap<>();
25+
26+
public UserProjectRollingStat(int user_id, String project_path) {
27+
this.user_id = user_id;
28+
this.project_path = project_path;
29+
this.total_duration = 0;
30+
}
31+
32+
public void setWindowType(String window_type) {
33+
this.window_type = window_type;
34+
}
35+
36+
public String getWindowType() {
37+
return this.window_type;
38+
}
39+
40+
public int getUserId() {
41+
return user_id;
42+
}
43+
44+
public String getProjectPath() {
45+
return project_path;
46+
}
47+
48+
public long getTotalDuration() {
49+
return total_duration;
50+
}
51+
52+
public HashMap<String, Long> getMachineDurations() {
53+
return machine_durations;
54+
}
55+
56+
public HashMap<String, Long> getLangDurations() {
57+
return language_durations;
58+
}
59+
60+
public HashMap<String, Long> getEditorDurations() {
61+
return editor_durations;
62+
}
63+
64+
public HashMap<String, Long> getProjectDurations() {
65+
return project_durations;
66+
}
67+
68+
public HashMap<String, Long> getActivityDurations() {
69+
return activity_durations;
70+
}
71+
72+
public HashMap<String, Long> getFilesDurations() {
73+
return files_durations;
74+
}
75+
76+
@Override
77+
public Map<String, Object> asRecord() {
78+
Map<String, Object> map = new HashMap<>();
79+
map.put("user_id", this.user_id);
80+
map.put("project_path", this.project_path);
81+
map.put("window_type", this.window_type);
82+
map.put("lang_durations", this.language_durations);
83+
map.put("machine_durations", this.machine_durations);
84+
map.put("editor_durations", this.editor_durations);
85+
map.put("project_durations", this.project_durations);
86+
map.put("activity_durations", this.activity_durations);
87+
map.put("file_durations", this.files_durations);
88+
return map;
89+
}
90+
91+
public void add(EnrichedFileSegment seg) {
92+
long duration = Duration.between(Instant.parse(seg.getStart_time()), Instant.parse(seg.getEnd_time()))
93+
.toMillis();
94+
95+
this.total_duration += duration;
96+
97+
if (seg.getLang() != null)
98+
language_durations.merge(seg.getLang(), duration, Long::sum);
99+
100+
if (seg.getEditor() != null)
101+
editor_durations.merge(seg.getEditor(), duration, Long::sum);
102+
103+
if (seg.getMachine_name() != null)
104+
machine_durations.merge(seg.getMachine_name(), duration, Long::sum);
105+
106+
if (seg.getProject_name() != null)
107+
project_durations.merge(seg.getProject_name(), duration, Long::sum);
108+
109+
if (seg.getSegment_type() != null)
110+
activity_durations.merge(seg.getSegment_type(), duration, Long::sum);
111+
112+
if (seg.getFile_path() != null)
113+
files_durations.merge(seg.getFile_path(), duration, Long::sum);
114+
}
115+
116+
public void postProcess(WindowContext ctx) {
117+
setWindowType(ctx.getWindowType());
118+
}
119+
120+
@Override
121+
public String toString() {
122+
StringBuilder sb = new StringBuilder();
123+
sb.append("UserProjectStat{user_id=").append(user_id).append(", project_path=").append(project_path)
124+
.append(", total_duration=").append(total_duration).append(" ms");
125+
126+
appendCategory(sb, "files", files_durations);
127+
appendCategory(sb, "languages", language_durations);
128+
appendCategory(sb, "machines", machine_durations);
129+
appendCategory(sb, "projects", project_durations);
130+
appendCategory(sb, "activities", activity_durations);
131+
appendCategory(sb, "editor", editor_durations);
132+
133+
sb.append("}");
134+
return sb.toString();
135+
}
136+
137+
private void appendCategory(StringBuilder sb, String name, HashMap<String, Long> durations) {
138+
if (!durations.isEmpty()) {
139+
sb.append(", ").append(name).append("={");
140+
for (Map.Entry<String, Long> entry : durations.entrySet()) {
141+
double percent = (total_duration > 0) ? (entry.getValue() * 100.0 / total_duration) : 0.0;
142+
sb.append(entry.getKey()).append(": ").append(String.format("%.2f%%", percent)).append(" (")
143+
.append(entry.getValue()).append(" ms), ");
144+
}
145+
sb.setLength(sb.length() - 2);
146+
sb.append("}");
147+
}
148+
}
149+
}

backend/migrations/0002_analytics.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ create table if not exists user_project_stats_aggregate (
5353
create table if not exists user_project_stats_rolling (
5454
user_id integer,
5555
project_path varchar(500) not null,
56-
project_id int primary key,
5756
window_type text not null check (window_type in (
5857
'rolling_24h', 'rolling_7d', 'rolling_30d', 'rolling_365d'
5958
)),
@@ -68,10 +67,10 @@ create table if not exists user_project_stats_rolling (
6867

6968
-- Session windows per user and project; at most one row per
-- (user_id, project_path, window_start).
create table if not exists user_project_session (
    user_id int not null,
    -- A project path is text, so it uses the same type as
    -- user_project_stats_rolling.project_path rather than int.
    project_path varchar(500) not null,
    window_start timestamp not null,
    window_end timestamp not null,
    primary key (user_id, project_path, window_start)
);
7675

7776
create table if not exists user_lang_session (

0 commit comments

Comments
 (0)