Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 40 additions & 25 deletions veadk/tools/builtin_tools/llm_shield.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ def __init__(self, region: str = "cn-beijing", timeout: int = 50) -> None:
self.appid = getenv("TOOL_LLM_SHIELD_APP_ID")
self.region = region
self.timeout = timeout
self.url = getenv(
"TOOL_LLM_SHIELD_URL",
f"https://{self.region}.sdk.access.llm-shield.omini-shield.com",
)
self.api_key = getenv("TOOL_LLM_SHIELD_API_KEY", allow_false_values=True)

self.category_map = {
101: "Model Misuse",
Expand Down Expand Up @@ -96,18 +101,6 @@ def _request_llm_shield(self, message: str, role: str) -> Optional[str]:
logger.error("LLM Shield app ID not configured")
return None

ak = os.getenv("VOLCENGINE_ACCESS_KEY")
sk = os.getenv("VOLCENGINE_SECRET_KEY")
session_token = ""
if not (ak and sk):
logger.debug("Get AK/SK from environment variables failed.")
credential = get_credential_from_vefaas_iam()
ak = credential.access_key_id
sk = credential.secret_access_key
session_token = credential.session_token
else:
logger.debug("Successfully get AK/SK from environment variables.")

body = {
"Message": {
"Role": role,
Expand All @@ -119,27 +112,50 @@ def _request_llm_shield(self, message: str, role: str) -> Optional[str]:

body_json = json.dumps(body).encode("utf-8")

header = {"X-Security-Token": session_token}
url = f"https://{self.region}.sdk.access.llm-shield.omini-shield.com"
path = "/v2/moderate"
action = "Moderate"
version = "2025-08-31"

signed_header = request_sign(
header, ak, sk, self.region, url, path, action, body_json
)

signed_header.update(
{
# Check if using API key authentication
logger.debug(f"API key value: {self.api_key}, type: {type(self.api_key)}")
if self.api_key and self.api_key != "":
logger.debug("Using API key authentication (no AK/SK signature)")
# Use API key authentication only - match curl command headers exactly
signed_header = {
"Content-Type": "application/json",
"X-Top-Service": "llmshield",
"X-Top-Region": self.region,
"x-api-key": self.api_key,
}
)
else:
logger.debug("Using AK/SK signature authentication")
# Use AK/SK signature authentication
ak = os.getenv("VOLCENGINE_ACCESS_KEY")
sk = os.getenv("VOLCENGINE_SECRET_KEY")
session_token = ""
if not (ak and sk):
logger.debug("Get AK/SK from environment variables failed.")
credential = get_credential_from_vefaas_iam()
ak = credential.access_key_id
sk = credential.secret_access_key
session_token = credential.session_token
else:
logger.debug("Successfully get AK/SK from environment variables.")

header = {"X-Security-Token": session_token}
signed_header = request_sign(
header, ak, sk, self.region, self.url, path, action, body_json
)

signed_header.update(
{
"Content-Type": "application/json",
"X-Top-Service": "llmshield",
"X-Top-Region": self.region,
}
)

try:
response = requests.post(
url + path,
self.url + path,
headers=signed_header,
data=body_json,
params={"Action": action, "Version": version},
Expand All @@ -151,7 +167,6 @@ def _request_llm_shield(self, message: str, role: str) -> Optional[str]:
f"LLM Shield HTTP error: {response.status_code} - {response.text}"
)
return None

response = response.json()
except requests.exceptions.Timeout:
logger.error("LLM Shield request timeout")
Expand Down