@@ -64,9 +64,9 @@ public static void main(String[] args) throws Exception {
6464 .withUrl ("jdbc:postgresql://postgres_db:5432/myapp" ).withDriverName ("org.postgresql.Driver" )
6565 .withUsername ("admin" ).withPassword ("secure_password" ).build ();
6666
67- dailyStream .addSink (createJdbcSink (jdbcOptions , 100 , 5000L , false ));
67+ dailyStream .addSink (createJdbcSink (jdbcOptions , 1 , 0 , false ));
6868
69- rollingStream .addSink (createJdbcSink (jdbcOptions , 10 , 1000L , true ));
69+ rollingStream .addSink (createJdbcSink (jdbcOptions , 1 , 0 , true ));
7070
7171 env .execute ("FileSegment Analytics" );
7272 }
@@ -97,8 +97,8 @@ private static SinkFunction<UserAggregateStat> createJdbcSink(JdbcConnectionOpti
9797 + "activity_durations = EXCLUDED.activity_durations, " + "updated_at = NOW();" ;
9898 } else {
9999 sql = "INSERT INTO user_stats_aggregate ("
100- + "user_id, window_type, window_start, window_end, lang_durations, machine_durations, editor_durations, "
101- + "project_durations, activity_durations) "
100+ + "user_id, window_type, lang_durations, machine_durations, editor_durations, "
101+ + "project_durations, activity_durations, window_start, window_end ) "
102102 + "VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb, ?::jsonb) "
103103 + "ON CONFLICT (user_id, window_type, window_start) DO UPDATE SET "
104104 + "window_end = EXCLUDED.window_end, " + "lang_durations = EXCLUDED.lang_durations, "
0 commit comments