Skip to content

Commit 3651113

Browse files
authored
[DataProcessor]Remove ENABLE_V1_DATA_PROCESSOR (#7052)
* remove ENABLE_V1_DATA_PROCESSOR * fix unit test * fix unit test
1 parent ee2b965 commit 3651113

59 files changed

Lines changed: 88 additions & 16858 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

fastdeploy/engine/async_llm.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ async def add_request(
446446
)
447447
if envs.ZMQ_SEND_BATCH_DATA and self.connection_manager is not None:
448448
request["zmq_worker_pid"] = self.connection_manager.worker_pid
449-
if not envs.ENABLE_V1_DATA_PROCESSOR and self.cfg.model_config.enable_mm:
449+
if self.cfg.model_config.enable_mm:
450450
self.request_client.send_pyobj(request)
451451
else:
452452
self.request_client.send_json(request)
@@ -543,8 +543,7 @@ async def generate(
543543
)
544544
else:
545545
processed_output = response_item
546-
if not envs.ENABLE_V1_DATA_PROCESSOR:
547-
processed_output = RequestOutput.from_dict(processed_output)
546+
processed_output = RequestOutput.from_dict(processed_output)
548547
# Enrich outputs with prompt metadata on the first packet
549548
if req_id:
550549
prompt_meta = self._prompt_metadata.get(req_id)

fastdeploy/engine/common_engine.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ def _insert_zmq_task_to_scheduler(self):
11861186
while self.running:
11871187
try:
11881188
block = True if len(added_requests) == 0 else False
1189-
if not self.cfg.model_config.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
1189+
if not self.cfg.model_config.enable_mm:
11901190
err, data = self.recv_request_server.receive_json_once(block)
11911191
else:
11921192
err, data = self.recv_request_server.receive_pyobj_once(block)
@@ -1243,8 +1243,7 @@ def _insert_zmq_task_to_scheduler(self):
12431243
continue
12441244
err_msg = None
12451245
try:
1246-
if not envs.ENABLE_V1_DATA_PROCESSOR:
1247-
request = Request.from_dict(data)
1246+
request = Request.from_dict(data)
12481247
request.metrics.scheduler_recv_req_time = time.time()
12491248
main_process_metrics.requests_number.inc()
12501249
trace_carrier = data.get("trace_carrier")

fastdeploy/entrypoints/engine_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ async def add_requests(self, task):
437437
def _send_task(self, task):
438438
if envs.ZMQ_SEND_BATCH_DATA:
439439
task["zmq_worker_pid"] = self.worker_pid
440-
if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
440+
if not self.enable_mm:
441441
self.zmq_client.send_json(task)
442442
else:
443443
if envs.FD_ENABLE_E2W_TENSOR_CONVERT:
@@ -599,7 +599,7 @@ async def run_control_method(self, request: ControlRequest):
599599
req_dict = request.to_dict()
600600
if envs.ZMQ_SEND_BATCH_DATA:
601601
req_dict["zmq_worker_pid"] = self.worker_pid
602-
if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
602+
if not self.enable_mm:
603603
self.zmq_client.send_json(req_dict)
604604
else:
605605
self.zmq_client.send_pyobj(req_dict)

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import fastdeploy.envs as envs
2828
import fastdeploy.metrics.trace as tracing
29-
from fastdeploy.engine.request import Request, RequestOutput
29+
from fastdeploy.engine.request import RequestOutput
3030
from fastdeploy.entrypoints.openai.protocol import (
3131
ChatCompletionRequest,
3232
ChatCompletionResponse,
@@ -145,10 +145,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
145145
prompt_tokens = None
146146
max_tokens = None
147147
try:
148-
if not envs.ENABLE_V1_DATA_PROCESSOR:
149-
current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
150-
else:
151-
current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
148+
current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
152149
if "chat_template" not in current_req_dict:
153150
current_req_dict["chat_template"] = self.chat_template
154151
current_req_dict["metrics"]["arrival_time"] = time.time()

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import fastdeploy.envs as envs
2929
import fastdeploy.metrics.trace as tracing
30-
from fastdeploy.engine.request import Request, RequestOutput
30+
from fastdeploy.engine.request import RequestOutput
3131
from fastdeploy.entrypoints.openai.protocol import (
3232
CompletionLogprobs,
3333
CompletionRequest,
@@ -178,10 +178,7 @@ async def create_completion(self, request: CompletionRequest):
178178
try:
179179
for idx, prompt in enumerate(request_prompts):
180180
request_id_idx = f"{request_id}_{idx}"
181-
if not envs.ENABLE_V1_DATA_PROCESSOR:
182-
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
183-
else:
184-
current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
181+
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
185182
current_req_dict["metrics"]["arrival_time"] = time.time()
186183
prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict) # tokenize
187184
if isinstance(prompt_token_ids, np.ndarray):

fastdeploy/entrypoints/openai/serving_embedding.py

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,17 @@
1515
"""
1616

1717
import base64
18-
import time
1918
from collections.abc import AsyncGenerator
2019
from typing import Literal, Union
2120

2221
import numpy as np
2322
from typing_extensions import assert_never, override
2423

25-
import fastdeploy.envs as envs
2624
from fastdeploy.engine.pooling_params import PoolingParams
2725
from fastdeploy.engine.request import (
2826
EmbeddingOutput,
2927
EmbeddingRequestOutput,
3028
PoolingRequestOutput,
31-
Request,
3229
)
3330
from fastdeploy.entrypoints.openai.protocol import (
3431
EmbeddingCompletionRequest,
@@ -69,25 +66,13 @@ def __init__(self, engine_client, models, cfg, pid, ips, max_waiting_time, chat_
6966
@override
7067
def _request_to_dict(self, ctx: ServeContext):
7168
request: EmbeddingRequest = ctx.request
72-
if not envs.ENABLE_V1_DATA_PROCESSOR:
73-
request_dict = super()._request_to_dict(ctx)
74-
if hasattr(request, "to_pooling_params"):
75-
pooling_params: PoolingParams = request.to_pooling_params()
76-
pooling_params.verify("embed", self.cfg.model_config)
77-
request_dict["pooling_params"] = pooling_params.to_dict()
78-
request_dict["metrics"] = {}
79-
return request_dict
80-
else:
81-
request_obj = None
82-
if hasattr(request, "to_pooling_params"):
83-
pooling_params: PoolingParams = request.to_pooling_params()
84-
pooling_params.verify("embed", self.cfg.model_config)
85-
request_obj = Request.from_generic_request(
86-
req=request, request_id=ctx.request_id, pooling_params=pooling_params
87-
)
88-
request_obj.metrics.arrival_time = time.time()
89-
super()._process_chat_template_kwargs(request_obj)
90-
return request_obj
69+
request_dict = super()._request_to_dict(ctx)
70+
if hasattr(request, "to_pooling_params"):
71+
pooling_params: PoolingParams = request.to_pooling_params()
72+
pooling_params.verify("embed", self.cfg.model_config)
73+
request_dict["pooling_params"] = pooling_params.to_dict()
74+
request_dict["metrics"] = {}
75+
return request_dict
9176

9277
@override
9378
def _request_to_batch_dicts(self, ctx: ServeContext):

fastdeploy/entrypoints/openai/serving_reward.py

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@
1414
# limitations under the License.
1515
"""
1616

17-
import time
1817
from collections.abc import AsyncGenerator
1918

2019
from typing_extensions import override
2120

22-
import fastdeploy.envs as envs
2321
from fastdeploy.engine.pooling_params import PoolingParams
24-
from fastdeploy.engine.request import PoolingRequestOutput, Request, RewardRequestOutput
22+
from fastdeploy.engine.request import PoolingRequestOutput, RewardRequestOutput
2523
from fastdeploy.entrypoints.openai.protocol import (
2624
ChatRewardData,
2725
ChatRewardRequest,
@@ -46,25 +44,13 @@ def __init__(self, engine_client, models, cfg, pid, ips, max_waiting_time, chat_
4644
@override
4745
def _request_to_dict(self, ctx: ServeContext):
4846
request: ChatRewardRequest = ctx.request
49-
if not envs.ENABLE_V1_DATA_PROCESSOR:
50-
request_dict = super()._request_to_dict(ctx)
51-
if hasattr(request, "to_pooling_params"):
52-
pooling_params: PoolingParams = request.to_pooling_params()
53-
pooling_params.verify("reward", self.cfg.model_config)
54-
request_dict["pooling_params"] = pooling_params.to_dict()
55-
request_dict["metrics"] = {}
56-
return request_dict
57-
else:
58-
request_obj: Request = None
59-
if hasattr(request, "to_pooling_params"):
60-
pooling_params: PoolingParams = request.to_pooling_params()
61-
pooling_params.verify("reward", self.cfg.model_config)
62-
request_obj = Request.from_generic_request(
63-
req=request, request_id=ctx.request_id, pooling_params=pooling_params
64-
)
65-
request_obj.metrics.arrival_time = time.time()
66-
super()._process_chat_template_kwargs(request_obj)
67-
return request_obj
47+
request_dict = super()._request_to_dict(ctx)
48+
if hasattr(request, "to_pooling_params"):
49+
pooling_params: PoolingParams = request.to_pooling_params()
50+
pooling_params.verify("reward", self.cfg.model_config)
51+
request_dict["pooling_params"] = pooling_params.to_dict()
52+
request_dict["metrics"] = {}
53+
return request_dict
6854

6955
@override
7056
def _request_to_batch_dicts(self, ctx: ServeContext):

fastdeploy/envs.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,6 @@ def _validate_split_kv_size(value: int) -> int:
9696
"EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
9797
# enable kv cache block scheduler v1 (no need for kv_cache_ratio)
9898
"ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
99-
# enable data processor v2
100-
"ENABLE_V1_DATA_PROCESSOR": lambda: int(os.getenv("ENABLE_V1_DATA_PROCESSOR", "0")),
10199
# set prealloc block num for decoder
102100
"FD_ENC_DEC_BLOCK_NUM": lambda: int(os.getenv("FD_ENC_DEC_BLOCK_NUM", "2")),
103101
# enbale max prefill of one execute step

fastdeploy/input/preprocess.py

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from fastdeploy.config import ErnieArchitectures, ModelConfig
2020
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
2121
from fastdeploy.reasoning import ReasoningParserManager
22-
from fastdeploy.utils import envs
2322
from fastdeploy.utils import llm_logger as logger
2423

2524

@@ -82,39 +81,20 @@ def create_processor(self):
8281
except Exception as e:
8382
logger.info(f"Plugin input processor not available ({e}), using built-in processor")
8483
if not self.model_config.enable_mm:
85-
if not envs.ENABLE_V1_DATA_PROCESSOR:
86-
from fastdeploy.input.text_processor import TextProcessor
87-
88-
tokenizer_type = "ernie4_5" if ErnieArchitectures.contains_ernie_arch(architecture) else "auto"
89-
self.processor = TextProcessor(
90-
model_name_or_path=self.model_name_or_path,
91-
tokenizer_type=tokenizer_type,
92-
reasoning_parser_obj=reasoning_parser_obj,
93-
tool_parser_obj=tool_parser_obj,
94-
)
95-
else:
96-
if not ErnieArchitectures.contains_ernie_arch(architecture):
97-
from fastdeploy.input.v1.text_processor import DataProcessor
98-
else:
99-
from fastdeploy.input.v1.ernie4_5_processor import (
100-
Ernie4_5Processor as DataProcessor,
101-
)
102-
103-
self.processor = DataProcessor(
104-
model_name_or_path=self.model_name_or_path,
105-
reasoning_parser_obj=reasoning_parser_obj,
106-
tool_parser_obj=tool_parser_obj,
107-
)
84+
from fastdeploy.input.text_processor import TextProcessor
85+
86+
tokenizer_type = "ernie4_5" if ErnieArchitectures.contains_ernie_arch(architecture) else "auto"
87+
self.processor = TextProcessor(
88+
model_name_or_path=self.model_name_or_path,
89+
tokenizer_type=tokenizer_type,
90+
reasoning_parser_obj=reasoning_parser_obj,
91+
tool_parser_obj=tool_parser_obj,
92+
)
10893
else:
10994
if ErnieArchitectures.contains_ernie_arch(architecture):
110-
if not envs.ENABLE_V1_DATA_PROCESSOR:
111-
from fastdeploy.input.ernie4_5_vl_processor import (
112-
Ernie4_5_VLProcessor,
113-
)
114-
else:
115-
from fastdeploy.input.v1.ernie4_5_vl_processor import (
116-
Ernie4_5_VLProcessor,
117-
)
95+
from fastdeploy.input.ernie4_5_vl_processor import (
96+
Ernie4_5_VLProcessor,
97+
)
11898

11999
self.processor = Ernie4_5_VLProcessor(
120100
model_name_or_path=self.model_name_or_path,
@@ -125,14 +105,9 @@ def create_processor(self):
125105
enable_processor_cache=self.enable_processor_cache,
126106
)
127107
elif "PaddleOCRVL" in architecture:
128-
if not envs.ENABLE_V1_DATA_PROCESSOR:
129-
from fastdeploy.input.paddleocr_vl_processor import (
130-
PaddleOCRVLProcessor,
131-
)
132-
else:
133-
from fastdeploy.input.v1.paddleocr_vl_processor import (
134-
PaddleOCRVLProcessor,
135-
)
108+
from fastdeploy.input.paddleocr_vl_processor import (
109+
PaddleOCRVLProcessor,
110+
)
136111

137112
self.processor = PaddleOCRVLProcessor(
138113
config=self.model_config,
@@ -142,12 +117,7 @@ def create_processor(self):
142117
reasoning_parser_obj=reasoning_parser_obj,
143118
)
144119
elif "Qwen2_5_VL" in architecture:
145-
if not envs.ENABLE_V1_DATA_PROCESSOR:
146-
from fastdeploy.input.qwen_vl_processor import QwenVLProcessor
147-
else:
148-
from fastdeploy.input.v1.qwen_vl_processor import (
149-
QwenVLProcessor,
150-
)
120+
from fastdeploy.input.qwen_vl_processor import QwenVLProcessor
151121

152122
self.processor = QwenVLProcessor(
153123
config=self.model_config,
@@ -158,12 +128,7 @@ def create_processor(self):
158128
enable_processor_cache=self.enable_processor_cache,
159129
)
160130
elif "Qwen3VL" in architecture:
161-
if not envs.ENABLE_V1_DATA_PROCESSOR:
162-
from fastdeploy.input.qwen3_vl_processor import Qwen3VLProcessor
163-
else:
164-
from fastdeploy.input.v1.qwen3_vl_processor import (
165-
Qwen3VLProcessor,
166-
)
131+
from fastdeploy.input.qwen3_vl_processor import Qwen3VLProcessor
167132

168133
self.processor = Qwen3VLProcessor(
169134
config=self.model_config,

fastdeploy/input/v1/__init__.py

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)