Skip to content

Commit e6236d0

Browse files
committed
Unsubscribe subscriptions when user role is updated
1 parent 46dee84 commit e6236d0

7 files changed

Lines changed: 136 additions & 11 deletions

File tree

backend/src/main/java/club/devcord/devmarkt/DevelopmentStartup.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616

1717
package club.devcord.devmarkt;
1818

19-
import club.devcord.devmarkt.repositories.TemplateRepo;
20-
import io.micronaut.context.BeanContext;
2119
import io.micronaut.context.annotation.Requires;
2220
import io.micronaut.context.event.ApplicationEventListener;
2321
import io.micronaut.context.event.StartupEvent;
2422
import io.micronaut.json.JsonMapper;
2523
import io.micronaut.security.token.jwt.generator.JwtTokenGenerator;
26-
import jakarta.inject.Inject;
2724
import jakarta.inject.Singleton;
2825
import java.util.Map;
2926
import org.slf4j.Logger;

backend/src/main/java/club/devcord/devmarkt/auth/InvocationDataCustomizer.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package club.devcord.devmarkt.auth;
1818

19+
import club.devcord.devmarkt.ws.SessionMetaData;
1920
import graphql.ExecutionInput;
2021
import io.micronaut.configuration.graphql.GraphQLExecutionInputCustomizer;
2122
import io.micronaut.http.HttpRequest;
@@ -28,17 +29,26 @@
2829
public class InvocationDataCustomizer implements GraphQLExecutionInputCustomizer {
2930

3031
private final UserProvider provider;
32+
private final SessionMetaData sessionMetaData;
3133

32-
public InvocationDataCustomizer(UserProvider provider) {
34+
public InvocationDataCustomizer(UserProvider provider, SessionMetaData sessionMetaData) {
3335
this.provider = provider;
36+
this.sessionMetaData = sessionMetaData;
3437
}
3538

3639
@Override
3740
public Publisher<ExecutionInput> customize(ExecutionInput executionInput, HttpRequest httpRequest,
3841
MutableHttpResponse<String> httpResponse) {
3942
httpRequest.getHeaders().getAuthorization()
4043
.flatMap(provider::validate)
41-
.ifPresent(user -> executionInput.getGraphQLContext().put("user", user));
44+
.ifPresent(user -> {
45+
executionInput.getGraphQLContext().put("user", user);
46+
var session = sessionMetaData.getHttpRequestWebSocketSessionMap().get(httpRequest);
47+
if (session != null) {
48+
sessionMetaData.getUserSessions().put(user.id(), session);
49+
}
50+
});
51+
sessionMetaData.getHttpRequestWebSocketSessionMap().remove(httpRequest);
4252
return Mono.just(executionInput);
4353
}
4454
}

backend/src/main/java/club/devcord/devmarkt/services/UserService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
import club.devcord.devmarkt.responses.Success;
2626
import club.devcord.devmarkt.responses.failure.user.ErrorCode;
2727
import club.devcord.devmarkt.util.Admins;
28+
import club.devcord.devmarkt.ws.ReflectiveUnsubscriber;
2829
import jakarta.inject.Singleton;
2930
import java.util.Optional;
3031

3132
@Singleton
3233
public class UserService {
3334

3435
private final UserRepo repo;
36+
private final ReflectiveUnsubscriber reflectiveUnsubscriber;
3537

36-
public UserService(UserRepo repo) {
38+
public UserService(UserRepo repo, ReflectiveUnsubscriber reflectiveUnsubscriber) {
3739
this.repo = repo;
40+
this.reflectiveUnsubscriber = reflectiveUnsubscriber;
3841
}
3942

4043
public Optional<User> findDirect(UserId userId) {
@@ -51,6 +54,7 @@ public boolean delete(UserId userId) {
5154
if (Admins.isAdminUserId(userId)) {
5255
return false;
5356
}
57+
reflectiveUnsubscriber.unsubscribeSubscriptions(userId);
5458
return repo.deleteOneById(userId) >= 1;
5559
}
5660

@@ -72,7 +76,7 @@ public Response<User> updateRole(UserId userId, Role role) {
7276
if (Admins.isAdminUserId(userId)) {
7377
return new Failure<>(ErrorCode.ADMIN_USER_CANT_BE_MODIFIED);
7478
}
75-
79+
reflectiveUnsubscriber.unsubscribeSubscriptions(userId);
7680
var updated = repo.updateById(userId, role);
7781
return updated != 0
7882
? new Success<>(new User(-1, userId, role))

backend/src/main/java/club/devcord/devmarkt/ws/CustomGraphQLWsController.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@ public class CustomGraphQLWsController {
3939

4040
private final GraphQLWsController controller;
4141
private final GraphQLJsonSerializer serializer;
42+
private final SessionMetaData sessionMetaData;
4243

43-
public CustomGraphQLWsController(GraphQLWsController controller, GraphQLJsonSerializer serializer) {
44+
public CustomGraphQLWsController(GraphQLWsController controller, GraphQLJsonSerializer serializer,
45+
SessionMetaData sessionMetaData) {
4446
this.controller = controller;
4547
this.serializer = serializer;
48+
this.sessionMetaData = sessionMetaData;
4649
}
4750

4851
@OnOpen
@@ -53,11 +56,13 @@ public void onOpen(WebSocketSession session, HttpRequest<?> request) {
5356
@OnMessage
5457
public Publisher<GraphQLWsResponse> onMessage(String message, WebSocketSession session) {
5558
var type = serializer.deserialize(message, MessageType.class);
59+
var request = session.get("httpRequest", HttpRequest.class).get();
5660
if (ClientType.GQL_CONNECTION_INIT.getType().equals(type.type())) {
57-
var request = session.get("httpRequest", HttpRequest.class).get();
61+
var auth = (String) type.payload().get("Authorization");
5862
request.mutate()
59-
.header("Authorization", (String) type.payload().get("Authorization"));
63+
.header("Authorization", auth);
6064
}
65+
sessionMetaData.getHttpRequestWebSocketSessionMap().put(request, session);
6166
return controller.onMessage(message, session);
6267
}
6368

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2022 Contributors to the Devmarkt-Backend project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package club.devcord.devmarkt.ws;
18+
19+
import club.devcord.devmarkt.entities.auth.UserId;
20+
import io.micronaut.configuration.graphql.ws.GraphQLWsRequest;
21+
import io.micronaut.configuration.graphql.ws.GraphQLWsResponse;
22+
import io.micronaut.context.BeanContext;
23+
import io.micronaut.websocket.WebSocketSession;
24+
import jakarta.inject.Singleton;
25+
import java.util.Map;
26+
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
29+
// Dirty solution to unsubscribe a user's subscriptions (uses reflections)
30+
@Singleton
31+
public class ReflectiveUnsubscriber{
32+
33+
private final Object wsState;
34+
private final SessionMetaData metaData;
35+
36+
public ReflectiveUnsubscriber(BeanContext beanContext, SessionMetaData metaData) {
37+
this.metaData = metaData;
38+
try {
39+
var stateClazz = Class.forName("io.micronaut.configuration.graphql.ws.GraphQLWsState");
40+
this.wsState = beanContext.getBean(stateClazz);
41+
} catch (ClassNotFoundException e) {
42+
throw new RuntimeException(e);
43+
}
44+
}
45+
46+
public void unsubscribeSubscriptions(UserId userId) {
47+
var session = metaData.getUserSessions().get(userId);
48+
if (session == null) return;
49+
try {
50+
var activeOperationsField = wsState.getClass().getDeclaredField("activeOperations");
51+
activeOperationsField.setAccessible(true);
52+
var activeOperations = ((Map<String, ?>) activeOperationsField.get(wsState)).get(session.getId());
53+
var operationIdsField = activeOperations.getClass().getDeclaredField("activeOperations");
54+
operationIdsField.setAccessible(true);
55+
var operationIds = ((Map<String, ?>) operationIdsField.get(activeOperations)).keySet();
56+
var method = wsState.getClass().getDeclaredMethod("stopOperation", GraphQLWsRequest.class,
57+
WebSocketSession.class);
58+
method.setAccessible(true);
59+
for (var id : operationIds) {
60+
var request = new GraphQLWsRequest();
61+
request.setId(id);
62+
Flux.from((Publisher<GraphQLWsResponse>) method.invoke(wsState, request, session))
63+
.subscribe(session::sendSync);
64+
}
65+
} catch (Throwable e) {
66+
throw new RuntimeException(e);
67+
}
68+
}
69+
70+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2022 Contributors to the Devmarkt-Backend project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package club.devcord.devmarkt.ws;
18+
19+
import club.devcord.devmarkt.entities.auth.UserId;
20+
import io.micronaut.http.HttpRequest;
21+
import io.micronaut.websocket.WebSocketSession;
22+
import jakarta.inject.Singleton;
23+
import java.util.Map;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
26+
@Singleton
27+
public class SessionMetaData {
28+
private final Map<UserId, WebSocketSession> userSessions = new ConcurrentHashMap<>();
29+
private final Map<HttpRequest<?>, WebSocketSession> httpRequestWebSocketSessionMap
30+
= new ConcurrentHashMap<>();
31+
32+
public Map<UserId, WebSocketSession> getUserSessions() {
33+
return userSessions;
34+
}
35+
36+
public Map<HttpRequest<?>, WebSocketSession> getHttpRequestWebSocketSessionMap() {
37+
return httpRequestWebSocketSessionMap;
38+
}
39+
}

backend/src/main/resources/db/migrations/V3__Create_Application.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ CREATE TABLE applications
2121
id SERIAL PRIMARY KEY,
2222
process_time VARCHAR,
2323
status application_status NOT NULL,
24-
user_id INT NOT NULL REFERENCES users (id),
24+
user_id INT NOT NULL REFERENCES users (id) ON DELETE CASCADE,
2525
template_id INT NOT NULL REFERENCES templates (id)
2626
);
2727

0 commit comments

Comments
 (0)