
[Batch Inference] AttributeError when using bentoml.batch.run_in_spark - 'APIMethod' object has no attribute 'input' #5524

@Arlen-Du

Description


Describe the bug

Overview

When using bentoml.batch.run_in_spark for batch inference, the job fails with AttributeError: 'APIMethod' object has no attribute 'input'. The same service works correctly when called through its REST API, which suggests the problem is specific to the Spark batch-processing backend.

Environment

  • BentoML Version: 1.4.30
  • Python Version: 3.11
  • Spark Version: 3.4.3
  • Operating System: CentOS 7

Expected Result

The run_in_spark function should successfully process the DataFrame and return a new DataFrame with prediction results.

Actual Result

The Spark job fails. The logs show AttributeError: 'APIMethod' object has no attribute 'input', raised at line 86 of bentoml/_internal/batch/spark.py.
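The traceback suggests that `spark.py` looks up the API named by `api_name` and then reads an `input` IO descriptor from it (`inference_api.input.from_arrow(batch)`). The stdlib-only sketch below mimics that lookup to show why a method object without an `input` attribute fails in exactly this way, and how an explicit check could surface a clearer message. `FakeIODescriptor`, `LegacyAPI`, `MethodOnlyAPI`, `process_like_spark_py`, and `resolve_input_descriptor` are hypothetical stand-ins, not BentoML classes; the assumption that APIs defined with `@bentoml.service` carry no `input` descriptor is inferred from the error message only.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeIODescriptor:
    """Hypothetical stand-in for an IO descriptor that offers a from_arrow hook."""

    def from_arrow(self, batch: list[list[Any]]) -> list[list[Any]]:
        # A real descriptor would convert an Arrow batch; here rows pass through unchanged.
        return batch


@dataclass
class LegacyAPI:
    """Hypothetical API object that carries an `input` descriptor."""

    func: Callable[..., Any]
    input: FakeIODescriptor


@dataclass
class MethodOnlyAPI:
    """Hypothetical method-style API object: callable, but with no `input` attribute."""

    func: Callable[..., Any]


def process_like_spark_py(api: Any, batch: list[list[Any]]) -> Any:
    # Mirrors the failing line in the traceback: inference_api.input.from_arrow(batch)
    return api.func(api.input.from_arrow(batch))


def resolve_input_descriptor(api: Any, api_name: str) -> Any:
    # Hypothetical guard: fail early with an actionable message instead of a bare AttributeError.
    descriptor = getattr(api, "input", None)
    if descriptor is None or not hasattr(descriptor, "from_arrow"):
        raise TypeError(
            f"API {api_name!r} ({type(api).__name__}) has no Arrow-capable input "
            "descriptor, so Spark batches cannot be converted for it."
        )
    return descriptor
```

With `MethodOnlyAPI`, `process_like_spark_py` raises the same kind of error seen in the logs (`'MethodOnlyAPI' object has no attribute 'input'`), while `resolve_input_descriptor` names the API and explains why it cannot be used.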

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 14:40:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+---+----+---+----+
| id|name|age|addr|
+---+----+---+----+
|  1|   2|  3|   4|
+---+----+---+----+

INFO: Starting production HTTP BentoServer from "demo2024:1.0" listening on http://localhost:45085 (Press CTRL+C to quit)
2025-12-19 14:40:49,493 INFO - Service demo2024:1.0} initialized
2025-12-19T14:40:51+0800 [INFO] [entry_service:demo2024:1] Service demo2024 initialized
WARNING: Client is deprecated and will be removed in BentoML 2.0, please use AsyncClient or SyncClient instead.
25/12/19 14:40:51 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/bentoml/_internal/batch/spark.py", line 86, in process
    func_input = inference_api.input.from_arrow(batch)
                 ^^^^^^^^^^^^^^^^^^^
AttributeError: 'APIMethod' object has no attribute 'input'

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:891)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:139)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
25/12/19 14:40:51 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1) (node3.kdp.com executor driver): org.apache.spark.api.python.PythonException: (same Python traceback and JVM stack as the ERROR Executor entry above)

25/12/19 14:40:51 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/data/home/bgdata/deploy/dataplatform-algo-worker/app/manager/task_manager.py", line 27, in <module>
    run_task()
  File "/data/home/bgdata/deploy/dataplatform-algo-worker/app/manager/task_manager.py", line 23, in run_task
    result.show()
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/pyspark/sql/dataframe.py", line 901, in show
    print(self._jdf.showString(n, 20, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.errors.exceptions.captured.PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/data/home/bgdata/miniconda3/envs/py311/lib/python3.11/site-packages/bentoml/_internal/batch/spark.py", line 86, in process
    func_input = inference_api.input.from_arrow(batch)
                 ^^^^^^^^^^^^^^^^^^^
AttributeError: 'APIMethod' object has no attribute 'input'

Code Example

Service code (service.py), built as Bento "demo2024:1.0":

# -*- coding: utf-8 -*-
from http import HTTPStatus

import bentoml
import numpy as np
from bentoml.exceptions import BentoMLException

from app.utils.log_util import LogUtil

image = (bentoml.images.Image(base_image="python:3.13-slim", python_version="3.13")
         .python_packages("scikit-learn", "pandas", "bentoml"))

# Configure logging
logger = LogUtil("demo2024-1.0", log_path="/tmp/logs/algo-worker")

@bentoml.service(
    name="demo2024",
    resources={"cpu": "0.5", "memory": "1GB"},
    traffic={"timeout": 60},
    image=image,
)
class service:
    # Declare the model as a class variable
    bento_model = bentoml.models.BentoModel("demo2024:1.0")

    def __init__(self):
        logger.info("Service demo2024:1.0 initialized")
        self.model = bentoml.sklearn.load_model(self.bento_model)

    @bentoml.task
    def predict(self, input_data: np.ndarray):
        try:
            logger.info(f"Received prediction request, input data: {input_data.tolist()}")
            predictions = self.model.predict(input_data)
            logger.info(f"Prediction complete, result: {predictions.tolist()}")
            return predictions.tolist()
        except Exception as e:
            logger.error(f"Model prediction failed: {e}")
            # Surface model errors to the caller as HTTP 400, keeping the original cause
            raise BentoMLException(
                f"Model prediction failed: {e}", error_code=HTTPStatus.BAD_REQUEST
            ) from e
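To check the `predict` logic independently of BentoML and Spark, the same flow can be exercised with a stub model. In this sketch, `StubModel` and `PredictError` are hypothetical placeholders for the scikit-learn model and `BentoMLException`, and the input is assumed to be a 2-D batch of feature rows, such as the single test row `(1, 2, 3, 4)` from the task code.

```python
from http import HTTPStatus
from typing import Sequence


class PredictError(Exception):
    """Hypothetical stand-in for BentoMLException that carries an HTTP status."""

    def __init__(self, message: str, error_code: HTTPStatus) -> None:
        super().__init__(message)
        self.error_code = error_code


class StubModel:
    """Hypothetical model: 'predicts' the sum of each row's features."""

    def predict(self, rows: Sequence[Sequence[int]]) -> list[int]:
        return [sum(row) for row in rows]


def predict(model: StubModel, input_data: Sequence[Sequence[int]]) -> list[int]:
    # Same shape as the service method: call the model, return a plain list,
    # and wrap any failure in an error that maps to HTTP 400.
    try:
        return list(model.predict(input_data))
    except Exception as e:
        raise PredictError(f"Model prediction failed: {e}", HTTPStatus.BAD_REQUEST) from e
```

If this works for a batch shaped like the Spark rows, the failure is isolated to how the batch reaches the service rather than to the model itself.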

Task Code:

import bentoml
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

def run_task():
    spark_session = SparkSession.builder.master("local[*]").getOrCreate()

    # Create test data
    df = spark_session.sql("select 1 as id, 2 as name, 3 as age, 4 as addr")
    df.show()

    bento = bentoml.get("demo2024:1.0")

    result = bentoml.batch.run_in_spark(
        bento,
        df=df,
        spark=spark_session,
        api_name="predict",
        output_schema=StructType([
            StructField("pred", IntegerType(), True)
        ])
    )
    result.show()
    spark_session.stop()

if __name__ == '__main__':
    run_task()
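One possible workaround, until `run_in_spark` handles this service style, is to call the running service per partition yourself, for example from a function passed to PySpark's `DataFrame.mapInPandas(func, schema)`, which receives an iterator of batches and yields result batches. The stdlib-only sketch below shows only that iterator contract: batches are modeled as dicts of column lists instead of pandas DataFrames, the feature order `id, name, age, addr` is an assumption taken from the test query, and `predict_partition` and `call_predict` are hypothetical names; `call_predict` would have to be backed by a client call to the service's `predict` endpoint.

```python
from typing import Callable, Iterator, Sequence

FEATURE_COLUMNS = ("id", "name", "age", "addr")  # assumed model feature order


def predict_partition(
    batches: Iterator[dict[str, list[int]]],
    call_predict: Callable[[list[list[int]]], Sequence[int]],
) -> Iterator[dict[str, list[int]]]:
    """Yield one {"pred": [...]} batch per non-empty input batch (schema: pred int)."""
    for batch in batches:
        # Convert the columnar batch into row-major features: [[id, name, age, addr], ...]
        rows = [list(values) for values in zip(*(batch[c] for c in FEATURE_COLUMNS))]
        if not rows:
            continue  # nothing to score in an empty batch
        preds = call_predict(rows)
        if len(preds) != len(rows):
            raise ValueError(f"expected {len(rows)} predictions, got {len(preds)}")
        yield {"pred": [int(p) for p in preds]}
```

In PySpark, the equivalent function would take and yield pandas DataFrames and be passed to `df.mapInPandas` together with the same `pred` output schema used in the task code.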


Labels: bug (Something isn't working)