diff --git a/apisix/healthcheck_manager.lua b/apisix/healthcheck_manager.lua index f2750ff49b78..47768dab7f1c 100644 --- a/apisix/healthcheck_manager.lua +++ b/apisix/healthcheck_manager.lua @@ -31,7 +31,8 @@ local jp = require("jsonpath") local config_util = require("apisix.core.config_util") local _M = {} -local working_pool = {} -- resource_path -> {version = ver, checker = checker} +-- resource_path -> {version = ver, checker = checker, checks = checks} +local working_pool = {} local waiting_pool = {} -- resource_path -> resource_ver local DELAYED_CLEAR_TIMEOUT = 10 @@ -44,6 +45,33 @@ end _M.get_healthchecker_name = get_healthchecker_name +-- Compute the desired set of health-check targets for an upstream config. +-- Returns an ordered array preserving up_conf.nodes order so that callers +-- that iterate it add targets in a deterministic order; each entry also carries a +-- "host:port:hostheader" key so the working set can be diffed cheaply against +-- a checker's current targets. +local function compute_targets(up_conf) + local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host + local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port + local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host + local use_node_hdr = up_conf.pass_host == "node" or nil + + local targets = {} + for _, node in ipairs(up_conf.nodes) do + local host_hdr = up_hdr or (use_node_hdr and node.domain) or nil + local target_port = port or node.port + targets[#targets + 1] = { + host = node.host, + port = target_port, + check_host = host, + host_hdr = host_hdr, + key = node.host .. ":" .. tostring(target_port) .. ":" .. 
tostring(host_hdr or ""), + } + end + return targets +end + + local function create_checker(up_conf) if not up_conf.checks then return nil @@ -70,19 +98,35 @@ local function create_checker(up_conf) return nil end - -- Add target nodes - local host = up_conf.checks and up_conf.checks.active and up_conf.checks.active.host - local port = up_conf.checks and up_conf.checks.active and up_conf.checks.active.port - local up_hdr = up_conf.pass_host == "rewrite" and up_conf.upstream_host - local use_node_hdr = up_conf.pass_host == "node" or nil - - for _, node in ipairs(up_conf.nodes) do - local host_hdr = up_hdr or (use_node_hdr and node.domain) - local ok, err = checker:add_target(node.host, port or node.port, host, - true, host_hdr) + -- Add target nodes. Re-adding an already-present target is a no-op except + -- that it clears any pending purge_time, which is what un-marks surviving + -- targets after a delayed_clear() on a checks-config rebuild. + local desired = {} + for _, target in ipairs(compute_targets(up_conf)) do + desired[target.key] = true + local ok, err = checker:add_target(target.host, target.port, target.check_host, + true, target.host_hdr) if not ok then - core.log.error("failed to add healthcheck target: ", node.host, ":", - port or node.port, " err: ", err) + core.log.error("failed to add healthcheck target: ", target.host, ":", + target.port, " err: ", err) + end + end + + -- The shared shm target list may already hold nodes this config no longer + -- has -- e.g. another worker created the checker first and a node was later + -- removed; the worker that never had the checker reaches create_checker(), + -- which otherwise only adds. Remove the stale targets so they stop being + -- probed and reported by /v1/healthcheck (apache/apisix#13282, multi-worker). + local target_list = healthcheck.get_target_list(get_healthchecker_name(up_conf), + healthcheck_shdict_name) or {} + for _, t in ipairs(target_list) do + local key = t.ip .. ":" .. tostring(t.port) .. 
":" .. tostring(t.hostheader or "") + if not desired[key] then + local ok, err = checker:remove_target(t.ip, t.port, t.hostname) + if not ok then + core.log.error("failed to remove healthcheck target: ", t.ip, ":", + t.port, " err: ", err) + end end end @@ -90,19 +134,93 @@ local function create_checker(up_conf) end +-- Incrementally reconcile an existing checker's targets to match up_conf. +-- Used when only the upstream nodes changed but the `checks` config did not, +-- so the checker can keep running (and keep its accumulated health state) +-- instead of being destroyed and rebuilt. +-- Returns true only if every add/remove succeeded; on a partial failure the +-- caller must not treat the checker as reconciled for this version. +local function sync_checker_targets(checker, up_conf) + -- index the desired targets by key so they can be diffed against current + local desired = {} + for _, target in ipairs(compute_targets(up_conf)) do + desired[target.key] = target + end + + -- index current targets the same way as desired. Read the authoritative + -- shm target list (the per-worker checker.targets array can lag behind a + -- recent add/remove event). + if not healthcheck then + healthcheck = require("resty.healthcheck") + end + local current = {} + local target_list = healthcheck.get_target_list(get_healthchecker_name(up_conf), + healthcheck_shdict_name) or {} + for _, t in ipairs(target_list) do + -- target_list entries carry hostheader; map it back to our key shape + local key = t.ip .. ":" .. tostring(t.port) .. ":" .. tostring(t.hostheader or "") + current[key] = t + end + + local synced = true + + -- Remove stale targets BEFORE adding new ones. resty.healthcheck identifies a + -- target by ip+port+hostname; the Host header is not part of that identity. A + -- Host-header-only change (e.g. pass_host/upstream_host) therefore produces a + -- removal of the old key and an addition of the new key for the same identity. 
+ -- Removing first frees that identity so the following add_target actually + -- applies the new Host header instead of being a no-op on an existing target. + for key, t in pairs(current) do + if not desired[key] then + local ok, err = checker:remove_target(t.ip, t.port, t.hostname) + if not ok then + synced = false + core.log.error("failed to remove healthcheck target: ", t.ip, ":", + t.port, " err: ", err) + end + end + end + + -- add targets that are desired but not present + for key, target in pairs(desired) do + if not current[key] then + local ok, err = checker:add_target(target.host, target.port, target.check_host, + true, target.host_hdr) + if not ok then + synced = false + core.log.error("failed to add healthcheck target: ", target.host, ":", + target.port, " err: ", err) + end + end + end + + return synced +end + + function _M.fetch_checker(resource_path, resource_ver) local working_item = working_pool[resource_path] if working_item and working_item.version == resource_ver then return working_item.checker end - if waiting_pool[resource_path] == resource_ver then - return nil + -- The requested version differs from the working checker -- e.g. a + -- discovery/DNS change bumped _nodes_ver. Enqueue the new version so + -- timer_create_checker reconciles (or rebuilds) it, but keep returning the + -- existing live checker in the meantime: its accumulated health state is + -- still valid, so requests during the ~1s transition keep filtering + -- unhealthy nodes instead of falling back to "all nodes available", which + -- would let a node already known to be unhealthy receive traffic + -- (apache/apisix#13282). 
+ if waiting_pool[resource_path] ~= resource_ver then + core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) + waiting_pool[resource_path] = resource_ver + end + + if working_item and working_item.checker and not working_item.checker.dead then + return working_item.checker end - -- Add to waiting pool with version - core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver) - waiting_pool[resource_path] = resource_ver return nil end @@ -130,10 +248,11 @@ function _M.fetch_node_status(checker, ip, port, hostname) end -local function add_working_pool(resource_path, resource_ver, checker) +local function add_working_pool(resource_path, resource_ver, checker, checks) working_pool[resource_path] = { version = resource_ver, - checker = checker + checker = checker, + checks = checks, } end @@ -209,14 +328,44 @@ local function timer_create_checker() goto continue end - -- if a checker exists then delete it before creating a new one + -- If a checker already exists and the `checks` config is unchanged + -- (only the upstream nodes changed), reconcile its targets in place + -- instead of destroying and rebuilding it. A destroy-and-rebuild + -- leaves `up_checker == nil` for the rebuild window, during which + -- traffic is routed to nodes already known to be unhealthy, and it + -- throws away the checker's accumulated health state. + -- sync_checker_targets is the last condition so it only runs when the + -- checker is reuse-eligible; if it reports a partial failure the whole + -- guard is false and we fall through to a full rebuild below, which + -- converges the upstream to the desired targets instead of committing + -- the new version against a half-reconciled checker. 
local existing_checker = working_pool[resource_path] + if existing_checker and existing_checker.checker + and not existing_checker.checker.dead + and upstream.checks + and upstream.nodes and #upstream.nodes > 0 + and core.table.deep_eq(existing_checker.checks, upstream.checks) + and sync_checker_targets(existing_checker.checker, upstream) then + add_working_pool(resource_path, resource_ver, existing_checker.checker, + upstream.checks) + core.log.info("reused checker with incremental targets: ", + tostring(existing_checker.checker), " for resource: ", + resource_path, " and version: ", resource_ver) + goto continue + end + + -- The checks config changed (or no checker exists): rebuild the + -- checker. delayed_clear() MUST run before create_checker() re-adds + -- the targets: the new checker shares the same shm target list, and + -- add_target() only un-marks a target's purge_time when it is re-added + -- *after* being marked. Clearing first lets surviving targets get + -- un-marked on re-add, while genuinely dropped targets keep their + -- purge_time and are cleaned up; clearing after create (the reverse) + -- would leave the live checker's targets marked and purge them later. + -- The old checker is only stopped after the new one is published, so + -- fetch_checker never observes a nil/stopped checker (no rebuild gap). 
if existing_checker then existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) - existing_checker.checker:stop() - core.log.info("releasing existing checker: ", tostring(existing_checker.checker), - " for resource: ", resource_path, " and version: ", - existing_checker.version) end local checker = create_checker(upstream) if not checker then @@ -224,7 +373,14 @@ local function timer_create_checker() end core.log.info("create new checker: ", tostring(checker), " for resource: ", resource_path, " and version: ", resource_ver) - add_working_pool(resource_path, resource_ver, checker) + add_working_pool(resource_path, resource_ver, checker, upstream.checks) + if existing_checker then + existing_checker.checker.dead = true + existing_checker.checker:stop() + core.log.info("releasing existing checker: ", tostring(existing_checker.checker), + " for resource: ", resource_path, " and version: ", + existing_checker.version) + end end ::continue:: @@ -243,6 +399,7 @@ local function timer_working_pool_check() --- remove from working pool if resource doesn't exist local res_conf = resource.fetch_latest_conf(resource_path) local need_destroy = true + local has_live_replacement = false if res_conf and res_conf.value then local ok, upstream, err local plugin_name = get_plugin_name(resource_path) @@ -284,6 +441,24 @@ local function timer_working_pool_check() " current version: ", current_ver, " item version: ", item.version) if item.version == current_ver then need_destroy = false + elseif upstream.checks and upstream.nodes and #upstream.nodes > 0 + and core.table.deep_eq(item.checks, upstream.checks) then + -- Version changed but only because of the upstream nodes (and at + -- least one node remains); the `checks` config is identical. Keep + -- the checker alive so timer_create_checker can reconcile its + -- targets incrementally (avoids a destroy-and-rebuild nil window). 
+ -- When the node count drops to 0 we deliberately fall through to + -- destroy the checker, matching the original behaviour. + need_destroy = false + end + + -- Whether a same-name checker will still own the shared shm target + -- list after this worker drops its stale handle: the new config + -- still defines checks and has at least one node, so whichever + -- worker serves traffic (re)builds a checker under the same shm + -- name. This worker must then NOT clear that shm on destroy. + if upstream.checks and upstream.nodes and #upstream.nodes > 0 then + has_live_replacement = true end end end @@ -291,7 +466,15 @@ local function timer_working_pool_check() if need_destroy then working_pool[resource_path] = nil item.checker.dead = true - item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) + -- Only tear down the shared shm target list when no same-name checker + -- will own it (resource deleted, or new config has no checks/nodes). + -- If the config still has checks and nodes, a replacement checker built + -- by whichever worker serves traffic owns the shm; clearing it here + -- would purge that live checker's targets on every worker + -- (apache/apisix#13282, multi-worker). 
+ if not has_live_replacement then + item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT) + end item.checker:stop() core.log.info("try to release checker: ", tostring(item.checker), " for resource: ", resource_path, " and version : ", item.version) diff --git a/t/node/healthcheck-discovery.t b/t/node/healthcheck-discovery.t index 9978bba93f2f..57a832bb32e9 100644 --- a/t/node/healthcheck-discovery.t +++ b/t/node/healthcheck-discovery.t @@ -108,7 +108,7 @@ unhealthy TCP increment (1/2) for '127.0.0.1(0.0.0.0:1988)' -=== TEST 2: create new checker when nodes change +=== TEST 2: reuse checker incrementally when nodes change --- apisix_yaml routes: - @@ -150,11 +150,10 @@ routes: } } --- grep_error_log eval -qr/(create new checker|releasing existing checker): table/ +qr/(create new checker|reused checker with incremental targets): table/ --- grep_error_log_out create new checker: table -releasing existing checker: table -create new checker: table +reused checker with incremental targets: table --- timeout: 30 diff --git a/t/node/healthcheck-dns.t b/t/node/healthcheck-dns.t index d0a76dbb636c..cc433e81a162 100644 --- a/t/node/healthcheck-dns.t +++ b/t/node/healthcheck-dns.t @@ -140,6 +140,5 @@ First request status: 200 Second request status: 200 --- error_log create new checker -releasing existing checker -create new checker +reused checker with incremental targets --- timeout: 10 diff --git a/t/node/healthcheck-incremental-update.t b/t/node/healthcheck-incremental-update.t new file mode 100644 index 000000000000..3d2e8739bddb --- /dev/null +++ b/t/node/healthcheck-incremental-update.t @@ -0,0 +1,533 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('warn'); +no_root_location(); +no_shuffle(); + +run_tests(); + +__DATA__ + +=== TEST 1: node-only change reuses the checker (no destroy-and-rebuild) +--- extra_init_worker_by_lua + local healthcheck = require("resty.healthcheck") + local new = healthcheck.new + healthcheck.new = function(...) + ngx.log(ngx.WARN, "create new checker") + local obj = new(...) + local clear = obj.delayed_clear + obj.delayed_clear = function(...) + ngx.log(ngx.WARN, "clear checker") + return clear(...) + end + return obj + end + +--- config +location /t { + content_by_lua_block { + local checks = [[{ + "active":{ + "http_path":"/hello", + "timeout":1, + "type":"http", + "healthy":{ "interval":1, "successes":1 }, + "unhealthy":{ "interval":1, "http_failures":2 } + } + }]] + local function cfg(nodes) + return [[{ + "upstream": { + "nodes": ]] .. nodes .. [[, + "type": "roundrobin", + "checks": ]] .. checks .. 
[[ + }, + "uri": "/hello" + }]] + end + + local t = require("lib.test_admin").test + -- initial config: one node -> creates the checker + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, + cfg('{"127.0.0.1:1980": 1}')) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + -- node-only change (checks unchanged): should reconcile in place, + -- NOT create a new checker nor delayed_clear the old one + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, + cfg('{"127.0.0.1:1980": 1, "127.0.0.1:1981": 1}')) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + ngx.say("done") + } +} + +--- request +GET /t +--- response_body +done +--- no_error_log +clear checker +--- error_log +create new checker +--- timeout: 8 + + + +=== TEST 2: checks-config change still rebuilds the checker +--- extra_init_worker_by_lua + local healthcheck = require("resty.healthcheck") + local new = healthcheck.new + healthcheck.new = function(...) + local obj = new(...) + local clear = obj.delayed_clear + obj.delayed_clear = function(...) + ngx.log(ngx.WARN, "clear checker") + return clear(...) + end + return obj + end + +--- config +location /t { + content_by_lua_block { + local function cfg(interval) + return [[{ + "upstream": { + "nodes": {"127.0.0.1:1980": 1}, + "type": "roundrobin", + "checks": { + "active":{ + "http_path":"/hello", + "timeout":1, + "type":"http", + "healthy":{ "interval":]] .. interval .. 
[[, "successes":1 }, + "unhealthy":{ "interval":1, "http_failures":2 } + } + } + }, + "uri": "/hello" + }]] + end + + local t = require("lib.test_admin").test + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(1)) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + -- change the checks config -> must rebuild (delayed_clear old checker) + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(2)) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + ngx.say("done") + } +} + +--- request +GET /t +--- response_body +done +--- error_log +clear checker +--- timeout: 8 + + + +=== TEST 3: surviving targets are not purged after a checks-config rebuild +# Changing the checks config rebuilds the checker, which delayed_clear()s the old +# one. Because the new checker shares the same shm target list, the surviving +# nodes must keep being health-checked: they must NOT be purged once the +# delayed-clear window elapses. A wrong rebuild order (clear after re-add) would +# leave the live checker's targets marked and purge them here. +--- config +location /t { + content_by_lua_block { + local json = require("apisix.core.json") + local t = require("lib.test_admin").test + local function cfg(interval) + return [[{ + "upstream": { + "nodes": {"127.0.0.1:1980": 1, "127.0.0.1:1981": 1}, + "type": "roundrobin", + "checks": { + "active":{ + "http_path":"/hello", + "type":"http", + "healthy":{ "interval":]] .. interval .. 
[[, "successes":1 }, + "unhealthy":{ "interval":1, "http_failures":2 } + } + } + }, + "uri": "/hello" + }]] + end + local function count_nodes() + local _, _, res = t('/v1/healthcheck', ngx.HTTP_GET) + local n = 0 + for _, info in ipairs(json.decode(res)) do + n = n + #(info.nodes or {}) + end + return n + end + + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(1)) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + -- change the checks config (interval 1 -> 2) while keeping both nodes: + -- this rebuilds the checker through the delayed_clear path + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(2)) < 300) + t('/hello', ngx.HTTP_GET) + + -- wait past DELAYED_CLEAR_TIMEOUT (10s) plus a cleanup window + ngx.sleep(15) + + -- both surviving nodes must still be present in the live checker + ngx.say("nodes_after: ", count_nodes()) + } +} +--- request +GET /t +--- response_body +nodes_after: 2 +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 30 + + + +=== TEST 4: a node-only update keeps filtering an already-unhealthy node during the transition +# Before the fetch_checker fix, a node-only version change made fetch_checker() +# return nil until the 1s timer reconciled, so api_ctx.up_checker was nil and the +# balancer fell back to all nodes -- a node already known unhealthy could take +# traffic during the transition (apache/apisix#13282 health-filter bypass window). +--- config +location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local http = require("resty.http") + + local function put(nodes) + return t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "retries": 0, + "nodes": ]] .. nodes .. 
[[, + "checks": { + "active": { + "type": "tcp", + "healthy": { "interval": 1, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } + } + } + } + }]]) + end + + -- start with only the healthy node so the checker is created before the + -- dead node has ever been in the picker + assert(put('{"127.0.0.1:1980": 1}') < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(1) + + -- add the dead node (node-only change) and let active checks mark it + -- unhealthy; a request is needed to enqueue the reconcile + assert(put('{"127.0.0.1:1980": 1, "127.0.0.1:1970": 1}') < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(3) + + -- re-PUT the same nodes (checks unchanged) to open a fresh version-transition + -- window; immediately burst requests before timer_create_checker reconciles + assert(put('{"127.0.0.1:1980": 1, "127.0.0.1:1970": 1}') < 300) + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local errors = 0 + for _ = 1, 20 do + local httpc = http.new() + local res = httpc:request_uri(uri, { method = "GET", keepalive = false }) + if not res or res.status ~= 200 then + errors = errors + 1 + end + end + -- the already-unhealthy dead node must stay filtered throughout + ngx.say("errors: ", errors) + } +} +--- request +GET /t +--- response_body +errors: 0 +--- error_log +unhealthy TCP increment +--- timeout: 15 + + + +=== TEST 5: create_checker removes targets left stale in the shm by another worker +# Multi-worker: a peer worker created the checker with a node that was later +# removed, leaving it in the shared shm. A worker that never had the checker +# reaches create_checker(), which must reconcile the shm (not just add) so the +# stale node stops being probed and reported by /v1/healthcheck +# (apache/apisix#13282, multi-worker). 
+--- config +location /t { + content_by_lua_block { + local json = require("apisix.core.json") + local t = require("lib.test_admin").test + + -- simulate a peer worker: seed route 1's checker shm target list with a + -- node (1970) that the config below will not contain + local healthcheck = require("resty.healthcheck") + local seed = healthcheck.new({ + name = "upstream#/apisix/routes/1", + shm_name = "upstream-healthcheck", + events_module = "resty.events", + checks = { active = { type = "tcp", + healthy = { interval = 100, successes = 1 }, + unhealthy = { interval = 100, tcp_failures = 1 } } }, + }) + seed:add_target("127.0.0.1", 1970, nil, true) + + -- this worker has no checker in its working pool, so the first request + -- goes through create_checker() for the current config {1980} + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": {"127.0.0.1:1980": 1}, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": 1, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]]) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + -- create_checker's reconcile must have removed the stale 1970 + local _, _, res = t('/v1/healthcheck', ngx.HTTP_GET) + local has_1970 = false + for _, info in ipairs(json.decode(res)) do + for _, node in ipairs(info.nodes or {}) do + if node.port == 1970 then has_1970 = true end + end + end + ngx.say("stale_1970: ", tostring(has_1970)) + } +} +--- request +GET /t +--- response_body +stale_1970: false +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 8 + + + +=== TEST 6: destroying a stale local checker does not purge a peer worker's live shm targets +# Multi-worker: after a checks-config change, a worker that did NOT serve traffic +# keeps its old-version checker in 
working_pool. timer_working_pool_check then +# destroys that stale local handle. Because the checker's shm target list is +# shared by name, a peer worker's live checker (built for the new config) owns it. +# The destroy path must NOT delayed_clear() that shm, or the peer's live targets +# are purged on every worker once the delayed-clear window elapses +# (apache/apisix#13282, multi-worker). +--- config +location /t { + content_by_lua_block { + local healthcheck = require("resty.healthcheck") + local t = require("lib.test_admin").test + + local NAME = "upstream#/apisix/routes/1" + local SHM = "upstream-healthcheck" + + -- peer worker (worker A): a live, running checker for the same resource + -- that owns the shared shm target list and holds node 1980 + local peer = healthcheck.new({ + name = NAME, shm_name = SHM, events_module = "resty.events", + checks = { active = { type = "tcp", + healthy = { interval = 1, successes = 1 }, + unhealthy = { interval = 1, tcp_failures = 1 } } }, + }) + peer:add_target("127.0.0.1", 1980, nil, true) + + local function cfg(interval) + return [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": {"127.0.0.1:1980": 1}, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": ]] .. interval .. [[, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]] + end + + -- this worker builds its own checker at checks-interval 1 (serves once) + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(1)) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + -- change the checks config but send NO request to route 1: this worker + -- never rebuilds, so working_pool keeps the old-version checker. + -- timer_working_pool_check sees the checks change and destroys it. 
+ assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg(2)) < 300) + + -- wait past DELAYED_CLEAR_TIMEOUT (10s) + a cleanup window + ngx.sleep(15) + + -- the peer's live target must survive in the shared shm + local list = healthcheck.get_target_list(NAME, SHM) or {} + local live_1980 = false + for _, tg in ipairs(list) do + if tg.port == 1980 then live_1980 = true end + end + ngx.say("live_1980: ", tostring(live_1980)) + } +} +--- request +GET /t +--- response_body +live_1980: true +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 30 + + + +=== TEST 7: recreating the same upstream id during the delayed-clear window keeps its targets +# Deleting an upstream schedules a delayed_clear() of its shm target list. If the +# same id is recreated within that window and served, create_checker() re-adds the +# targets, which must un-mark the pending purge_time so the recreated node is NOT +# purged when the window elapses (apache/apisix#13282). 
+--- config +location /t { + content_by_lua_block { + local healthcheck = require("resty.healthcheck") + local t = require("lib.test_admin").test + local NAME = "upstream#/apisix/routes/1" + local SHM = "upstream-healthcheck" + + local cfg = [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": {"127.0.0.1:1980": 1}, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": 1, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]] + + -- create + build checker + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + -- delete: timer_working_pool_check destroys and delayed_clear()s the shm + assert(t('/apisix/admin/routes/1', ngx.HTTP_DELETE) < 300) + ngx.sleep(2) -- let the destroy fire, still within the 10s clear window + + -- recreate the SAME id within the window and serve it -> create_checker re-adds + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg) < 300) + t('/hello', ngx.HTTP_GET) + + -- wait past the original delayed_clear window + ngx.sleep(12) + + local list = healthcheck.get_target_list(NAME, SHM) or {} + local live_1980 = false + for _, tg in ipairs(list) do + if tg.port == 1980 then live_1980 = true end + end + ngx.say("live_1980: ", tostring(live_1980)) + } +} +--- request +GET /t +--- response_body +live_1980: true +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 40 + + + +=== TEST 8: deleting an upstream cleans its shm target list +# Deleting an upstream (with a live checker) must eventually remove its targets +# from the shared shm so a stale node is no longer probed or reported by +# /v1/healthcheck. 
+--- config +location /t { + content_by_lua_block { + local healthcheck = require("resty.healthcheck") + local t = require("lib.test_admin").test + local NAME = "upstream#/apisix/routes/1" + local SHM = "upstream-healthcheck" + + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": {"127.0.0.1:1980": 1}, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": 1, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]]) < 300) + t('/hello', ngx.HTTP_GET) + ngx.sleep(2) + + assert(t('/apisix/admin/routes/1', ngx.HTTP_DELETE) < 300) + -- wait past the delayed_clear window plus a cleanup margin + ngx.sleep(15) + + local list = healthcheck.get_target_list(NAME, SHM) or {} + ngx.say("targets_after_delete: ", #list) + } +} +--- request +GET /t +--- response_body +targets_after_delete: 0 +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 30 diff --git a/t/node/healthcheck-leak-bugfix.t b/t/node/healthcheck-leak-bugfix.t index bcab5689d152..8b3f6c7b4053 100644 --- a/t/node/healthcheck-leak-bugfix.t +++ b/t/node/healthcheck-leak-bugfix.t @@ -17,7 +17,8 @@ use t::APISIX 'no_plan'; repeat_each(1); -log_level('warn'); +# the reuse path logs "reused checker with incremental targets" at info level +log_level('info'); no_root_location(); no_shuffle(); @@ -25,7 +26,7 @@ run_tests(); __DATA__ -=== TEST 1: ensure the old check is cleared after configuration updated +=== TEST 1: reuse the checker without clearing it when only the nodes change --- extra_init_worker_by_lua local healthcheck = require("resty.healthcheck") local new = healthcheck.new @@ -103,6 +104,9 @@ location /t { t('/hello', ngx.HTTP_GET) ngx.sleep(2) assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, cfg) < 300) + -- re-route a request so fetch_checker 
observes the new version and the + -- manager reconciles the existing checker's targets incrementally + t('/hello', ngx.HTTP_GET) ngx.sleep(2) } } @@ -110,5 +114,7 @@ location /t { --- request GET /t --- error_log +reused checker with incremental targets +--- no_error_log clear checker --- timeout: 7 diff --git a/t/node/healthcheck-multiworker-reconcile.t b/t/node/healthcheck-multiworker-reconcile.t new file mode 100644 index 000000000000..4b52d89aa192 --- /dev/null +++ b/t/node/healthcheck-multiworker-reconcile.t @@ -0,0 +1,217 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('info'); +no_root_location(); +no_shuffle(); +workers(2); +worker_connections(256); + +run_tests(); + +__DATA__ + +=== TEST 1: shared shm target list converges to the desired set under config churn (2 workers) +# With real requests round-robining across two workers, each worker reconciles the +# checker through its own request path/timers. 
The target membership lives in a +# shm shared by checker name, so its final state must equal the desired node set +# after every kind of change -- node add/remove (incremental) and a checks-config +# change (rebuild) -- with no orphan targets and no purge of surviving targets +# (apache/apisix#13282, multi-worker). +# +# NOTE: this is a convergence/non-regression check, not a deterministic +# reproduction of the cross-worker asymmetry -- requests are distributed across +# workers by the OS accept(), so it cannot force the "one worker rebuilds while +# another destroys" interleaving. The authoritative deterministic reproduction +# is TEST 6 in healthcheck-incremental-update.t. +--- config +location /t { + content_by_lua_block { + local healthcheck = require("resty.healthcheck") + local http = require("resty.http") + local t = require("lib.test_admin").test + + local NAME = "upstream#/apisix/routes/1" + local SHM = "upstream-healthcheck" + + local function cfg(nodes, interval) + return [[{ + "uri": "/hello", + "upstream": { + "type": "roundrobin", + "nodes": ]] .. nodes .. [[, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": ]] .. interval .. [[, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]] + end + + -- drive real traffic so requests spread across both workers + local function drive() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. 
"/hello" + for _ = 1, 16 do + local httpc = http.new() + httpc:request_uri(uri, { method = "GET", keepalive = false }) + end + end + + -- the desired set is worker-agnostic: read it from the shared shm + local function shm_ports() + local list = healthcheck.get_target_list(NAME, SHM) or {} + local ports = {} + for _, tg in ipairs(list) do + ports[#ports + 1] = tg.port + end + table.sort(ports) + return table.concat(ports, ",") + end + + ngx.sleep(2) -- let both workers settle + + -- phase 1: two nodes -> both must be registered + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, + cfg('{"127.0.0.1:1980": 1, "127.0.0.1:1981": 1}', 1)) < 300) + drive() + ngx.sleep(3) + ngx.say("phase1: ", shm_ports()) + + -- phase 2: node-only change (remove 1981, add 1982) -> incremental reconcile, + -- shm must drop the orphan and add the new node + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, + cfg('{"127.0.0.1:1980": 1, "127.0.0.1:1982": 1}', 1)) < 300) + drive() + ngx.sleep(3) + ngx.say("phase2: ", shm_ports()) + + -- phase 3: checks-config change (interval 1 -> 2), same nodes -> rebuild. + -- wait past DELAYED_CLEAR_TIMEOUT (10s): surviving targets must NOT be purged + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, + cfg('{"127.0.0.1:1980": 1, "127.0.0.1:1982": 1}', 2)) < 300) + drive() + ngx.sleep(14) + ngx.say("phase3: ", shm_ports()) + } +} +--- request +GET /t +--- response_body +phase1: 1980,1981 +phase2: 1980,1982 +phase3: 1980,1982 +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 40 + + + +=== TEST 2: an unhealthy node is detected and filtered consistently across workers +# The basic multi-worker health scenario: a healthy node (1980) and a dead node +# (1970). 
Active checks must mark 1970 unhealthy in the shared shm, and with +# retries=0 every request across both workers must still succeed -- proving both +# workers filter the unhealthy node (a per-worker miss would send some requests +# to 1970 and fail). This verifies health STATUS propagation, not just membership. +--- config +location /t { + content_by_lua_block { + local http = require("resty.http") + local t = require("lib.test_admin").test + local json = require("apisix.core.json") + + assert(t('/apisix/admin/routes/1', ngx.HTTP_PUT, [[{ + "uri": "/server_port", + "upstream": { + "type": "roundrobin", + "retries": 0, + "nodes": {"127.0.0.1:1980": 1, "127.0.0.1:1970": 1}, + "checks": { "active": { "type": "tcp", + "healthy": { "interval": 1, "successes": 1 }, + "unhealthy": { "interval": 1, "tcp_failures": 1 } } } + } + }]]) < 300) + + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/server_port" + -- a burst returns how many of n requests did NOT get 200 from 1980, i.e. + -- landed on the dead node 1970 (retries=0 -> hard failure) or errored + local function burst(n) + local errors = 0 + for _ = 1, n do + local httpc = http.new() + local r = httpc:request_uri(uri, { method = "GET", keepalive = false }) + if not r or r.status ~= 200 or r.body ~= "1980" then + errors = errors + 1 + end + end + return errors + end + + -- Drive traffic until every worker has built its checker AND converged its + -- per-worker status cache to "1970 unhealthy". The shared shm reports 1970 + -- unhealthy as soon as ANY one worker probes, so the control-API status is + -- necessary but NOT sufficient -- only routing proves BOTH workers filter. + -- Require several consecutive zero-error bursts (bounded); a genuine + -- per-worker filtering miss keeps producing errors and never converges, + -- so the test fails instead of flaking on a fixed sleep. 
+ local clean_streak = 0 + for _ = 1, 25 do + if burst(12) == 0 then + clean_streak = clean_streak + 1 + if clean_streak >= 3 then + break + end + else + clean_streak = 0 + end + ngx.sleep(1) + end + ngx.say("converged: ", tostring(clean_streak >= 3)) + + -- health status from the shared shm (worker-agnostic) via the control API + local function healthy(status) + return status == "healthy" or status == "mostly_healthy" + end + local _, _, res = t('/v1/healthcheck', ngx.HTTP_GET) + local h1970, h1980 + for _, info in ipairs(json.decode(res)) do + for _, node in ipairs(info.nodes or {}) do + if node.port == 1970 then h1970 = healthy(node.status) end + if node.port == 1980 then h1980 = healthy(node.status) end + end + end + ngx.say("1970_healthy: ", tostring(h1970)) + ngx.say("1980_healthy: ", tostring(h1980)) + } +} +--- request +GET /t +--- response_body +converged: true +1970_healthy: false +1980_healthy: true +--- no_error_log +failed to run timer_working_pool_check +failed to run timer_create_checker +failed to create healthcheck +failed to add healthcheck target +failed to remove healthcheck target +--- timeout: 40 diff --git a/t/node/healthcheck-service-discovery.t b/t/node/healthcheck-service-discovery.t index 7b60141531f1..2163f524fe8b 100644 --- a/t/node/healthcheck-service-discovery.t +++ b/t/node/healthcheck-service-discovery.t @@ -149,6 +149,5 @@ routes: } --- error_log create new checker -releasing existing checker -create new checker +reused checker with incremental targets --- timeout: 10 diff --git a/t/node/healthchecker-independent-upstream.t b/t/node/healthchecker-independent-upstream.t index 957aef2c4e52..d6af917073c5 100644 --- a/t/node/healthchecker-independent-upstream.t +++ b/t/node/healthchecker-independent-upstream.t @@ -109,7 +109,3 @@ passed qr/create new checker: table:/ --- grep_error_log_out create new checker: table: -create new checker: table: -create new checker: table: -create new checker: table: -create new checker: table: