Skip to content

Commit 315aef0

Browse files
engechasChase Engelbrecht
andauthored
Preserve tenancy context across stashes and coroutines (opensearch-project#2118)
* Preserve tenancy context across stashes and coroutines Signed-off-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com> * Fix ktlint Signed-off-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com> * Fix tests Signed-off-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com> * Add tests Signed-off-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com> --------- Signed-off-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com> Co-authored-by: Chase Engelbrecht <engechas@dev-dsk-engechas-2a-28f138b0.us-west-2.amazon.com>
1 parent cb17c94 commit 315aef0

23 files changed

Lines changed: 397 additions & 74 deletions

alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.opensearch.commons.alerting.model.MonitorMetadata
3434
import org.opensearch.commons.alerting.model.ScheduledJob
3535
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
3636
import org.opensearch.commons.alerting.util.AlertingException
37+
import org.opensearch.commons.utils.currentTenantId
3738
import org.opensearch.core.rest.RestStatus
3839
import org.opensearch.core.xcontent.NamedXContentRegistry
3940
import org.opensearch.core.xcontent.ToXContent
@@ -77,9 +78,6 @@ object MonitorMetadataService :
7778
this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it }
7879
}
7980

80-
private fun getTenantId(): String? =
81-
client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
82-
8381
@Suppress("ComplexMethod", "ReturnCount")
8482
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
8583
try {
@@ -90,7 +88,7 @@ object MonitorMetadataService :
9088
val putRequestBuilder = PutDataObjectRequest.builder()
9189
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
9290
.id(metadata.id)
93-
.tenantId(getTenantId())
91+
.tenantId(currentTenantId())
9492
.dataObject(metadataObj)
9593

9694
if (updating) {
@@ -177,7 +175,7 @@ object MonitorMetadataService :
177175
val getRequest = GetDataObjectRequest.builder()
178176
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
179177
.id(metadataId)
180-
.tenantId(getTenantId())
178+
.tenantId(currentTenantId())
181179
.build()
182180

183181
val response = sdkClient.getDataObjectAsync(getRequest).await()

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.opensearch.action.ActionRequest
1414
import org.opensearch.action.search.SearchResponse
1515
import org.opensearch.action.support.ActionFilters
1616
import org.opensearch.action.support.HandledTransportAction
17+
import org.opensearch.alerting.AlertingPlugin
1718
import org.opensearch.alerting.opensearchapi.suspendUntil
1819
import org.opensearch.alerting.settings.AlertingSettings
1920
import org.opensearch.alerting.util.await
@@ -33,6 +34,7 @@ import org.opensearch.commons.alerting.model.Alert
3334
import org.opensearch.commons.alerting.model.Monitor
3435
import org.opensearch.commons.alerting.util.AlertingException
3536
import org.opensearch.commons.alerting.util.optionalTimeField
37+
import org.opensearch.commons.utils.TenantContext
3638
import org.opensearch.commons.utils.recreateObject
3739
import org.opensearch.core.action.ActionListener
3840
import org.opensearch.core.xcontent.NamedXContentRegistry
@@ -86,8 +88,9 @@ class TransportAcknowledgeAlertAction @Inject constructor(
8688
) {
8789
val request = acknowledgeAlertRequest as? AcknowledgeAlertRequest
8890
?: recreateObject(acknowledgeAlertRequest) { AcknowledgeAlertRequest(it) }
91+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
8992
client.threadPool().threadContext.stashContext().use {
90-
scope.launch {
93+
scope.launch(TenantContext(tenantId)) {
9194
val getMonitorResponse: GetMonitorResponse =
9295
transportGetMonitorAction.client.suspendUntil {
9396
val getMonitorRequest = GetMonitorRequest(

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeChainedAlertAction.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest
2626
import org.opensearch.action.update.UpdateRequest
27+
import org.opensearch.alerting.AlertingPlugin
2728
import org.opensearch.alerting.opensearchapi.suspendUntil
2829
import org.opensearch.alerting.settings.AlertingSettings
2930
import org.opensearch.alerting.util.ScheduledJobUtils
@@ -45,6 +46,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob
4546
import org.opensearch.commons.alerting.model.Workflow
4647
import org.opensearch.commons.alerting.util.AlertingException
4748
import org.opensearch.commons.alerting.util.optionalTimeField
49+
import org.opensearch.commons.utils.TenantContext
4850
import org.opensearch.commons.utils.recreateObject
4951
import org.opensearch.core.action.ActionListener
5052
import org.opensearch.core.rest.RestStatus
@@ -105,8 +107,9 @@ class TransportAcknowledgeChainedAlertAction @Inject constructor(
105107

106108
val request = AcknowledgeChainedAlertRequest as? AcknowledgeChainedAlertRequest
107109
?: recreateObject(AcknowledgeChainedAlertRequest) { AcknowledgeChainedAlertRequest(it) }
110+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
108111
client.threadPool().threadContext.stashContext().use {
109-
scope.launch {
112+
scope.launch(TenantContext(tenantId)) {
110113
try {
111114
val getResponse = getWorkflow(request.workflowId)
112115
if (getResponse.isExists == false) {

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteAlertingCommentAction.kt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import org.opensearch.commons.alerting.action.DeleteCommentResponse
2828
import org.opensearch.commons.alerting.model.Comment
2929
import org.opensearch.commons.alerting.util.AlertingException
3030
import org.opensearch.commons.authuser.User
31+
import org.opensearch.commons.utils.TenantContext
32+
import org.opensearch.commons.utils.currentTenantId
3133
import org.opensearch.commons.utils.recreateObject
3234
import org.opensearch.core.action.ActionListener
3335
import org.opensearch.core.rest.RestStatus
@@ -88,7 +90,8 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
8890
if (!validateUserBackendRoles(user, actionListener)) {
8991
return
9092
}
91-
scope.launch {
93+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
94+
scope.launch(TenantContext(tenantId)) {
9295
DeleteCommentHandler(
9396
client,
9497
actionListener,
@@ -129,7 +132,7 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
129132
val deleteRequest = DeleteDataObjectRequest.builder()
130133
.index(sourceIndex)
131134
.id(commentId)
132-
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
135+
.tenantId(currentTenantId())
133136
.build()
134137

135138
if (canDelete) {
@@ -158,7 +161,7 @@ class TransportDeleteAlertingCommentAction @Inject constructor(
158161
.query(queryBuilder)
159162
val searchRequest = SearchDataObjectRequest.builder()
160163
.indices(ALL_COMMENTS_INDEX_PATTERN)
161-
.tenantId(client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER))
164+
.tenantId(currentTenantId())
162165
.searchSourceBuilder(searchSourceBuilder)
163166
.build()
164167

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.opensearch.commons.alerting.model.Monitor
3232
import org.opensearch.commons.alerting.model.ScheduledJob
3333
import org.opensearch.commons.alerting.util.AlertingException
3434
import org.opensearch.commons.authuser.User
35+
import org.opensearch.commons.utils.TenantContext
36+
import org.opensearch.commons.utils.currentTenantId
3537
import org.opensearch.commons.utils.recreateObject
3638
import org.opensearch.core.action.ActionListener
3739
import org.opensearch.core.rest.RestStatus
@@ -84,7 +86,8 @@ class TransportDeleteMonitorAction @Inject constructor(
8486
if (!validateUserBackendRoles(user, actionListener)) {
8587
return
8688
}
87-
scope.launch {
89+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
90+
scope.launch(TenantContext(tenantId)) {
8891
DeleteMonitorHandler(
8992
client,
9093
actionListener,
@@ -158,7 +161,7 @@ class TransportDeleteMonitorAction @Inject constructor(
158161
}
159162

160163
private suspend fun getMonitor(): Monitor {
161-
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
164+
val tenantId = currentTenantId()
162165
val getRequest = GetDataObjectRequest.builder()
163166
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
164167
.id(monitorId)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.action.search.SearchResponse
2323
import org.opensearch.action.support.ActionFilters
2424
import org.opensearch.action.support.HandledTransportAction
2525
import org.opensearch.action.support.WriteRequest.RefreshPolicy
26+
import org.opensearch.alerting.AlertingPlugin
2627
import org.opensearch.alerting.core.lock.LockModel
2728
import org.opensearch.alerting.core.lock.LockService
2829
import org.opensearch.alerting.opensearchapi.addFilter
@@ -48,6 +49,7 @@ import org.opensearch.commons.alerting.model.Workflow
4849
import org.opensearch.commons.alerting.model.WorkflowMetadata
4950
import org.opensearch.commons.alerting.util.AlertingException
5051
import org.opensearch.commons.authuser.User
52+
import org.opensearch.commons.utils.TenantContext
5153
import org.opensearch.commons.utils.recreateObject
5254
import org.opensearch.core.action.ActionListener
5355
import org.opensearch.core.rest.RestStatus
@@ -114,7 +116,8 @@ class TransportDeleteWorkflowAction @Inject constructor(
114116
return
115117
}
116118

117-
scope.launch {
119+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
120+
scope.launch(TenantContext(tenantId)) {
118121
DeleteWorkflowHandler(
119122
client,
120123
actionListener,

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.opensearch.action.search.SearchResponse
2626
import org.opensearch.action.support.ActionFilters
2727
import org.opensearch.action.support.HandledTransportAction
2828
import org.opensearch.alerting.AlertService
29+
import org.opensearch.alerting.AlertingPlugin
2930
import org.opensearch.alerting.MonitorRunnerService
3031
import org.opensearch.alerting.TriggerService
3132
import org.opensearch.alerting.action.GetDestinationsAction
@@ -100,6 +101,7 @@ import org.opensearch.commons.alerting.model.userErrorMessage
100101
import org.opensearch.commons.alerting.util.AlertingException
101102
import org.opensearch.commons.alerting.util.string
102103
import org.opensearch.commons.notifications.model.NotificationConfigInfo
104+
import org.opensearch.commons.utils.TenantContext
103105
import org.opensearch.core.action.ActionListener
104106
import org.opensearch.core.common.Strings
105107
import org.opensearch.core.common.bytes.BytesReference
@@ -205,7 +207,8 @@ class TransportDocLevelMonitorFanOutAction
205207
request: DocLevelMonitorFanOutRequest,
206208
listener: ActionListener<DocLevelMonitorFanOutResponse>
207209
) {
208-
scope.launch {
210+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
211+
scope.launch(TenantContext(tenantId)) {
209212
executeMonitor(request, listener)
210213
}
211214
}

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob
3636
import org.opensearch.commons.alerting.util.AlertingException
3737
import org.opensearch.commons.alerting.util.isMonitorOfStandardType
3838
import org.opensearch.commons.authuser.User
39+
import org.opensearch.commons.utils.TenantContext
3940
import org.opensearch.core.action.ActionListener
4041
import org.opensearch.core.rest.RestStatus
4142
import org.opensearch.core.xcontent.NamedXContentRegistry
@@ -73,12 +74,13 @@ class TransportExecuteMonitorAction @Inject constructor(
7374
log.debug("User and roles string from thread context: $userStr")
7475
val user: User? = User.parse(userStr)
7576

77+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
7678
client.threadPool().threadContext.stashContext().use {
7779
val executeMonitor = fun(monitor: Monitor) {
7880
// Launch the coroutine with the clients threadContext. This is needed to preserve authentication information
7981
// stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin.
8082
// runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
81-
runner.launch {
83+
runner.launch(TenantContext(tenantId)) {
8284
val (periodStart, periodEnd) = if (execMonitorRequest.requestStart != null) {
8385
Pair(
8486
Instant.ofEpochMilli(execMonitorRequest.requestStart.millis),
@@ -106,7 +108,6 @@ class TransportExecuteMonitorAction @Inject constructor(
106108
}
107109

108110
if (execMonitorRequest.monitorId != null && execMonitorRequest.monitor == null) {
109-
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
110111
val getRequest = GetDataObjectRequest.builder()
111112
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
112113
.id(execMonitorRequest.monitorId)
@@ -187,7 +188,7 @@ class TransportExecuteMonitorAction @Inject constructor(
187188
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
188189
) {
189190
try {
190-
scope.launch {
191+
scope.launch(TenantContext(tenantId)) {
191192
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
192193
docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources)
193194
log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created")

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import org.opensearch.commons.alerting.model.Monitor
3131
import org.opensearch.commons.alerting.model.ScheduledJob
3232
import org.opensearch.commons.alerting.util.AlertingException
3333
import org.opensearch.commons.authuser.User
34+
import org.opensearch.commons.utils.TenantContext
35+
import org.opensearch.commons.utils.currentTenantId
3436
import org.opensearch.commons.utils.recreateObject
3537
import org.opensearch.core.action.ActionListener
3638
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
@@ -148,11 +150,12 @@ class TransportGetAlertsAction @Inject constructor(
148150
.size(tableProp.size)
149151
.from(tableProp.startIndex)
150152

153+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
151154
client.threadPool().threadContext.stashContext().use {
152-
scope.launch {
155+
scope.launch(TenantContext(tenantId)) {
153156
try {
154157
val alertIndex = resolveAlertsIndexName(getAlertsRequest)
155-
getAlerts(alertIndex, searchSourceBuilder, actionListener, user)
158+
getAlerts(alertIndex, searchSourceBuilder, actionListener, user, tenantId)
156159
} catch (t: Exception) {
157160
log.error("Failed to get alerts", t)
158161
if (t is AlertingException) {
@@ -202,7 +205,7 @@ class TransportGetAlertsAction @Inject constructor(
202205
}
203206

204207
private suspend fun getMonitor(getAlertsRequest: GetAlertsRequest): Monitor? {
205-
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
208+
val tenantId = currentTenantId()
206209
val getRequest = GetDataObjectRequest.builder()
207210
.index(ScheduledJob.SCHEDULED_JOBS_INDEX)
208211
.id(getAlertsRequest.monitorId!!)
@@ -230,28 +233,33 @@ class TransportGetAlertsAction @Inject constructor(
230233
searchSourceBuilder: SearchSourceBuilder,
231234
actionListener: ActionListener<GetAlertsResponse>,
232235
user: User?,
236+
tenantId: String? = null,
233237
) {
234238
// user is null when: 1/ security is disabled. 2/when user is super-admin.
235239
if (user == null) {
236240
// user is null when: 1/ security is disabled. 2/when user is super-admin.
237-
search(alertIndex, searchSourceBuilder, actionListener)
241+
search(alertIndex, searchSourceBuilder, actionListener, tenantId)
238242
} else if (!doFilterForUser(user)) {
239243
// security is enabled and filterby is disabled.
240-
search(alertIndex, searchSourceBuilder, actionListener)
244+
search(alertIndex, searchSourceBuilder, actionListener, tenantId)
241245
} else {
242246
// security is enabled and filterby is enabled.
243247
try {
244248
log.info("Filtering result by: ${user.backendRoles}")
245249
addFilter(user, searchSourceBuilder, "monitor_user.backend_roles.keyword")
246-
search(alertIndex, searchSourceBuilder, actionListener)
250+
search(alertIndex, searchSourceBuilder, actionListener, tenantId)
247251
} catch (ex: IOException) {
248252
actionListener.onFailure(AlertingException.wrap(ex))
249253
}
250254
}
251255
}
252256

253-
fun search(alertIndex: String, searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetAlertsResponse>) {
254-
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
257+
fun search(
258+
alertIndex: String,
259+
searchSourceBuilder: SearchSourceBuilder,
260+
actionListener: ActionListener<GetAlertsResponse>,
261+
tenantId: String? = null,
262+
) {
255263
val sdkSearchRequest = SearchDataObjectRequest.builder()
256264
.indices(alertIndex)
257265
.tenantId(tenantId)

alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -122,36 +122,38 @@ class TransportGetDestinationsAction @Inject constructor(
122122
}
123123
searchSourceBuilder.query(queryBuilder)
124124

125+
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
125126
client.threadPool().threadContext.stashContext().use {
126-
resolve(searchSourceBuilder, actionListener, user)
127+
resolve(searchSourceBuilder, actionListener, user, tenantId)
127128
}
128129
}
129130

130131
fun resolve(
131132
searchSourceBuilder: SearchSourceBuilder,
132133
actionListener: ActionListener<GetDestinationsResponse>,
133-
user: User?
134+
user: User?,
135+
tenantId: String? = null,
134136
) {
135137
if (user == null) {
136-
// user is null when: 1/ security is disabled. 2/when user is super-admin.
137-
search(searchSourceBuilder, actionListener)
138+
search(searchSourceBuilder, actionListener, tenantId)
138139
} else if (!doFilterForUser(user)) {
139-
// security is enabled and filterby is disabled.
140-
search(searchSourceBuilder, actionListener)
140+
search(searchSourceBuilder, actionListener, tenantId)
141141
} else {
142-
// security is enabled and filterby is enabled.
143142
try {
144143
log.info("Filtering result by: ${user.backendRoles}")
145144
addFilter(user, searchSourceBuilder, "destination.user.backend_roles.keyword")
146-
search(searchSourceBuilder, actionListener)
145+
search(searchSourceBuilder, actionListener, tenantId)
147146
} catch (ex: IOException) {
148147
actionListener.onFailure(AlertingException.wrap(ex))
149148
}
150149
}
151150
}
152151

153-
fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener<GetDestinationsResponse>) {
154-
val tenantId = client.threadPool().threadContext.getHeader(AlertingPlugin.TENANT_ID_HEADER)
152+
fun search(
153+
searchSourceBuilder: SearchSourceBuilder,
154+
actionListener: ActionListener<GetDestinationsResponse>,
155+
tenantId: String? = null,
156+
) {
155157
val sdkSearchRequest = SearchDataObjectRequest.builder()
156158
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
157159
.tenantId(tenantId)

0 commit comments

Comments
 (0)