Skip to content

Commit c2c63e3

Browse files
authored
fix: Stash and re-inject thread context before executing monitor in poller (opensearch-project#2132)
Move populateThreadContext inside stashContext().use {} block so that the thread context is properly stashed before populating headers for downstream request interception. Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
1 parent 93fcb58 commit c2c63e3

1 file changed

Lines changed: 17 additions & 16 deletions

File tree

alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ class MonitorJobPoller(
123123
val payload = parseMessage(message.body())
124124
val monitor = payload.toMonitor(xContentRegistry)
125125
val jobStartTime = Instant.parse(payload.jobStartTime)
126+
126127
logger.info(
127128
"Parsed monitor [{}] type [{}] jobStartTime [{}]",
128129
monitor.id, monitor.monitorType, jobStartTime
@@ -143,9 +144,6 @@ class MonitorJobPoller(
143144
}
144145

145146
private suspend fun executeMonitor(monitor: Monitor, jobStartTime: Instant) {
146-
// populate thread context for downstream request interception the moment
147-
// Monitor config is in hand
148-
populateThreadContext(monitor)
149147

150148
val request = ExecuteMonitorRequest(
151149
dryrun = false,
@@ -155,8 +153,11 @@ class MonitorJobPoller(
155153
requestStart = null
156154
)
157155
try {
158-
client.suspendUntil<Client, ExecuteMonitorResponse> {
159-
client.execute(ExecuteMonitorAction.INSTANCE, request, it)
156+
client.threadPool().threadContext.stashContext().use {
157+
populateThreadContext(monitor)
158+
client.suspendUntil<Client, ExecuteMonitorResponse> {
159+
client.execute(ExecuteMonitorAction.INSTANCE, request, it)
160+
}
160161
}
161162
} catch (e: Exception) {
162163
throw AlertingException.wrap(e)
@@ -193,6 +194,17 @@ class MonitorJobPoller(
193194
}
194195
}
195196

197+
companion object {
198+
const val POLLER_THREAD_COUNT = 10
199+
const val POLL_INTERVAL_MS = 1000L
200+
201+
// thread context header keys for request interception
202+
const val IS_BACKGROUND_JOB_HEADER = "is-observability-bg-job"
203+
const val SERVICE_NAME_HEADER = "aws-service-name"
204+
const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url"
205+
const val REGION_HEADER = "aws-region"
206+
}
207+
196208
// populates thread context with KVs that downstream interception will
197209
// need when intercepting search or PPL calls to external customer
198210
// data source
@@ -251,15 +263,4 @@ class MonitorJobPoller(
251263

252264
return targetTypeToServiceName[targetType]!!
253265
}
254-
255-
companion object {
256-
const val POLLER_THREAD_COUNT = 10
257-
const val POLL_INTERVAL_MS = 1000L
258-
259-
// thread context header keys for request interception
260-
const val IS_BACKGROUND_JOB_HEADER = "is-observability-bg-job"
261-
const val SERVICE_NAME_HEADER = "aws-service-name"
262-
const val OPENSEARCH_ENDPOINT_HEADER = "opensearch-url"
263-
const val REGION_HEADER = "aws-region"
264-
}
265266
}

0 commit comments

Comments
 (0)