@@ -64,44 +64,72 @@ public static void main(String[] args) throws Exception {
6464 .withUrl ("jdbc:postgresql://postgres_db:5432/myapp" ).withDriverName ("org.postgresql.Driver" )
6565 .withUsername ("admin" ).withPassword ("secure_password" ).build ();
6666
67- dailyStream .addSink (createJdbcSink (jdbcOptions , 100 , 5000L ));
67+ dailyStream .addSink (createJdbcSink (jdbcOptions , 100 , 5000L , false ));
6868
69- rollingStream .addSink (createJdbcSink (jdbcOptions , 10 , 1000L ));
69+ rollingStream .addSink (createJdbcSink (jdbcOptions , 10 , 1000L , true ));
7070
7171 env .execute ("FileSegment Analytics" );
7272 }
7373
7474 private static SinkFunction <UserAggregateStat > createJdbcSink (JdbcConnectionOptions jdbcOptions , int batchSize ,
75- long batchIntervalMs ) {
76- // The PostgreSQL UPSERT query uses ON CONFLICT DO UPDATE
77- String sql = "INSERT INTO user_stats_aggregate (user_id, window_type, window_start, window_end, lang_durations, machine_durations, editor_durations, project_durations, activity_durations) "
78- + "VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
79- + "ON CONFLICT (user_id, window_type, window_start) DO UPDATE "
80- + "SET window_end = EXCLUDED.window_end, " + " lang_durations = EXCLUDED.lang_durations, "
81- + " machine_durations = EXCLUDED.machine_durations, "
82- + " editor_durations = EXCLUDED.editor_durations, "
83- + " project_durations = EXCLUDED.project_durations, "
84- + " activity_durations = EXCLUDED.activity_durations, " + " updated_at = NOW();" ;
75+ long batchIntervalMs , boolean rolling ) {
76+ String table = rolling ? "user_stats_rolling" : "user_stats_aggregate" ;
77+ String conflict = rolling ? "(user_id, window_type)" : "(user_id, window_type, window_start)" ;
78+
79+ String columns = "user_id, window_type, lang_durations, machine_durations, editor_durations, project_durations, activity_durations" ;
80+ String values = "?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb" ;
81+
82+ if (!rolling ) {
83+ columns += ", window_start, window_end" ;
84+ values += ", ?, ?" ;
85+ }
86+
87+ String sql ;
88+ if (rolling ) {
89+ sql = "INSERT INTO user_stats_rolling ("
90+ + "user_id, window_type, lang_durations, machine_durations, editor_durations, "
91+ + "project_durations, activity_durations) "
92+ + "VALUES (?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
93+ + "ON CONFLICT (user_id, window_type) DO UPDATE SET " + "lang_durations = EXCLUDED.lang_durations, "
94+ + "machine_durations = EXCLUDED.machine_durations, "
95+ + "editor_durations = EXCLUDED.editor_durations, "
96+ + "project_durations = EXCLUDED.project_durations, "
97+ + "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();" ;
98+ } else {
99+ sql = "INSERT INTO user_stats_aggregate ("
100+ + "user_id, window_type, window_start, window_end, lang_durations, machine_durations, editor_durations, "
101+ + "project_durations, activity_durations) "
102+ + "VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
103+ + "ON CONFLICT (user_id, window_type, window_start) DO UPDATE SET "
104+ + "window_end = EXCLUDED.window_end, " + "lang_durations = EXCLUDED.lang_durations, "
105+ + "machine_durations = EXCLUDED.machine_durations, "
106+ + "editor_durations = EXCLUDED.editor_durations, "
107+ + "project_durations = EXCLUDED.project_durations, "
108+ + "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();" ;
109+ }
85110
86111 return JdbcSink .sink (sql , (ps , stat ) -> {
87112 ObjectMapper mapper = new ObjectMapper ();
88113 UserAggregateStat userStat = (UserAggregateStat ) stat ;
89114 ps .setInt (1 , userStat .getUserId ());
90115 ps .setString (2 , stat .getWindowType ());
91- ps .setTimestamp (3 , Timestamp .from (userStat .getWindowStart ()));
92- ps .setTimestamp (4 , Timestamp .from (userStat .getWindowEnd ()));
116+
117+ if (!rolling ) {
118+ ps .setTimestamp (8 , Timestamp .from (userStat .getWindowStart ()));
119+ ps .setTimestamp (9 , Timestamp .from (userStat .getWindowEnd ()));
120+ }
93121
94122 try {
95123 // 5-9: Serialize Map fields to JSON strings
96- ps .setString (5 , mapper .writeValueAsString (userStat .getLangDurations ()));
97- ps .setString (6 , mapper .writeValueAsString (userStat .getMachineDurations ()));
98- ps .setString (7 , mapper .writeValueAsString (userStat .getEditorDurations ()));
99- ps .setString (8 , mapper .writeValueAsString (userStat .getProjectDurations ()));
100- ps .setString (9 , mapper .writeValueAsString (userStat .getActivityDurations ()));
124+ ps .setString (3 , mapper .writeValueAsString (userStat .getLangDurations ()));
125+ ps .setString (4 , mapper .writeValueAsString (userStat .getMachineDurations ()));
126+ ps .setString (5 , mapper .writeValueAsString (userStat .getEditorDurations ()));
127+ ps .setString (6 , mapper .writeValueAsString (userStat .getProjectDurations ()));
128+ ps .setString (7 , mapper .writeValueAsString (userStat .getActivityDurations ()));
101129 } catch (JsonProcessingException e ) {
102130 // Handle serialization error gracefully
103131 e .printStackTrace ();
104- for (int i = 5 ; i <= 9 ; i ++)
132+ for (int i = 3 ; i <= 7 ; i ++)
105133 ps .setString (i , "{}" ); // Send empty JSON object on error
106134 }
107135 },
0 commit comments