Skip to content

Commit b9cd95c

Browse files
committed
support FinishEventProcessor and some Span Set Method
1 parent 9036218 commit b9cd95c

12 files changed

Lines changed: 261 additions & 64 deletions

File tree

cozeloop/_client.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import os
88
import threading
99
from datetime import datetime
10-
from typing import Dict, Any, List, Optional
10+
from typing import Dict, Any, List, Optional, Callable
11+
12+
import httpx
1113

1214
from cozeloop.client import Client
1315
from cozeloop._noop import NOOP_SPAN, _NoopClient
@@ -17,6 +19,8 @@
1719
from cozeloop.internal.httpclient import Auth
1820
from cozeloop.internal.prompt import PromptProvider
1921
from cozeloop.internal.trace import TraceProvider
22+
from cozeloop.internal.trace.model.model import FinishEventInfo, TagTruncateConf
23+
from cozeloop.internal.trace.trace import default_finish_event_processor
2024
from cozeloop.span import SpanContext, Span
2125

2226
logger = logging.getLogger(__name__)
@@ -54,8 +58,11 @@ def new_client(
5458
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
5559
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
5660
prompt_trace: bool = False,
61+
http_client: Optional[httpx.Client] = None,
62+
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
63+
tag_truncate_conf: Optional[TagTruncateConf] = None,
5764
) -> Client:
58-
cache_key = _generate_cache_key(
65+
cache_key = _generate_cache_key( # all args are used to generate cache key
5966
api_base_url,
6067
workspace_id,
6168
api_token,
@@ -67,7 +74,10 @@ def new_client(
6774
ultra_large_report,
6875
prompt_cache_max_count,
6976
prompt_cache_refresh_interval,
70-
prompt_trace
77+
prompt_trace,
78+
http_client,
79+
trace_finish_event_processor,
80+
tag_truncate_conf,
7181
)
7282

7383
with _cache_lock:
@@ -88,6 +98,8 @@ def new_client(
8898
prompt_cache_max_count=prompt_cache_max_count,
8999
prompt_cache_refresh_interval=prompt_cache_refresh_interval,
90100
prompt_trace=prompt_trace,
101+
arg_http_client=http_client,
102+
trace_finish_event_processor=trace_finish_event_processor,
91103
)
92104
_client_cache[cache_key] = client
93105
return client
@@ -113,7 +125,10 @@ def __init__(
113125
ultra_large_report: bool = False,
114126
prompt_cache_max_count: int = consts.DEFAULT_PROMPT_CACHE_MAX_COUNT,
115127
prompt_cache_refresh_interval: int = consts.DEFAULT_PROMPT_CACHE_REFRESH_INTERVAL,
116-
prompt_trace: bool = False
128+
prompt_trace: bool = False,
129+
arg_http_client: Optional[httpx.Client] = None,
130+
trace_finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None,
131+
tag_truncate_conf: Optional[TagTruncateConf] = None,
117132
):
118133
workspace_id = self._get_from_env(workspace_id, ENV_WORKSPACE_ID)
119134
api_base_url = self._get_from_env(api_base_url, ENV_API_BASE_URL)
@@ -136,6 +151,8 @@ def __init__(
136151

137152
self._workspace_id = workspace_id
138153
inner_client = httpclient.HTTPClient()
154+
if arg_http_client:
155+
inner_client = arg_http_client
139156
auth = self._build_auth(
140157
api_base_url=api_base_url,
141158
http_client=inner_client,
@@ -151,10 +168,18 @@ def __init__(
151168
timeout=timeout,
152169
upload_timeout=upload_timeout
153170
)
171+
finish_pro = default_finish_event_processor
172+
if trace_finish_event_processor:
173+
# Wrapper installed when the caller supplied trace_finish_event_processor:
# it invokes the built-in default processor first and then the caller's
# processor, passing both the same FinishEventInfo.
def combined_processor(event_info: FinishEventInfo):
174+
default_finish_event_processor(event_info)
175+
# NOTE(review): no exception handling is visible around the user callback — confirm callers tolerate it raising.
trace_finish_event_processor(event_info)
176+
finish_pro = combined_processor
154177
self._trace_provider = TraceProvider(
155178
http_client=http_client,
156179
workspace_id=workspace_id,
157-
ultra_large_report=ultra_large_report
180+
ultra_large_report=ultra_large_report,
181+
finish_event_processor=finish_pro,
182+
tag_truncate_conf=tag_truncate_conf,
158183
)
159184
self._prompt_provider = PromptProvider(
160185
workspace_id=workspace_id,
@@ -234,7 +259,7 @@ def start_span(
234259
else:
235260
return self._trace_provider.start_span(name=name, span_type=span_type, start_time=start_time,
236261
parent_span_id=child_of.span_id, trace_id=child_of.trace_id,
237-
baggage=child_of.baggage, start_new_trace=start_new_trace)
262+
baggage=child_of.baggage(), start_new_trace=start_new_trace)
238263
except Exception as e:
239264
logger.warning(f"Start span failed, returning noop span. Error: {e}")
240265
return NOOP_SPAN

cozeloop/internal/httpclient/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ def _set_headers(self, headers: Optional[Dict[str, str]] = None) -> Dict[str, st
4343
res.update(headers)
4444
res[consts.AUTHORIZE_HEADER] = f"Bearer {self.auth.token}"
4545

46-
tt_env = os.getenv("x-tt-env")
46+
tt_env = os.getenv("x_tt_env")
4747
if tt_env:
4848
res["x-tt-env"] = tt_env
49-
ppe_env = os.getenv("x-use-ppe")
49+
ppe_env = os.getenv("x_use_ppe")
5050
if ppe_env:
5151
res["x-use-ppe"] = "1"
5252

cozeloop/internal/trace/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@
66
from .trace import TraceProvider
77

88
from .trace import Span
9-

cozeloop/internal/trace/exporter.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
logger = logging.getLogger(__name__)
1919

2020
# Abstract exporter interface: both methods only raise NotImplementedError.
# The diff below re-annotates each method from `-> bool` to `-> (bool, str)`.
# NOTE(review): `(bool, str)` is a tuple expression, not a typing construct;
# `Tuple[bool, str]` is the conventional spelling for a two-element tuple.
class Exporter:
21-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
21+
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> (bool, str):
2222
raise NotImplementedError
2323

24-
def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
24+
def export_files(self, ctx: dict, files: List['UploadFile']) -> (bool, str):
2525
raise NotImplementedError
2626

2727

@@ -40,7 +40,7 @@ class SpanExporter(Exporter):
4040
def __init__(self, client: Client):
4141
self.client = client
4242

43-
def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
43+
def export_files(self, ctx: dict, files: List['UploadFile']) -> (bool, str):
4444
for file in files:
4545
if not file:
4646
continue
@@ -55,18 +55,14 @@ def export_files(self, ctx: dict, files: List['UploadFile']) -> bool:
5555
{"workspace_id": file.space_id},
5656
)
5757
if resp.code != 0: # todo: some err code do not need retry
58-
logger.error(f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]")
59-
return False
58+
return False, f"export files[{file.tos_key}] fail, code:[{resp.code}], msg:[{resp.msg}]"
6059
except Exception as e:
61-
logger.error(f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]")
62-
return False
60+
return False, f"export files[{file.tos_key}] fail, err:[{e}], file.name:[{file.name}]"
6361

6462
logger.debug(f"uploadFile end, file name: {file.name}")
65-
return True
66-
67-
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
68-
logger.debug(f"export spans, spans count: {len(spans)}")
63+
return True, ""
6964

65+
def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> (bool, str):
7066
if not spans:
7167
return True
7268

@@ -77,13 +73,11 @@ def export_spans(self, ctx: dict, spans: List['UploadSpan']) -> bool:
7773
UploadSpanData(spans=spans),
7874
)
7975
if resp.code != 0: # todo: some err code do not need retry
80-
logger.error(f"export spans fail, code:[{resp.code}], msg:[{resp.msg}]")
81-
return False
76+
return False, f"export spans fail, code:[{resp.code}], msg:[{resp.msg}]"
8277
except Exception as e:
83-
logger.error(f"export spans fail, err:[{e}]")
84-
return False
78+
return False, f"export spans fail, err:[{e}]"
8579

86-
return True
80+
return True, ""
8781

8882

8983
class UploadSpanData(BaseModel):
@@ -92,10 +86,12 @@ class UploadSpanData(BaseModel):
9286

9387
class UploadSpan(BaseModel):
9488
started_at_micros: int
89+
log_id: str
9590
span_id: str
9691
parent_id: str
9792
trace_id: str
9893
duration_micros: int
94+
service_name: str
9995
workspace_id: str
10096
span_name: str
10197
span_type: str
@@ -137,10 +133,12 @@ def transfer_to_upload_span_and_file(spans: List['Span']) -> (List[UploadSpan],
137133

138134
res_span.append(UploadSpan(
139135
started_at_micros=int(span.start_time.timestamp() * 1_000_000),
136+
log_id=span.log_id,
140137
span_id=span.span_id,
141138
parent_id=span.parent_span_id,
142139
trace_id=span.trace_id,
143140
duration_micros=span.get_duration(),
141+
service_name=span.service_name,
144142
workspace_id=span.get_space_id(),
145143
span_name=span.get_span_name(),
146144
span_type=span.get_span_type(),

cozeloop/internal/trace/model/model.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from enum import Enum
55

66
from pydantic import BaseModel
7-
from typing import List, Optional
7+
from typing import List, Optional, Literal
8+
from pydantic.dataclasses import dataclass
89

910

1011
class ObjectStorage(BaseModel):
@@ -23,3 +24,28 @@ class Attachment(BaseModel):
2324
class UploadType(str, Enum):
2425
LONG = 1
2526
MULTI_MODALITY = 2
27+
28+
29+
# Closed set of event-type strings used for FinishEventInfo.event_type.
# The two "queue_manager.*" values are the ones reported from the queue
# manager's enqueue path shown elsewhere in this commit; the "exporter.*"
# emit sites are not visible in this view.
SpanFinishEvent = Literal[
30+
"queue_manager.span_entry.rate",
31+
"queue_manager.file_entry.rate",
32+
"exporter.span_flush.rate",
33+
"exporter.file_flush.rate"
34+
]
35+
36+
# Optional extra context carried by FinishEventInfo.extra_params.
@dataclass
37+
class FinishEventInfoExtra:
38+
# Set to True by the enqueue path when item.is_root_span() is truthy, False otherwise.
is_root_span: bool
39+
40+
# Payload handed to a finish-event processor callback
# (Callable[[FinishEventInfo], None]).
@dataclass
41+
class FinishEventInfo:
42+
# One of the SpanFinishEvent literal strings.
event_type: SpanFinishEvent
43+
# True when the reported step failed (e.g. enqueue sets it on queue.Full).
is_event_fail: bool
44+
item_num: int # number of items covered; one event may cover several spans
45+
# Human-readable description of what happened; may be an empty string.
detail_msg: str
46+
# Extra context; defaults to None when not supplied.
extra_params: Optional[FinishEventInfoExtra] = None
47+
48+
# Configuration object accepted by new_client(tag_truncate_conf=...) and
# forwarded to TraceProvider.
# NOTE(review): the field names suggest byte-size limits for ordinary fields
# versus input/output fields, but the truncation logic is not visible here —
# confirm against the TraceProvider implementation.
@dataclass
49+
class TagTruncateConf:
50+
normal_field_max_byte: int
51+
input_output_field_max_byte: int

cozeloop/internal/trace/noop_span.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ def set_start_time_first_resp(self, start_time_first_resp: int) -> None:
110110
def set_runtime(self, runtime: Runtime) -> None:
111111
pass
112112

113+
# No-op implementation: the service name is accepted and discarded.
def set_service_name(self, service_name: str) -> None:
114+
pass
115+
116+
# No-op implementation: the log id is accepted and discarded.
def set_log_id(self, log_id: str) -> None:
117+
pass
118+
113119
def __enter__(self):
114120
return self
115121

cozeloop/internal/trace/queue_manager.py

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,20 @@
1212
import logging
1313
import queue
1414
import threading
15-
from typing import Any, Callable, List
15+
from typing import Any, Callable, List, Optional
1616
from queue import Queue
1717

1818
from pydantic import BaseModel
1919

20+
from cozeloop.internal.trace.model.model import FinishEventInfo, FinishEventInfoExtra
21+
2022
logger = logging.getLogger(__name__)
2123

24+
# Queue names. The enqueue path compares options.queue_name with the two
# span names to report "queue_manager.span_entry.rate"; any other queue name
# is reported as "queue_manager.file_entry.rate".
QUEUE_NAME_SPAN = "span"
25+
QUEUE_NAME_SPAN_RETRY = "span_retry"
26+
QUEUE_NAME_FILE = "file"
27+
QUEUE_NAME_FILE_RETRY = "file_retry"
28+
2229
class QueueManager:
2330
def enqueue(self, s: Any, byte_size: int):
2431
raise NotImplementedError
@@ -37,6 +44,7 @@ class BatchQueueManagerOptions(BaseModel):
3744
max_export_batch_length: int
3845
max_export_batch_byte_size: int
3946
export_func: Callable[[dict, List[Any]], None]
47+
finish_event_processor: Optional[Callable[[FinishEventInfo], None]] = None
4048

4149

4250
class BatchQueueManager(QueueManager):
@@ -112,8 +120,6 @@ def _do_export(self):
112120
self._do_export_batch()
113121

114122
def _do_export_batch(self):
115-
logger.debug(
116-
f"{self.options.queue_name} queue _do_export_batch, len: {len(self.batch)}")
117123
with self.batch_lock:
118124
if self.batch:
119125
if self.export_func:
@@ -125,20 +131,40 @@ def enqueue(self, item: Any, byte_size: int):
125131
if self.stop_event.is_set():
126132
return
127133

134+
is_fail = False
135+
detail_msg = ""
128136
try:
129137
self.queue.put_nowait(item)
130138
if self.queue.qsize() >= self.options.max_queue_length:
131139
with self.condition:
132140
self.condition.notify()
133-
logger.debug(f"{self.options.queue_name} enqueue, queue length: {self.queue.qsize()}")
141+
detail_msg = f"{self.options.queue_name} enqueue, queue length: {self.queue.qsize()}"
134142
except queue.Full:
135-
logger.error(
136-
f"{self.options.queue_name} queue is full, dropped span")
143+
is_fail = True
144+
detail_msg = f"{self.options.queue_name} queue is full, dropped span"
137145
self.dropped += 1
138146
else:
139147
with self.batch_lock:
140148
self.batch_byte_size += byte_size
141149

150+
event_typ = "queue_manager.file_entry.rate"
151+
extra_params = FinishEventInfoExtra(is_root_span=False)
152+
if self.options.queue_name == QUEUE_NAME_SPAN or self.options.queue_name == QUEUE_NAME_SPAN_RETRY:
153+
event_typ = "queue_manager.span_entry.rate"
154+
if item.is_root_span():
155+
extra_params = FinishEventInfoExtra(
156+
is_root_span=True,
157+
)
158+
159+
if self.options and self.options.finish_event_processor:
160+
self.options.finish_event_processor(FinishEventInfo(
161+
event_type=event_typ,
162+
is_event_fail=is_fail,
163+
item_num=1,
164+
detail_msg=detail_msg,
165+
extra_params=extra_params
166+
))
167+
142168
def shutdown(self) -> bool:
143169
if self.stop_event.is_set():
144170
return True

0 commit comments

Comments
 (0)