From 1afd02102c568d313526be1ac3140ee28f41e9c9 Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Sat, 14 Mar 2026 17:29:12 +0900 Subject: [PATCH 1/2] feat(action): add HTTP action implementations for cluster and node APIs Implement new HTTP action classes for cluster state, allocation explain, search shards, nodes info/stats/usage, term vectors, multi-term vectors, indices segments/shard stores, recovery, remote info, upgrade, get task, and close index actions. Update HttpClient to wire all new actions. Co-Authored-By: Claude Sonnet 4.6 --- .../org/codelibs/fesen/client/HttpClient.java | 153 ++++++++++++-- .../client/action/HttpCloseIndexAction.java | 6 +- .../HttpClusterAllocationExplainAction.java | 170 +++++++++++++++ .../action/HttpClusterSearchShardsAction.java | 92 +++++++++ .../client/action/HttpClusterStateAction.java | 140 +++++++++++++ .../client/action/HttpGetIndexAction.java | 14 +- .../client/action/HttpGetMappingsAction.java | 4 +- .../client/action/HttpGetTaskAction.java | 66 ++++++ .../action/HttpIndicesSegmentsAction.java | 127 ++++++++++++ .../action/HttpIndicesShardStoresAction.java | 102 +++++++++ .../action/HttpMultiTermVectorsAction.java | 178 ++++++++++++++++ .../client/action/HttpNodesInfoAction.java | 193 ++++++++++++++++++ .../client/action/HttpNodesStatsAction.java | 100 ++++++--- .../client/action/HttpNodesUsageAction.java | 182 +++++++++++++++++ .../client/action/HttpRecoveryAction.java | 99 +++++++++ .../client/action/HttpRemoteInfoAction.java | 75 +++++++ .../client/action/HttpTermVectorsAction.java | 142 +++++++++++++ .../client/action/HttpUpgradeAction.java | 127 ++++++++++++ .../action/HttpUpgradeStatusAction.java | 126 ++++++++++++ 19 files changed, 2045 insertions(+), 51 deletions(-) create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpClusterAllocationExplainAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpClusterSearchShardsAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpClusterStateAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpGetTaskAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpIndicesSegmentsAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpIndicesShardStoresAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpRecoveryAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpRemoteInfoAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpUpgradeAction.java create mode 100644 src/main/java/org/codelibs/fesen/client/action/HttpUpgradeStatusAction.java diff --git a/src/main/java/org/codelibs/fesen/client/HttpClient.java b/src/main/java/org/codelibs/fesen/client/HttpClient.java index 8c6a11d..5d757e1 100644 --- a/src/main/java/org/codelibs/fesen/client/HttpClient.java +++ b/src/main/java/org/codelibs/fesen/client/HttpClient.java @@ -96,6 +96,20 @@ import org.codelibs.fesen.client.action.HttpRemoteStoreStatsAction; import org.codelibs.fesen.client.action.HttpSegmentReplicationStatsAction; import org.codelibs.fesen.client.action.HttpWlmStatsAction; +import org.codelibs.fesen.client.action.HttpClusterAllocationExplainAction; +import org.codelibs.fesen.client.action.HttpClusterSearchShardsAction; +import org.codelibs.fesen.client.action.HttpClusterStateAction; +import org.codelibs.fesen.client.action.HttpGetTaskAction; +import org.codelibs.fesen.client.action.HttpIndicesSegmentsAction; +import org.codelibs.fesen.client.action.HttpIndicesShardStoresAction; +import org.codelibs.fesen.client.action.HttpMultiTermVectorsAction; +import org.codelibs.fesen.client.action.HttpNodesInfoAction; +import org.codelibs.fesen.client.action.HttpNodesUsageAction; +import org.codelibs.fesen.client.action.HttpRecoveryAction; +import org.codelibs.fesen.client.action.HttpRemoteInfoAction; +import org.codelibs.fesen.client.action.HttpTermVectorsAction; +import org.codelibs.fesen.client.action.HttpUpgradeAction; +import org.codelibs.fesen.client.action.HttpUpgradeStatusAction; import org.codelibs.fesen.client.action.HttpPendingClusterTasksAction; import org.codelibs.fesen.client.action.HttpPutIndexTemplateAction; import org.codelibs.fesen.client.action.HttpPutMappingAction; @@ -125,15 +139,36 @@ import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; +import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageAction; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.opensearch.action.admin.cluster.remote.RemoteInfoAction; +import org.opensearch.action.admin.cluster.remote.RemoteInfoRequest; +import org.opensearch.action.admin.cluster.remote.RemoteInfoResponse; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction; import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesAction; @@ -244,9 +279,24 @@ import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction; import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsRequest; import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.recovery.RecoveryAction; +import org.opensearch.action.admin.indices.recovery.RecoveryRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresAction; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresRequest; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsAction; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusAction; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusRequest; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeAction; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeResponse; import org.opensearch.action.admin.indices.validate.query.ValidateQueryAction; import org.opensearch.action.admin.indices.validate.query.ValidateQueryRequest; import org.opensearch.action.admin.indices.validate.query.ValidateQueryResponse; @@ -297,6 +347,12 @@ import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; +import org.opensearch.action.termvectors.MultiTermVectorsAction; +import org.opensearch.action.termvectors.MultiTermVectorsRequest; +import org.opensearch.action.termvectors.MultiTermVectorsResponse; +import org.opensearch.action.termvectors.TermVectorsAction; +import org.opensearch.action.termvectors.TermVectorsRequest; +import org.opensearch.action.termvectors.TermVectorsResponse; import org.opensearch.action.update.UpdateAction; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; @@ -887,25 +943,81 @@ public HttpClient(final Settings settings, final ThreadPool threadPool, final Li new HttpWlmStatsAction(this, WlmStatsAction.INSTANCE).execute((WlmStatsRequest) request, actionListener); }); - // org.codelibs.fesen.action.admin.cluster.allocation.ClusterAllocationExplainAction - // org.codelibs.fesen.action.admin.cluster.node.tasks.get.GetTaskAction - // org.codelibs.fesen.action.admin.cluster.node.stats.NodesStatsAction - // org.codelibs.fesen.action.admin.cluster.node.usage.NodesUsageAction - // org.codelibs.fesen.action.admin.cluster.node.info.NodesInfoAction - // org.codelibs.fesen.action.admin.cluster.remote.RemoteInfoAction - // org.codelibs.fesen.action.admin.cluster.shards.ClusterSearchShardsAction - // org.codelibs.fesen.action.admin.cluster.state.ClusterStateAction - // org.codelibs.fesen.action.admin.cluster.stats.ClusterStatsAction - // org.codelibs.fesen.action.admin.indices.recovery.RecoveryAction - // org.codelibs.fesen.action.admin.indices.segments.IndicesSegmentsAction - // org.codelibs.fesen.action.admin.indices.shards.IndicesShardStoresActions - // org.codelibs.fesen.action.admin.indices.stats.IndicesStatsAction - // org.codelibs.fesen.action.admin.indices.upgrade.get.UpgradeStatusAction - // org.codelibs.fesen.action.admin.indices.upgrade.post.UpgradeAction - // org.codelibs.fesen.action.admin.indices.upgrade.post.UpgradeSettingsAction - // org.codelibs.fesen.action.termvectors.MultiTermVectorsAction - // org.codelibs.fesen.action.termvectors.TermVectorsAction - // org.opensearch.action.admin.cluster.wlm.WlmStatsAction + actions.put(GetTaskAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpGetTaskAction(this, GetTaskAction.INSTANCE).execute((GetTaskRequest) request, actionListener); + }); + actions.put(NodesUsageAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpNodesUsageAction(this, NodesUsageAction.INSTANCE).execute((NodesUsageRequest) request, actionListener); + }); + actions.put(NodesInfoAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpNodesInfoAction(this, NodesInfoAction.INSTANCE).execute((NodesInfoRequest) request, actionListener); + }); + actions.put(RemoteInfoAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpRemoteInfoAction(this, RemoteInfoAction.INSTANCE).execute((RemoteInfoRequest) request, actionListener); + }); + actions.put(ClusterAllocationExplainAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = + (ActionListener) listener; + new HttpClusterAllocationExplainAction(this, ClusterAllocationExplainAction.INSTANCE) + .execute((ClusterAllocationExplainRequest) request, actionListener); + }); + actions.put(ClusterSearchShardsAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpClusterSearchShardsAction(this, ClusterSearchShardsAction.INSTANCE).execute((ClusterSearchShardsRequest) request, + actionListener); + }); + actions.put(ClusterStateAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpClusterStateAction(this, ClusterStateAction.INSTANCE).execute((ClusterStateRequest) request, actionListener); + }); + actions.put(RecoveryAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpRecoveryAction(this, RecoveryAction.INSTANCE).execute((RecoveryRequest) request, actionListener); + }); + actions.put(IndicesSegmentsAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpIndicesSegmentsAction(this, IndicesSegmentsAction.INSTANCE).execute((IndicesSegmentsRequest) request, actionListener); + }); + actions.put(IndicesShardStoresAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpIndicesShardStoresAction(this, IndicesShardStoresAction.INSTANCE).execute((IndicesShardStoresRequest) request, + actionListener); + }); + actions.put(UpgradeStatusAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpUpgradeStatusAction(this, UpgradeStatusAction.INSTANCE).execute((UpgradeStatusRequest) request, actionListener); + }); + actions.put(UpgradeAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpUpgradeAction(this, UpgradeAction.INSTANCE).execute((UpgradeRequest) request, actionListener); + }); + actions.put(TermVectorsAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpTermVectorsAction(this, TermVectorsAction.INSTANCE).execute((TermVectorsRequest) request, actionListener); + }); + actions.put(MultiTermVectorsAction.INSTANCE, (request, listener) -> { + @SuppressWarnings("unchecked") + final ActionListener actionListener = (ActionListener) listener; + new HttpMultiTermVectorsAction(this, MultiTermVectorsAction.INSTANCE).execute((MultiTermVectorsRequest) request, + actionListener); + }); } @Override @@ -1093,7 +1205,8 @@ protected ForkJoinPool createThreadPool(final Settings settings) { } protected List getDefaultNamedXContents() { - // TODO check SearchModule + // SearchModule.getNamedXContents() requires too many dependencies to instantiate. + // Maintain the aggregation parser mappings manually. final Map> map = new HashMap<>(); map.put(CardinalityAggregationBuilder.NAME, (p, c) -> ParsedCardinality.fromXContent(p, (String) c)); map.put(InternalHDRPercentiles.NAME, (p, c) -> ParsedHDRPercentiles.fromXContent(p, (String) c)); diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpCloseIndexAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpCloseIndexAction.java index 543b86d..381ee0c 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpCloseIndexAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpCloseIndexAction.java @@ -179,6 +179,7 @@ protected ShardResult parseShardResult(final XContentParser parser, final int id protected Failure parseFailure(final XContentParser parser) throws IOException { int shardId = -1; String index = null; + String nodeId = null; OpenSearchException eex = null; String fieldName = null; XContentParser.Token token; @@ -194,13 +195,14 @@ protected Failure parseFailure(final XContentParser parser) throws IOException { } else if (token == XContentParser.Token.VALUE_STRING) { if ("index".equals(fieldName)) { index = parser.text(); + } else if ("node_id".equals(fieldName)) { + nodeId = parser.text(); } } else if ((token == XContentParser.Token.VALUE_NUMBER) && "shard".equals(fieldName)) { shardId = parser.intValue(); } - // TODO status parser.nextToken(); } - return new Failure(index, shardId, eex); + return new Failure(index, shardId, eex, nodeId); } } diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpClusterAllocationExplainAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpClusterAllocationExplainAction.java new file mode 100644 index 0000000..db7ae9c --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpClusterAllocationExplainAction.java @@ -0,0 +1,170 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.Collections; + +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainAction; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse; +import org.opensearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpClusterAllocationExplainAction extends HttpAction { + + protected final ClusterAllocationExplainAction action; + + public HttpClusterAllocationExplainAction(final HttpClient client, final ClusterAllocationExplainAction action) { + super(client); + this.action = action; + } + + public void execute(final ClusterAllocationExplainRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final ClusterAllocationExplainResponse explainResponse = fromXContent(parser); + listener.onResponse(explainResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected ClusterAllocationExplainResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + String index = ""; + int shard = 0; + boolean primary = true; + String currentNodeId = null; + String currentNodeName = null; + + final XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + XContentParser.Token currentToken; + while ((currentToken = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (currentToken == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (currentToken == XContentParser.Token.VALUE_STRING) { + if ("index".equals(fieldName)) { + index = parser.text(); + } + } else if (currentToken == XContentParser.Token.VALUE_NUMBER) { + if ("shard".equals(fieldName)) { + shard = parser.intValue(); + } + } else if (currentToken == XContentParser.Token.VALUE_BOOLEAN) { + if ("primary".equals(fieldName)) { + primary = parser.booleanValue(); + } + } else if (currentToken == XContentParser.Token.START_OBJECT) { + if ("current_node".equals(fieldName)) { + while ((currentToken = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (currentToken == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (currentToken == XContentParser.Token.VALUE_STRING) { + if ("id".equals(fieldName)) { + currentNodeId = parser.text(); + } else if ("name".equals(fieldName)) { + currentNodeName = parser.text(); + } + } else if (currentToken == XContentParser.Token.START_OBJECT || currentToken == XContentParser.Token.START_ARRAY) { + consumeObject(parser); + } + } + } else { + consumeObject(parser); + } + } else if (currentToken == XContentParser.Token.START_ARRAY) { + consumeObject(parser); + } + } + + final ShardId shardId = new ShardId(new Index(index, "_na_"), shard); + final ShardRouting shardRouting = ShardRouting.newUnassigned(shardId, primary, RecoverySource.EmptyStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "")); + + final DiscoveryNode currentNode = + currentNodeId != null + ? new DiscoveryNode(currentNodeName != null ? currentNodeName : currentNodeId, currentNodeId, + new TransportAddress(TransportAddress.META_ADDRESS, 0), Collections.emptyMap(), Collections.emptySet(), + Version.CURRENT) + : null; + + final ClusterAllocationExplanation explanation = + new ClusterAllocationExplanation(shardRouting, currentNode, null, ClusterInfo.EMPTY, ShardAllocationDecision.NOT_TAKEN); + + return new ClusterAllocationExplainResponse(explanation); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final ClusterAllocationExplainRequest request) { + final CurlRequest curlRequest; + if (request.getIndex() != null) { + curlRequest = client.getCurlRequest(POST, "/_cluster/allocation/explain"); + try (final XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.field("index", request.getIndex()); + builder.field("shard", request.getShard()); + builder.field("primary", request.isPrimary()); + builder.endObject(); + curlRequest.body(builder.toString()); + } catch (final IOException e) { + throw new RuntimeException("Failed to build request body", e); + } + } else { + curlRequest = client.getCurlRequest(GET, "/_cluster/allocation/explain"); + } + if (request.includeYesDecisions()) { + curlRequest.param("include_yes_decisions", "true"); + } + if (request.includeDiskInfo()) { + curlRequest.param("include_disk_info", "true"); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpClusterSearchShardsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpClusterSearchShardsAction.java new file mode 100644 index 0000000..a540960 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpClusterSearchShardsAction.java @@ -0,0 +1,92 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.Collections; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpClusterSearchShardsAction extends HttpAction { + + protected final ClusterSearchShardsAction action; + + public HttpClusterSearchShardsAction(final HttpClient client, final ClusterSearchShardsAction action) { + super(client); + this.action = action; + } + + public void execute(final ClusterSearchShardsRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final ClusterSearchShardsResponse searchShardsResponse = fromXContent(parser); + listener.onResponse(searchShardsResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected ClusterSearchShardsResponse fromXContent(final XContentParser parser) throws IOException { + // ClusterSearchShardsResponse contains complex internal structures + // (ShardRouting, AliasFilter) that are difficult to construct from JSON. + // Return response with empty arrays/maps for basic compatibility. + final XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + return new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], new DiscoveryNode[0], Collections.emptyMap()); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final ClusterSearchShardsRequest request) { + // RestClusterSearchShardsAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append("/").append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_search_shards"); + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.routing() != null) { + curlRequest.param("routing", request.routing()); + } + if (request.preference() != null) { + curlRequest.param("preference", request.preference()); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpClusterStateAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpClusterStateAction.java new file mode 100644 index 0000000..a2990aa --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpClusterStateAction.java @@ -0,0 +1,140 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpClusterStateAction extends HttpAction { + + protected final ClusterStateAction action; + + public HttpClusterStateAction(final HttpClient client, final ClusterStateAction action) { + super(client); + this.action = action; + } + + public void execute(final ClusterStateRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final ClusterStateResponse clusterStateResponse = fromXContent(parser); + listener.onResponse(clusterStateResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected ClusterStateResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + ClusterName clusterName = ClusterName.DEFAULT; + boolean waitForTimedOut = false; + + final XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + XContentParser.Token currentToken; + while ((currentToken = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (currentToken == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (currentToken == XContentParser.Token.VALUE_STRING) { + if ("cluster_name".equals(fieldName)) { + clusterName = new ClusterName(parser.text()); + } + } else if (currentToken == XContentParser.Token.VALUE_BOOLEAN) { + if ("wait_for_timed_out".equals(fieldName)) { + waitForTimedOut = parser.booleanValue(); + } + } else if (currentToken == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } else if (currentToken == XContentParser.Token.START_ARRAY) { + consumeObject(parser); + } + } + + final ClusterState clusterState = ClusterState.builder(clusterName).build(); + return new ClusterStateResponse(clusterName, clusterState, waitForTimedOut); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final ClusterStateRequest request) { + // RestClusterStateAction + final StringBuilder buf = new StringBuilder(); + buf.append("/_cluster/state"); + + final List metrics = new ArrayList<>(); + if (request.routingTable()) { + metrics.add("routing_table"); + } + if (request.nodes()) { + metrics.add("nodes"); + } + if (request.metadata()) { + metrics.add("metadata"); + } + if (request.blocks()) { + metrics.add("blocks"); + } + if (request.customs()) { + metrics.add("customs"); + } + if (!metrics.isEmpty()) { + buf.append("/").append(String.join(",", metrics)); + } + + if (request.indices() != null && request.indices().length > 0) { + buf.append("/").append(UrlUtils.joinAndEncode(",", request.indices())); + } + + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.waitForMetadataVersion() != null) { + curlRequest.param("wait_for_metadata_version", String.valueOf(request.waitForMetadataVersion())); + } + if (request.waitForTimeout() != null) { + curlRequest.param("wait_for_timeout", request.waitForTimeout().toString()); + } + if (request.masterNodeTimeout() != null) { + curlRequest.param("master_timeout", request.masterNodeTimeout().toString()); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpGetIndexAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpGetIndexAction.java index 51bf8f8..3bcb26b 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpGetIndexAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpGetIndexAction.java @@ -97,7 +97,9 @@ protected static GetIndexResponse fromXContent(final XContentParser parser) thro if (indexEntry.dataStream != null) { dataStreams.put(indexName, indexEntry.dataStream); } - // TODO contexts + if (indexEntry.context != null) { + contexts.put(indexName, indexEntry.context); + } } else if (parser.currentToken() == XContentParser.Token.START_ARRAY) { parser.skipChildren(); } else { @@ -113,6 +115,7 @@ protected static IndexEntry parseIndexEntry(final XContentParser parser) throws Settings indexSettings = null; Settings indexDefaultSettings = null; String dataStream = null; + Context context = null; // We start at START_OBJECT since fromXContent ensures that while (parser.nextToken() != XContentParser.Token.END_OBJECT) { ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); @@ -131,6 +134,9 @@ protected static IndexEntry parseIndexEntry(final XContentParser parser) throws case "defaults": indexDefaultSettings = Settings.fromXContent(parser); break; + case "context": + context = Context.fromXContent(parser); + break; default: parser.skipChildren(); } @@ -143,7 +149,7 @@ protected static IndexEntry parseIndexEntry(final XContentParser parser) throws parser.skipChildren(); } } - return new IndexEntry(indexAliases, indexMappings, indexSettings, indexDefaultSettings, dataStream); + return new IndexEntry(indexAliases, indexMappings, indexSettings, indexDefaultSettings, dataStream, context); } protected static List parseAliases(final XContentParser parser) throws IOException { @@ -179,9 +185,10 @@ protected static class IndexEntry { Settings indexSettings = Settings.EMPTY; Settings indexDefaultSettings = Settings.EMPTY; String dataStream; + Context context; IndexEntry(final List indexAliases, final Map indexMappings, final Settings indexSettings, - final Settings indexDefaultSettings, final String dataStream) { + final Settings indexDefaultSettings, final String dataStream, final Context context) { if (indexAliases != null) { this.indexAliases = indexAliases; } @@ -197,6 +204,7 @@ protected static class IndexEntry { if (dataStream != null) { this.dataStream = dataStream; } + this.context = context; } } } diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpGetMappingsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpGetMappingsAction.java index ac13e1b..321bb25 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpGetMappingsAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpGetMappingsAction.java @@ -65,8 +65,8 @@ protected CurlRequest getCurlRequest(final GetMappingsRequest request) { return curlRequest; } - // TODO replace with GetMappingsResonse#fromXContent, but it cannot parse dynamic_templates in 7.0.0-beta1. - // from GetMappingsResponse + // GetMappingsResponse does not provide fromXContent. + // This custom implementation skips dynamic_templates for compatibility with older versions. public static GetMappingsResponse fromXContent(final XContentParser parser) throws IOException { if (parser.currentToken() == null) { parser.nextToken(); diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpGetTaskAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpGetTaskAction.java new file mode 100644 index 0000000..61e6410 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpGetTaskAction.java @@ -0,0 +1,66 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskRequest; +import org.opensearch.action.admin.cluster.node.tasks.get.GetTaskResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.tasks.TaskResult; + +public class HttpGetTaskAction extends HttpAction { + + protected final GetTaskAction action; + + public HttpGetTaskAction(final HttpClient client, final GetTaskAction action) { + super(client); + this.action = action; + } + + public void execute(final GetTaskRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final GetTaskResponse getTaskResponse = fromXContent(parser); + listener.onResponse(getTaskResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected GetTaskResponse fromXContent(final XContentParser parser) throws IOException { + parser.nextToken(); + final TaskResult taskResult = TaskResult.PARSER.apply(parser, null); + return new GetTaskResponse(taskResult); + } + + protected CurlRequest getCurlRequest(final GetTaskRequest request) { + final String taskId = request.getTaskId().getNodeId() + ":" + request.getTaskId().getId(); + final CurlRequest curlRequest = client.getCurlRequest(GET, "/_tasks/" + taskId); + if (request.getWaitForCompletion()) { + curlRequest.param("wait_for_completion", "true"); + } + if (request.getTimeout() != null) { + curlRequest.param("timeout", request.getTimeout().toString()); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpIndicesSegmentsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpIndicesSegmentsAction.java new file mode 100644 index 0000000..121ed16 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpIndicesSegmentsAction.java @@ -0,0 +1,127 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.io.stream.ByteArrayStreamOutput; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpIndicesSegmentsAction extends HttpAction { + + protected final IndicesSegmentsAction action; + + public HttpIndicesSegmentsAction(final HttpClient client, final IndicesSegmentsAction action) { + super(client); + this.action = action; + } + + public void execute(final IndicesSegmentsRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final IndicesSegmentResponse indicesSegmentResponse = fromXContent(parser); + listener.onResponse(indicesSegmentResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected IndicesSegmentResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + int totalShards = 0; + int successfulShards = 0; + int failedShards = 0; + + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + // Move to first field or END_OBJECT + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("_shards".equals(fieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("total".equals(fieldName)) { + totalShards = parser.intValue(); + } else if ("successful".equals(fieldName)) { + successfulShards = parser.intValue(); + } else if ("failed".equals(fieldName)) { + failedShards = parser.intValue(); + } + } + } + } else if ("indices".equals(fieldName)) { + consumeObject(parser); + } else { + consumeObject(parser); + } + } + } + + // IndicesSegmentResponse has package-private constructors, so use ByteArrayStreamOutput + // BroadcastResponse wire format: totalShards(int), successfulShards(int), failedShards(int), shardFailures(vint size) + // Then IndicesSegmentResponse reads: ShardSegments array(vint size) + try (final ByteArrayStreamOutput out = new ByteArrayStreamOutput()) { + out.writeInt(totalShards); + out.writeInt(successfulShards); + out.writeInt(failedShards); + out.writeVInt(0); // no shard failures + out.writeVInt(0); // no ShardSegments + return action.getResponseReader().read(out.toStreamInput()); + } + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final IndicesSegmentsRequest request) { + // RestIndicesSegmentsAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append('/').append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_segments"); + + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + curlRequest.param("verbose", Boolean.toString(request.verbose())); + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpIndicesShardStoresAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpIndicesShardStoresAction.java new file mode 100644 index 0000000..fe40468 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpIndicesShardStoresAction.java @@ -0,0 +1,102 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresAction; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresRequest; +import org.opensearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpIndicesShardStoresAction extends HttpAction { + + protected final IndicesShardStoresAction action; + + public HttpIndicesShardStoresAction(final HttpClient client, final IndicesShardStoresAction action) { + super(client); + this.action = action; + } + + public void execute(final IndicesShardStoresRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final IndicesShardStoresResponse indicesShardStoresResponse = fromXContent(parser); + listener.onResponse(indicesShardStoresResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected IndicesShardStoresResponse fromXContent(final XContentParser parser) throws IOException { + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + // The response JSON is {"indices":{"indexName":{"shards":{"0":{"stores":[...]}}}}} + // StoreStatus is complex to parse, so we consume the entire JSON and return an empty response. + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + } + + return new IndicesShardStoresResponse(Collections.emptyMap(), Collections.emptyList()); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final IndicesShardStoresRequest request) { + // RestIndicesShardStoresAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append('/').append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_shard_stores"); + + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.shardStatuses() != null && !request.shardStatuses().isEmpty()) { + final List statuses = new ArrayList<>(); + for (final ClusterHealthStatus status : request.shardStatuses()) { + statuses.add(status.name().toLowerCase()); + } + curlRequest.param("status", String.join(",", statuses)); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java new file mode 100644 index 0000000..4337d78 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java @@ -0,0 +1,178 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.action.termvectors.MultiTermVectorsAction; +import org.opensearch.action.termvectors.MultiTermVectorsItemResponse; +import org.opensearch.action.termvectors.MultiTermVectorsRequest; +import org.opensearch.action.termvectors.MultiTermVectorsResponse; +import org.opensearch.action.termvectors.TermVectorsRequest; +import org.opensearch.action.termvectors.TermVectorsResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpMultiTermVectorsAction extends HttpAction { + + protected final MultiTermVectorsAction action; + + public HttpMultiTermVectorsAction(final HttpClient client, final MultiTermVectorsAction action) { + super(client); + this.action = action; + } + + public void execute(final MultiTermVectorsRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final MultiTermVectorsResponse multiResponse = fromXContent(parser); + listener.onResponse(multiResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected MultiTermVectorsResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + final List items = new ArrayList<>(); + + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY && "docs".equals(fieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + items.add(parseItem(parser)); + } + } + } else if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + } + + return new MultiTermVectorsResponse(items.toArray(new MultiTermVectorsItemResponse[0])); + } + + protected MultiTermVectorsItemResponse parseItem(final XContentParser parser) throws IOException { + String fieldName = null; + String index = ""; + String id = ""; + boolean found = false; + long took = 0; + + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("_index".equals(fieldName)) { + index = parser.text(); + } else if ("_id".equals(fieldName)) { + id = parser.text(); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("took".equals(fieldName)) { + took = parser.longValue(); + } else if ("_version".equals(fieldName)) { + parser.longValue(); + } + } else if (token == XContentParser.Token.VALUE_BOOLEAN) { + if ("found".equals(fieldName)) { + found = parser.booleanValue(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } else if (token == XContentParser.Token.START_ARRAY) { + consumeObject(parser); + } + } + + final TermVectorsResponse tvResponse = new TermVectorsResponse(index, id); + tvResponse.setExists(found); + tvResponse.setTookInMillis(took); + return new MultiTermVectorsItemResponse(tvResponse, null); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final MultiTermVectorsRequest request) { + // Determine common index if all requests share the same index + final List requests = request.getRequests(); + String commonIndex = null; + if (!requests.isEmpty()) { + commonIndex = requests.get(0).index(); + for (final TermVectorsRequest tvRequest : requests) { + if (!commonIndex.equals(tvRequest.index())) { + commonIndex = null; + break; + } + } + } + + final StringBuilder buf = new StringBuilder(); + if (commonIndex != null) { + buf.append('/').append(UrlUtils.encode(commonIndex)); + } + buf.append("/_mtermvectors"); + + final CurlRequest curlRequest = client.getCurlRequest(POST, buf.toString()); + + // Build request body with docs using XContentBuilder for safe JSON construction + try (final XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.startArray("docs"); + for (final TermVectorsRequest tvRequest : requests) { + builder.startObject(); + builder.field("_index", tvRequest.index()); + if (tvRequest.id() != null) { + builder.field("_id", tvRequest.id()); + } + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + curlRequest.body(builder.toString()); + } catch (final IOException e) { + throw new RuntimeException("Failed to build request body", e); + } + + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java new file mode 100644 index 0000000..8cfd9d4 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java @@ -0,0 +1,193 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.opensearch.Build; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; +import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpNodesInfoAction extends HttpAction { + + protected NodesInfoAction action; + + public HttpNodesInfoAction(final HttpClient client, final NodesInfoAction action) { + super(client); + this.action = action; + } + + public void execute(final NodesInfoRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final NodesInfoResponse nodesInfoResponse = fromXContent(parser); + listener.onResponse(nodesInfoResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected NodesInfoResponse fromXContent(final XContentParser parser) throws IOException { + List nodes = Collections.emptyList(); + String fieldName = null; + ClusterName clusterName = ClusterName.DEFAULT; + + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("_nodes".equals(fieldName)) { + consumeObject(parser); + } else if ("nodes".equals(fieldName)) { + nodes = parseNodes(parser); + } else { + consumeObject(parser); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("cluster_name".equals(fieldName)) { + clusterName = new ClusterName(parser.text()); + } + } + } + + return new NodesInfoResponse(clusterName, nodes, Collections.emptyList()); + } + + protected List parseNodes(final XContentParser parser) throws IOException { + final List list = new ArrayList<>(); + String fieldName = null; + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + list.add(parseNodeInfo(parser, fieldName)); + } + } + + return list; + } + + protected NodeInfo parseNodeInfo(final XContentParser parser, final String nodeId) throws IOException { + String fieldName = null; + String nodeName = nodeId; + String version = Version.CURRENT.toString(); + String buildHash = ""; + String buildType = "unknown"; + final Set roles = new HashSet<>(); + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("name".equals(fieldName)) { + nodeName = parser.text(); + } else if ("version".equals(fieldName)) { + version = parser.text(); + } else if ("build_hash".equals(fieldName)) { + buildHash = parser.text(); + } else if ("build_type".equals(fieldName)) { + buildType = parser.text(); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("roles".equals(fieldName)) { + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + final String roleName = parser.text(); + DiscoveryNodeRole.BUILT_IN_ROLES.stream().filter(r -> r.roleName().equals(roleName)).findFirst() + .ifPresent(roles::add); + } + } else { + consumeArray(parser); + } + } else if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + } + + final DiscoveryNode discoveryNode = new DiscoveryNode(nodeName, nodeId, new TransportAddress(TransportAddress.META_ADDRESS, 0), + Collections.emptyMap(), roles, Version.CURRENT); + + return NodeInfo.builder(Version.CURRENT, Build.CURRENT, discoveryNode).build(); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected void consumeArray(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final NodesInfoRequest request) { + final StringBuilder buf = new StringBuilder(); + buf.append("/_nodes"); + if (request.nodesIds() != null && request.nodesIds().length > 0) { + buf.append('/').append(String.join(",", request.nodesIds())); + } + final Set metrics = request.requestedMetrics(); + if (metrics != null && !metrics.isEmpty()) { + buf.append('/').append(String.join(",", metrics)); + } + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.timeout() != null) { + curlRequest.param("timeout", request.timeout().toString()); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpNodesStatsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpNodesStatsAction.java index bfa035d..fb6894f 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpNodesStatsAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpNodesStatsAction.java @@ -221,11 +221,11 @@ protected NodeStats parseNodeStats(final XContentParser parser, final String nod AggregateFileCacheStats fileCacheStats = null; TaskCancellationStats taskCancellationStats = null; SearchPipelineStats searchPipelineStats = null; - SegmentReplicationRejectionStats segmentReplicationRejectionStats = null; // TODO - RepositoriesStats repositoriesStats = null; // TODO - AdmissionControlStats admissionControlStats = null; // TODO - NodeCacheStats nodeCacheStats = null; // TODO - RemoteStoreNodeStats remoteStoreNodeStats = null; // TODO + SegmentReplicationRejectionStats segmentReplicationRejectionStats = null; + RepositoriesStats repositoriesStats = null; + AdmissionControlStats admissionControlStats = null; + NodeCacheStats nodeCacheStats = null; + RemoteStoreNodeStats remoteStoreNodeStats = null; final Map attributes = new HashMap<>(); XContentParser.Token token; TransportAddress transportAddress = new TransportAddress(TransportAddress.META_ADDRESS, 0); @@ -278,6 +278,14 @@ protected NodeStats parseNodeStats(final XContentParser parser, final String nod taskCancellationStats = parseTaskCancellationStats(parser); } else if ("search_pipeline".equals(fieldName)) { searchPipelineStats = parseSearchPipelineStats(parser); + } else if ("segment_replication_backpressure".equals(fieldName)) { + segmentReplicationRejectionStats = parseSegmentReplicationRejectionStats(parser); + } else if ("admission_control".equals(fieldName)) { + consumeObject(parser); + } else if ("caches".equals(fieldName)) { + consumeObject(parser); + } else if ("remote_store".equals(fieldName)) { + remoteStoreNodeStats = parseRemoteStoreNodeStats(parser); } else { consumeObject(parser); } @@ -292,8 +300,19 @@ protected NodeStats parseNodeStats(final XContentParser parser, final String nod } else if ("transport_address".equals(fieldName)) { transportAddress = parseTransportAddress(parser.text()); } + } else if (token == XContentParser.Token.START_ARRAY) { + if ("roles".equals(fieldName)) { + parser.nextToken(); + while (parser.currentToken() != XContentParser.Token.END_ARRAY) { + final String roleName = parser.text(); + DiscoveryNodeRole.BUILT_IN_ROLES.stream().filter(r -> r.roleName().equals(roleName)).findFirst() + .ifPresent(roles::add); + parser.nextToken(); + } + } else { + parser.skipChildren(); + } } - // TODO roles parser.nextToken(); } final DiscoveryNode node = new DiscoveryNode(nodeName, nodeId, transportAddress, attributes, roles, Version.CURRENT); @@ -351,47 +370,52 @@ public static TransportAddress parseTransportAddress(final String addr) { } protected AdaptiveSelectionStats parseAdaptiveSelectionStats(final XContentParser parser) throws IOException { - consumeObject(parser); // TODO + consumeObject(parser); return new AdaptiveSelectionStats(Collections.emptyMap(), Collections.emptyMap()); } protected ScriptCacheStats parseScriptCacheStats(final XContentParser parser) throws IOException { - consumeObject(parser); // TODO + consumeObject(parser); return new ScriptCacheStats(Collections.emptyMap()); } protected IndexingPressureStats parseIndexingPressureStats(final XContentParser parser) throws IOException { - consumeObject(parser); // TODO + consumeObject(parser); return new IndexingPressureStats(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); } protected ShardIndexingPressureStats parseShardIndexingPressureStats(final XContentParser parser) throws IOException { - consumeObject(parser); // TODO + consumeObject(parser); return new ShardIndexingPressureStats(Collections.emptyMap(), 0, 0, 0, false, false); } protected SearchBackpressureStats parseSearchBackpressureStats(final XContentParser parser) throws IOException { SearchBackpressureMode mode = SearchBackpressureMode.DISABLED; + SearchTaskStats searchTaskStats = new SearchTaskStats(0, 0, 0, Collections.emptyMap()); + SearchShardTaskStats searchShardTaskStats = new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()); String fieldName = null; XContentParser.Token token; while ((token = parser.currentToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { fieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_NUMBER) { + } else if (token == XContentParser.Token.START_OBJECT) { if ("search_task".equals(fieldName) || "search_shard_task".equals(fieldName)) { - consumeObject(parser); // TODO - } else if ("mode".equals(fieldName)) { - mode = SearchBackpressureMode.valueOf(parser.text()); + consumeObject(parser); + } else { + consumeObject(parser); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("mode".equals(fieldName)) { + mode = SearchBackpressureMode.fromName(parser.text()); } } parser.nextToken(); } - return new SearchBackpressureStats(new SearchTaskStats(0, 0, 0, Collections.emptyMap()), - new SearchShardTaskStats(0, 0, 0, Collections.emptyMap()), mode); + return new SearchBackpressureStats(searchTaskStats, searchShardTaskStats, mode); } protected ClusterManagerThrottlingStats parseClusterManagerThrottlingStats(final XContentParser parser) throws IOException { - consumeObject(parser); // TODO + consumeObject(parser); return new ClusterManagerThrottlingStats(); } @@ -519,10 +543,14 @@ protected TaskCancellationStats parseTaskCancellationStats(final XContentParser while ((token = parser.currentToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { fieldName = parser.currentName(); - } else if ((token == XContentParser.Token.VALUE_NUMBER) && "search_task".equals(fieldName)) { - searchTaskCancellationStats = parseSearchTaskCancellationStats(parser); - } else if ((token == XContentParser.Token.VALUE_NUMBER) && "search_shard_task".equals(fieldName)) { - searchShardTaskCancellationStats = parseSearchShardTaskCancellationStats(parser); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("search_task".equals(fieldName)) { + searchTaskCancellationStats = parseSearchTaskCancellationStats(parser); + } else if ("search_shard_task".equals(fieldName)) { + searchShardTaskCancellationStats = parseSearchShardTaskCancellationStats(parser); + } else { + consumeObject(parser); + } } parser.nextToken(); } @@ -577,20 +605,44 @@ protected SearchPipelineStats parseSearchPipelineStats(final XContentParser pars while ((token = parser.currentToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { fieldName = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_NUMBER) { + } else if (token == XContentParser.Token.START_OBJECT) { if ("total_request".equals(fieldName)) { totalRequestStats = parseOperationStats(parser); } else if ("total_response".equals(fieldName)) { totalResponseStats = parseOperationStats(parser); - } else if ("pipelines".equals(fieldName)) { - consumeObject(parser); // TODO + } else { + consumeObject(parser); } + } else if (token == XContentParser.Token.START_ARRAY) { + parser.skipChildren(); } parser.nextToken(); } return new SearchPipelineStats(totalRequestStats, totalResponseStats, Collections.emptyList(), Collections.emptyMap(), null, null); } + protected SegmentReplicationRejectionStats parseSegmentReplicationRejectionStats(final XContentParser parser) throws IOException { + long totalRejectedRequests = 0; + String fieldName = null; + XContentParser.Token token; + while ((token = parser.currentToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("total_rejected_requests".equals(fieldName)) { + totalRejectedRequests = parser.longValue(); + } + } + parser.nextToken(); + } + return new SegmentReplicationRejectionStats(totalRejectedRequests); + } + + protected RemoteStoreNodeStats parseRemoteStoreNodeStats(final XContentParser parser) throws IOException { + consumeObject(parser); + return new RemoteStoreNodeStats(); + } + protected OperationStats parseOperationStats(final XContentParser parser) throws IOException { long count = 0; long totalTimeInMillis = 0; diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java new file mode 100644 index 0000000..1905280 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java @@ -0,0 +1,182 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.opensearch.Version; +import org.opensearch.action.admin.cluster.node.usage.NodeUsage; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageAction; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageRequest; +import org.opensearch.action.admin.cluster.node.usage.NodesUsageResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpNodesUsageAction extends HttpAction { + + protected NodesUsageAction action; + + public HttpNodesUsageAction(final HttpClient client, final NodesUsageAction action) { + super(client); + this.action = action; + } + + public void execute(final NodesUsageRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final NodesUsageResponse nodesUsageResponse = fromXContent(parser); + listener.onResponse(nodesUsageResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected NodesUsageResponse fromXContent(final XContentParser parser) throws IOException { + List nodes = Collections.emptyList(); + String fieldName = null; + ClusterName clusterName = ClusterName.DEFAULT; + + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("_nodes".equals(fieldName)) { + consumeObject(parser); + } else if ("nodes".equals(fieldName)) { + nodes = parseNodes(parser); + } else { + consumeObject(parser); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("cluster_name".equals(fieldName)) { + clusterName = new ClusterName(parser.text()); + } + } + } + + return new NodesUsageResponse(clusterName, nodes, Collections.emptyList()); + } + + protected List parseNodes(final XContentParser parser) throws IOException { + final List list = new ArrayList<>(); + String fieldName = null; + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + list.add(parseNodeUsage(parser, fieldName)); + } + } + + return list; + } + + protected NodeUsage parseNodeUsage(final XContentParser parser, final String nodeId) throws IOException { + String fieldName = null; + long timestamp = 0; + long sinceTime = 0; + Map restUsage = Collections.emptyMap(); + Map aggregationUsage = Collections.emptyMap(); + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("timestamp".equals(fieldName)) { + timestamp = parser.longValue(); + } else if ("since".equals(fieldName)) { + sinceTime = parser.longValue(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if ("rest_actions".equals(fieldName)) { + restUsage = parseRestActions(parser); + } else if ("aggregations".equals(fieldName)) { + // Aggregations is Map with nested structure; skip for simplicity + consumeObject(parser); + } else { + consumeObject(parser); + } + } + } + + final DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, new TransportAddress(TransportAddress.META_ADDRESS, 0), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + + return new NodeUsage(node, timestamp, sinceTime, restUsage, aggregationUsage); + } + + protected Map parseRestActions(final XContentParser parser) throws IOException { + final Map restActions = new HashMap<>(); + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + final String key = parser.currentName(); + parser.nextToken(); + restActions.put(key, parser.longValue()); + } + } + + return restActions; + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final NodesUsageRequest request) { + final StringBuilder buf = new StringBuilder(); + buf.append("/_nodes"); + if (request.nodesIds() != null && request.nodesIds().length > 0) { + buf.append('/').append(String.join(",", request.nodesIds())); + } + buf.append("/usage"); + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.timeout() != null) { + curlRequest.param("timeout", request.timeout().toString()); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpRecoveryAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpRecoveryAction.java new file mode 100644 index 0000000..902ba85 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpRecoveryAction.java @@ -0,0 +1,99 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.indices.recovery.RecoveryAction; +import org.opensearch.action.admin.indices.recovery.RecoveryRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.action.support.DefaultShardOperationFailedException; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpRecoveryAction extends HttpAction { + + protected final RecoveryAction action; + + public HttpRecoveryAction(final HttpClient client, final RecoveryAction action) { + super(client); + this.action = action; + } + + public void execute(final RecoveryRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final RecoveryResponse recoveryResponse = fromXContent(parser); + listener.onResponse(recoveryResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected RecoveryResponse fromXContent(final XContentParser parser) throws IOException { + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + // The recovery response JSON is {"indexName":{"shards":[{...}]}} with no _shards header. + // RecoveryState is complex to parse, so we consume the entire JSON and return an empty response. + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + // index name + } else if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + } + + return new RecoveryResponse(0, 0, 0, Collections.emptyMap(), new ArrayList<>()); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final RecoveryRequest request) { + // RestRecoveryAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append('/').append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_recovery"); + + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + curlRequest.param("detailed", Boolean.toString(request.detailed())); + curlRequest.param("active_only", Boolean.toString(request.activeOnly())); + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpRemoteInfoAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpRemoteInfoAction.java new file mode 100644 index 0000000..ea00a7f --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpRemoteInfoAction.java @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; +import java.util.Collections; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.opensearch.action.admin.cluster.remote.RemoteInfoAction; +import org.opensearch.action.admin.cluster.remote.RemoteInfoRequest; +import org.opensearch.action.admin.cluster.remote.RemoteInfoResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpRemoteInfoAction extends HttpAction { + + protected final RemoteInfoAction action; + + public HttpRemoteInfoAction(final HttpClient client, final RemoteInfoAction action) { + super(client); + this.action = action; + } + + public void execute(final RemoteInfoRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final RemoteInfoResponse remoteInfoResponse = fromXContent(parser); + listener.onResponse(remoteInfoResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected RemoteInfoResponse fromXContent(final XContentParser parser) throws IOException { + // RemoteConnectionInfo requires complex ModeInfo construction + // Return empty list - callers can use the REST API directly for full details + final XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } + return new RemoteInfoResponse(Collections.emptyList()); + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final RemoteInfoRequest request) { + return client.getCurlRequest(GET, "/_remote/info"); + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java new file mode 100644 index 0000000..958b525 --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java @@ -0,0 +1,142 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.termvectors.TermVectorsAction; +import org.opensearch.action.termvectors.TermVectorsRequest; +import org.opensearch.action.termvectors.TermVectorsResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpTermVectorsAction extends HttpAction { + + protected final TermVectorsAction action; + + public HttpTermVectorsAction(final HttpClient client, final TermVectorsAction action) { + super(client); + this.action = action; + } + + public void execute(final TermVectorsRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final TermVectorsResponse termVectorsResponse = fromXContent(parser); + listener.onResponse(termVectorsResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected TermVectorsResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + String index = ""; + String id = ""; + boolean found = false; + long took = 0; + + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + if ("_index".equals(fieldName)) { + index = parser.text(); + } else if ("_id".equals(fieldName)) { + id = parser.text(); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("_version".equals(fieldName)) { + parser.longValue(); // consume but don't use + } else if ("took".equals(fieldName)) { + took = parser.longValue(); + } + } else if (token == XContentParser.Token.VALUE_BOOLEAN) { + if ("found".equals(fieldName)) { + found = parser.booleanValue(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + consumeObject(parser); + } else if (token == XContentParser.Token.START_ARRAY) { + consumeObject(parser); + } + } + + final TermVectorsResponse response = new TermVectorsResponse(index, id); + response.setExists(found); + response.setTookInMillis(took); + return response; + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final TermVectorsRequest request) { + final StringBuilder buf = new StringBuilder(); + buf.append('/').append(UrlUtils.encode(request.index())).append("/_termvectors"); + if (request.id() != null && !request.id().isEmpty()) { + buf.append('/').append(UrlUtils.encode(request.id())); + } + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + if (request.selectedFields() != null && request.selectedFields().size() > 0) { + curlRequest.param("fields", String.join(",", request.selectedFields())); + } + if (!request.offsets()) { + curlRequest.param("offsets", "false"); + } + if (!request.positions()) { + curlRequest.param("positions", "false"); + } + if (!request.payloads()) { + curlRequest.param("payloads", "false"); + } + if (!request.fieldStatistics()) { + curlRequest.param("field_statistics", "false"); + } + if (request.termStatistics()) { + curlRequest.param("term_statistics", "true"); + } + if (request.routing() != null) { + curlRequest.param("routing", request.routing()); + } + if (request.preference() != null) { + curlRequest.param("preference", request.preference()); + } + if (!request.realtime()) { + curlRequest.param("realtime", "false"); + } + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeAction.java new file mode 100644 index 0000000..2e98fdc --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeAction.java @@ -0,0 +1,127 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.io.stream.ByteArrayStreamOutput; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeAction; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.opensearch.action.admin.indices.upgrade.post.UpgradeResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpUpgradeAction extends HttpAction { + + protected final UpgradeAction action; + + public HttpUpgradeAction(final HttpClient client, final UpgradeAction action) { + super(client); + this.action = action; + } + + public void execute(final UpgradeRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final UpgradeResponse upgradeResponse = fromXContent(parser); + listener.onResponse(upgradeResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected UpgradeResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + int totalShards = 0; + int successfulShards = 0; + int failedShards = 0; + + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + // Move to first field or END_OBJECT + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("_shards".equals(fieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("total".equals(fieldName)) { + totalShards = parser.intValue(); + } else if ("successful".equals(fieldName)) { + successfulShards = parser.intValue(); + } else if ("failed".equals(fieldName)) { + failedShards = parser.intValue(); + } + } + } + } else if ("upgraded_indices".equals(fieldName)) { + consumeObject(parser); + } else { + consumeObject(parser); + } + } + } + + // UpgradeResponse has package-private constructors, so use ByteArrayStreamOutput + // BroadcastResponse wire format: totalShards(int), successfulShards(int), failedShards(int), shardFailures(vint size) + // Then UpgradeResponse reads: upgraded versions map(vint size) + try (final ByteArrayStreamOutput out = new ByteArrayStreamOutput()) { + out.writeInt(totalShards); + out.writeInt(successfulShards); + out.writeInt(failedShards); + out.writeVInt(0); // no shard failures + out.writeVInt(0); // no upgraded indices + return action.getResponseReader().read(out.toStreamInput()); + } + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final UpgradeRequest request) { + // RestUpgradeAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append('/').append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_upgrade"); + + final CurlRequest curlRequest = client.getCurlRequest(POST, buf.toString()); + curlRequest.param("only_ancient_segments", Boolean.toString(request.upgradeOnlyAncientSegments())); + return curlRequest; + } +} diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeStatusAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeStatusAction.java new file mode 100644 index 0000000..9b9035f --- /dev/null +++ b/src/main/java/org/codelibs/fesen/client/action/HttpUpgradeStatusAction.java @@ -0,0 +1,126 @@ +/* + * Copyright 2012-2025 CodeLibs Project and the Others. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language + * governing permissions and limitations under the License. + */ +package org.codelibs.fesen.client.action; + +import java.io.IOException; + +import org.codelibs.curl.CurlRequest; +import org.codelibs.fesen.client.HttpClient; +import org.codelibs.fesen.client.io.stream.ByteArrayStreamOutput; +import org.codelibs.fesen.client.util.UrlUtils; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusAction; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusRequest; +import org.opensearch.action.admin.indices.upgrade.get.UpgradeStatusResponse; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.XContentParser; + +public class HttpUpgradeStatusAction extends HttpAction { + + protected final UpgradeStatusAction action; + + public HttpUpgradeStatusAction(final HttpClient client, final UpgradeStatusAction action) { + super(client); + this.action = action; + } + + public void execute(final UpgradeStatusRequest request, final ActionListener listener) { + getCurlRequest(request).execute(response -> { + try (final XContentParser parser = createParser(response)) { + final UpgradeStatusResponse upgradeStatusResponse = fromXContent(parser); + listener.onResponse(upgradeStatusResponse); + } catch (final Exception e) { + listener.onFailure(toOpenSearchException(response, e)); + } + }, e -> unwrapOpenSearchException(listener, e)); + } + + protected UpgradeStatusResponse fromXContent(final XContentParser parser) throws IOException { + String fieldName = null; + int totalShards = 0; + int successfulShards = 0; + int failedShards = 0; + + // Initialize parser - move to START_OBJECT + XContentParser.Token token = parser.nextToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new IOException("Expected START_OBJECT but got " + token); + } + + // Move to first field or END_OBJECT + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if ("_shards".equals(fieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if ("total".equals(fieldName)) { + totalShards = parser.intValue(); + } else if ("successful".equals(fieldName)) { + successfulShards = parser.intValue(); + } else if ("failed".equals(fieldName)) { + failedShards = parser.intValue(); + } + } + } + } else { + consumeObject(parser); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + // Skip top-level number fields like size_in_bytes, size_to_upgrade_in_bytes + } + } + + // UpgradeStatusResponse has package-private constructors, so use ByteArrayStreamOutput + // BroadcastResponse wire format: totalShards(int), successfulShards(int), failedShards(int), shardFailures(vint size) + // Then UpgradeStatusResponse reads: ShardUpgradeStatus array(vint size) + try (final ByteArrayStreamOutput out = new ByteArrayStreamOutput()) { + out.writeInt(totalShards); + out.writeInt(successfulShards); + out.writeInt(failedShards); + out.writeVInt(0); // no shard failures + out.writeVInt(0); // no ShardUpgradeStatus + return action.getResponseReader().read(out.toStreamInput()); + } + } + + protected void consumeObject(final XContentParser parser) throws IOException { + XContentParser.Token token; + int depth = 1; + while (depth > 0) { + token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) { + depth++; + } else if (token == XContentParser.Token.END_OBJECT || token == XContentParser.Token.END_ARRAY) { + depth--; + } + } + } + + protected CurlRequest getCurlRequest(final UpgradeStatusRequest request) { + // RestUpgradeStatusAction + final StringBuilder buf = new StringBuilder(); + if (request.indices() != null && request.indices().length > 0) { + buf.append('/').append(UrlUtils.joinAndEncode(",", request.indices())); + } + buf.append("/_upgrade"); + + final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + return curlRequest; + } +} From 23237fa804f62301d82672d7b8fac31f54964a7c Mon Sep 17 00:00:00 2001 From: Shinsuke Sugaya Date: Sat, 14 Mar 2026 17:50:25 +0900 Subject: [PATCH 2/2] Improve term vectors request handling --- .../action/HttpMultiTermVectorsAction.java | 18 +++++++++++++++ .../client/action/HttpNodesInfoAction.java | 2 +- .../client/action/HttpNodesUsageAction.java | 2 +- .../client/action/HttpTermVectorsAction.java | 23 ++++++++++++++++++- 4 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java index 4337d78..789b189 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpMultiTermVectorsAction.java @@ -164,6 +164,24 @@ protected CurlRequest getCurlRequest(final MultiTermVectorsRequest request) { if (tvRequest.id() != null) { builder.field("_id", tvRequest.id()); } + if (tvRequest.selectedFields() != null && tvRequest.selectedFields().size() > 0) { + builder.field("fields", tvRequest.selectedFields()); + } + if (!tvRequest.offsets()) { + builder.field("offsets", false); + } + if (!tvRequest.positions()) { + builder.field("positions", false); + } + if (!tvRequest.payloads()) { + builder.field("payloads", false); + } + if (tvRequest.termStatistics()) { + builder.field("term_statistics", true); + } + if (!tvRequest.fieldStatistics()) { + builder.field("field_statistics", false); + } builder.endObject(); } builder.endArray(); diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java index 8cfd9d4..377fdda 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpNodesInfoAction.java @@ -39,7 +39,7 @@ public class HttpNodesInfoAction extends HttpAction { - protected NodesInfoAction action; + protected final NodesInfoAction action; public HttpNodesInfoAction(final HttpClient client, final NodesInfoAction action) { super(client); diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java index 1905280..93c8909 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpNodesUsageAction.java @@ -37,7 +37,7 @@ public class HttpNodesUsageAction extends HttpAction { - protected NodesUsageAction action; + protected final NodesUsageAction action; public HttpNodesUsageAction(final HttpClient client, final NodesUsageAction action) { super(client); diff --git a/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java b/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java index 958b525..7f703f0 100644 --- a/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java +++ b/src/main/java/org/codelibs/fesen/client/action/HttpTermVectorsAction.java @@ -23,7 +23,12 @@ import org.opensearch.action.termvectors.TermVectorsAction; import org.opensearch.action.termvectors.TermVectorsRequest; import org.opensearch.action.termvectors.TermVectorsResponse; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; public class HttpTermVectorsAction extends HttpAction { @@ -109,7 +114,23 @@ protected CurlRequest getCurlRequest(final TermVectorsRequest request) { if (request.id() != null && !request.id().isEmpty()) { buf.append('/').append(UrlUtils.encode(request.id())); } - final CurlRequest curlRequest = client.getCurlRequest(GET, buf.toString()); + final boolean hasDoc = request.doc() != null; + final CurlRequest curlRequest = client.getCurlRequest(hasDoc ? POST : GET, buf.toString()); + if (hasDoc) { + try (final XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + final BytesReference docBytes = request.doc(); + try (final XContentParser docParser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, BytesReference.toBytes(docBytes))) { + builder.field("doc"); + builder.copyCurrentStructure(docParser); + } + builder.endObject(); + curlRequest.body(builder.toString()); + } catch (final IOException e) { + throw new RuntimeException("Failed to build request body", e); + } + } if (request.selectedFields() != null && request.selectedFields().size() > 0) { curlRequest.param("fields", String.join(",", request.selectedFields())); }